LCOV - code coverage report
Current view: top level - pageserver/src - task_mgr.rs
Test:         20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info
Test Date:    2024-11-25 17:48:16

               Coverage    Total    Hit
  Lines:        62.2 %      251     156
  Functions:    22.8 %      101      23

            Line data    Source code
       1              : //!
       2              : //! This module provides centralized handling of tokio tasks in the Page Server.
       3              : //!
       4              : //! We provide a few basic facilities:
       5              : //! - A global registry of tasks that lists what kind of tasks they are, and
       6              : //!   which tenant or timeline they are working on
       7              : //!
       8              : //! - The ability to request a task to shut down.
       9              : //!
      10              : //!
      11              : //! # How it works
      12              : //!
      13              : //! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
      14              : //! task is spawned, a PageServerTask entry is added there, and when a
      15              : //! task dies, it removes itself from the hashmap. If you want to kill a
      16              : //! task, you can scan the hashmap to find it.
      17              : //!
      18              : //! # Task shutdown
      19              : //!
      20              : //! To kill a task, we rely on co-operation from the victim. Each task is
      21              : //! expected to periodically call the `is_shutdown_requested()` function, and
      22              : //! if it returns true, exit gracefully. In addition to that, when waiting for
      23              : //! the network or other long-running operation, you can use
      24              : //! `shutdown_watcher()` function to get a Future that will become ready if
      25              : //! the current task has been requested to shut down. You can use that with
      26              : //! Tokio select!().
      27              : //!
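                      : //! For example, a task's main loop might combine the two (a sketch;
                      : //! `work_queue` and `process` are hypothetical):
                      : //!
                      : //! ```ignore
                      : //! loop {
                      : //!     tokio::select! {
                      : //!         _ = task_mgr::shutdown_watcher() => break,
                      : //!         item = work_queue.recv() => process(item).await,
                      : //!     }
                      : //! }
                      : //! ```
                      : //!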
      28              : //! TODO: This would be a good place to also handle panics in a somewhat sane way.
      29              : //! Depending on what task panics, we might want to kill the whole server, or
      30              : //! only a single tenant or timeline.
      31              : //!
      32              : 
      33              : use std::collections::HashMap;
      34              : use std::fmt;
      35              : use std::future::Future;
      36              : use std::num::NonZeroUsize;
      37              : use std::panic::AssertUnwindSafe;
      38              : use std::str::FromStr;
      39              : use std::sync::atomic::{AtomicU64, Ordering};
      40              : use std::sync::{Arc, Mutex};
      41              : 
      42              : use futures::FutureExt;
      43              : use pageserver_api::shard::TenantShardId;
      44              : use tokio::task::JoinHandle;
      45              : use tokio::task_local;
      46              : use tokio_util::sync::CancellationToken;
      47              : 
      48              : use tracing::{debug, error, info, warn};
      49              : 
      50              : use once_cell::sync::Lazy;
      51              : 
      52              : use utils::env;
      53              : use utils::id::TimelineId;
      54              : 
      55              : use crate::metrics::set_tokio_runtime_setup;
      56              : 
      57              : //
      58              : // There are four runtimes:
      59              : //
      60              : // Compute request runtime
      61              : //  - used to handle connections from compute nodes. Any tasks related to satisfying
      62              : //    GetPage requests, base backups, import, and other such compute node operations
      63              : //    are handled by the Compute request runtime
      64              : //  - page_service.rs
      65              : //  - this includes layer downloads from remote storage, if a layer is needed to
      66              : //    satisfy a GetPage request
      67              : //
      68              : // Management request runtime
      69              : //  - used to handle HTTP API requests
      70              : //
      71              : // WAL receiver runtime
      72              : //  - used to handle WAL receiver connections
      73              : //  - and to receive updates from storage_broker
      74              : //
      75              : // Background runtime
      76              : //  - layer flushing
      77              : //  - garbage collection
      78              : //  - compaction
      79              : //  - remote storage uploads
      80              : //  - initial tenant loading
      81              : //
      82              : // Everything runs in a tokio task. If you spawn new tasks, spawn them on the correct
      83              : // runtime.
      84              : //
      85              : // There might be situations when one task needs to wait for a task running in another
      86              : // runtime to finish. For example, if a background operation needs a layer from remote
      87              : // storage, it will start to download it. If the download was already initiated by a
      88              : // GetPage request, the background task will instead wait for that download - running
      89              : // in the compute request runtime - to finish.
      90              : // Another example: the initial tenant loading tasks are launched in the background ops
      91              : // runtime. If a GetPage request comes in before the load of a tenant has finished, the
      92              : // GetPage request will wait for the tenant load to finish.
      93              : //
      94              : // The core Timeline code is synchronous, and uses a bunch of std Mutexes and RWLocks to
      95              : // protect data structures. Let's keep it that way. Synchronous code is easier to debug
      96              : // and analyze, and there's a lot of hairy, low-level, performance critical code there.
      97              : //
      98              : // It's nice to have different runtimes, so that you can quickly eyeball how much CPU
      99              : // time each class of operations is taking, with 'top -H' or similar.
     100              : //
     101              : // It's also good to avoid hogging all threads that would be needed to process
     102              : // other operations if e.g. the upload tasks get blocked on locks. It shouldn't
     103              : // happen, but still.
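                      : //
                      : // For example, spawning a per-tenant compaction task on the background
                      : // runtime might look like this (a sketch; the future body is elided):
                      : //
                      : //     task_mgr::spawn(
                      : //         BACKGROUND_RUNTIME.handle(),
                      : //         TaskKind::Compaction,
                      : //         tenant_shard_id,
                      : //         Some(timeline_id),
                      : //         "compaction",
                      : //         async move { /* one compaction pass */ Ok(()) },
                      : //     );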
     104              : //
     105              : 
     106          168 : pub(crate) static TOKIO_WORKER_THREADS: Lazy<NonZeroUsize> = Lazy::new(|| {
     107          168 :     // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
     108          168 :     // tokio would have already panicked for parsing errors or NotUnicode
     109          168 :     //
     110          168 :     // this will be wrong if any of the runtimes has its worker threads configured to something
     111          168 :     // else, but that has not been needed in a long time.
     112          168 :     NonZeroUsize::new(
     113          168 :         std::env::var("TOKIO_WORKER_THREADS")
     114          168 :             .map(|s| s.parse::<usize>().unwrap())
     115          168 :             .unwrap_or_else(|_e| usize::max(2, num_cpus::get())),
     116          168 :     )
     117          168 :     .expect("the max() ensures that this is not zero")
     118          168 : });
     119              : 
     120              : enum TokioRuntimeMode {
     121              :     SingleThreaded,
     122              :     MultiThreaded { num_workers: NonZeroUsize },
     123              : }
     124              : 
     125              : impl FromStr for TokioRuntimeMode {
     126              :     type Err = String;
     127              : 
     128            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     129            0 :         match s {
     130            0 :             "current_thread" => Ok(TokioRuntimeMode::SingleThreaded),
     131            0 :             s => match s.strip_prefix("multi_thread:") {
     132            0 :                 Some("default") => Ok(TokioRuntimeMode::MultiThreaded {
     133            0 :                     num_workers: *TOKIO_WORKER_THREADS,
     134            0 :                 }),
     135            0 :                 Some(suffix) => {
     136            0 :                     let num_workers = suffix.parse::<NonZeroUsize>().map_err(|e| {
     137            0 :                         format!(
     138            0 :                             "invalid number of multi-threaded runtime workers ({suffix:?}): {e}",
     139            0 :                         )
     140            0 :                     })?;
     141            0 :                     Ok(TokioRuntimeMode::MultiThreaded { num_workers })
     142              :                 }
     143            0 :                 None => Err(format!("invalid runtime config: {s:?}")),
     144              :             },
     145              :         }
     146            0 :     }
     147              : }
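                      : 
                      : // Accepted values for `NEON_PAGESERVER_USE_ONE_RUNTIME`, as parsed above:
                      : //
                      : //   "current_thread"        -> TokioRuntimeMode::SingleThreaded
                      : //   "multi_thread:default"  -> MultiThreaded with *TOKIO_WORKER_THREADS workers
                      : //   "multi_thread:<N>"      -> MultiThreaded with N workers (N must be > 0)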
     148              : 
     149          168 : static TOKIO_THREAD_STACK_SIZE: Lazy<NonZeroUsize> = Lazy::new(|| {
     150          168 :     env::var("NEON_PAGESERVER_TOKIO_THREAD_STACK_SIZE")
     151          168 :         // the default 2 MiB stack is insufficient, especially in debug mode
     152          168 :         .unwrap_or_else(|| NonZeroUsize::new(4 * 1024 * 1024).unwrap())
     153          168 : });
     154              : 
     155          168 : static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
     156          168 :     let thread_name = "pageserver-tokio";
     157          168 :     let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else {
     158              :         // If the env var is not set, leave this static as None.
     159          168 :         set_tokio_runtime_setup(
     160          168 :             "multiple-runtimes",
     161          168 :             NUM_MULTIPLE_RUNTIMES
     162          168 :                 .checked_mul(*TOKIO_WORKER_THREADS)
     163          168 :                 .unwrap(),
     164          168 :         );
     165          168 :         return None;
     166              :     };
     167            0 :     Some(match mode {
     168              :         TokioRuntimeMode::SingleThreaded => {
     169            0 :             set_tokio_runtime_setup("one-runtime-single-threaded", NonZeroUsize::new(1).unwrap());
     170            0 :             tokio::runtime::Builder::new_current_thread()
     171            0 :                 .thread_name(thread_name)
     172            0 :                 .enable_all()
     173            0 :                 .thread_stack_size(TOKIO_THREAD_STACK_SIZE.get())
     174            0 :                 .build()
     175            0 :                 .expect("failed to create one single runtime")
     176              :         }
     177            0 :         TokioRuntimeMode::MultiThreaded { num_workers } => {
     178            0 :             set_tokio_runtime_setup("one-runtime-multi-threaded", num_workers);
     179            0 :             tokio::runtime::Builder::new_multi_thread()
     180            0 :                 .thread_name(thread_name)
     181            0 :                 .enable_all()
     182            0 :                 .worker_threads(num_workers.get())
     183            0 :                 .thread_stack_size(TOKIO_THREAD_STACK_SIZE.get())
     184            0 :                 .build()
     185            0 :                 .expect("failed to create one multi-threaded runtime")
     186              :         }
     187              :     })
     188          168 : });
     189              : 
     190              : /// Declare a lazy static variable named `$varname` that will resolve
     191              : /// to a tokio runtime handle. If the env var `NEON_PAGESERVER_USE_ONE_RUNTIME`
     192              : /// is set, this will resolve to `ONE_RUNTIME`. Otherwise, the macro invocation
     193              : /// declares a separate runtime and the lazy static variable `$varname`
     194              : /// will resolve to that separate runtime.
     195              : ///
      196              : /// The result is that `$varname.spawn()` will use `ONE_RUNTIME` if
     197              : /// `NEON_PAGESERVER_USE_ONE_RUNTIME` is set, and will use the separate runtime
     198              : /// otherwise.
     199              : macro_rules! pageserver_runtime {
     200              :     ($varname:ident, $name:literal) => {
     201          178 :         pub static $varname: Lazy<&'static tokio::runtime::Runtime> = Lazy::new(|| {
     202          178 :             if let Some(runtime) = &*ONE_RUNTIME {
     203            0 :                 return runtime;
     204          178 :             }
     205          178 :             static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
     206          178 :                 tokio::runtime::Builder::new_multi_thread()
     207          178 :                     .thread_name($name)
     208          178 :                     .worker_threads(TOKIO_WORKER_THREADS.get())
     209          178 :                     .enable_all()
     210          178 :                     .thread_stack_size(TOKIO_THREAD_STACK_SIZE.get())
     211          178 :                     .build()
     212          178 :                     .expect(std::concat!("Failed to create runtime ", $name))
     213          178 :             });
     214          178 :             &*RUNTIME
     215          178 :         });
     216              :     };
     217              : }
     218              : 
     219              : pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
     220              : pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
     221              : pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
     222              : pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
     223              : // Bump this number when adding a new pageserver_runtime!
     224              : // SAFETY: it's obviously correct
     225              : const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
     226              : 
     227              : #[derive(Debug, Clone, Copy)]
     228              : pub struct PageserverTaskId(u64);
     229              : 
     230              : impl fmt::Display for PageserverTaskId {
     231            0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     232            0 :         self.0.fmt(f)
     233            0 :     }
     234              : }
     235              : 
     236              : /// Each task that we track is associated with a "task ID". It's just an
     237              : /// increasing number that we assign. Note that it is different from tokio::task::Id.
     238              : static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
     239              : 
     240              : /// Global registry of tasks
     241              : static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
     242          172 :     Lazy::new(|| Mutex::new(HashMap::new()));
     243              : 
     244              : task_local! {
     245              :     // This is a cancellation token which will be cancelled when a task needs to shut down. The
     246              :     // root token is kept in the global registry, so that anyone can send the signal to request
     247              :     // task shutdown.
     248              :     static SHUTDOWN_TOKEN: CancellationToken;
     249              : 
     250              :     // Each task holds reference to its own PageServerTask here.
     251              :     static CURRENT_TASK: Arc<PageServerTask>;
     252              : }
     253              : 
     254              : ///
     255              : /// There are many kinds of tasks in the system. Some are associated with a particular
     256              : /// tenant or timeline, while others are global.
     257              : ///
      258              : /// Note that we don't try to limit how many tasks of a certain kind can be running
     259              : /// at the same time.
     260              : ///
     261              : #[derive(
     262              :     Debug,
     263              :     // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
     264            0 :     enumset::EnumSetType,
     265              :     enum_map::Enum,
     266              :     serde::Serialize,
     267            0 :     serde::Deserialize,
     268         3060 :     strum_macros::IntoStaticStr,
     269            0 :     strum_macros::EnumString,
     270              : )]
     271              : pub enum TaskKind {
     272              :     // Pageserver startup, i.e., `main`
     273              :     Startup,
     274              : 
      275              :     // libpq listener task. It just accepts connections and spawns a
     276              :     // PageRequestHandler task for each connection.
     277              :     LibpqEndpointListener,
     278              : 
     279              :     // HTTP endpoint listener.
     280              :     HttpEndpointListener,
     281              : 
     282              :     // Task that handles a single connection. A PageRequestHandler task
     283              :     // starts detached from any particular tenant or timeline, but it can be
     284              :     // associated with one later, after receiving a command from the client.
     285              :     PageRequestHandler,
     286              : 
     287              :     /// Manages the WAL receiver connection for one timeline.
     288              :     /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
     289              :     /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
     290              :     /// There is at most one connection at any given time.
     291              :     ///
     292              :     /// That `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
     293              :     /// The `Client` object is what library users use to make requests & get responses.
     294              :     /// Internally, `Client` hands over requests to the `Connection` object.
     295              :     /// The `Connection` object is responsible for speaking the wire protocol.
     296              :     ///
     297              :     /// Walreceiver uses a legacy abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
     298              :     /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
     299              :     /// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
     300              :     ///
     301              :     /// Once the connection is established, the `TaskHandle` task spawns a
     302              :     /// [`WalReceiverConnectionPoller`] task that is responsible for polling
     303              :     /// the `Connection` object.
     304              :     /// A `CancellationToken` created by the `TaskHandle` task ensures
      305              :     /// that the [`WalReceiverConnectionPoller`] task will cancel soon after the `TaskHandle` is dropped.
     306              :     ///
     307              :     /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
     308              :     /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
     309              :     WalReceiverManager,
     310              : 
     311              :     /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
     312              :     /// See the comment on [`WalReceiverManager`].
     313              :     ///
     314              :     /// [`WalReceiverManager`]: Self::WalReceiverManager
     315              :     WalReceiverConnectionHandler,
     316              : 
     317              :     /// The task that polls the `tokio-postgres::Connection` object.
     318              :     /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
     319              :     /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
     320              :     WalReceiverConnectionPoller,
     321              : 
     322              :     // Garbage collection worker. One per tenant
     323              :     GarbageCollector,
     324              : 
     325              :     // Compaction. One per tenant.
     326              :     Compaction,
     327              : 
     328              :     // Eviction. One per timeline.
     329              :     Eviction,
     330              : 
     331              :     // Ingest housekeeping (flushing ephemeral layers on time threshold or disk pressure)
     332              :     IngestHousekeeping,
     333              : 
     334              :     /// See [`crate::disk_usage_eviction_task`].
     335              :     DiskUsageEviction,
     336              : 
     337              :     /// See [`crate::tenant::secondary`].
     338              :     SecondaryDownloads,
     339              : 
     340              :     /// See [`crate::tenant::secondary`].
     341              :     SecondaryUploads,
     342              : 
     343              :     // Initial logical size calculation
     344              :     InitialLogicalSizeCalculation,
     345              : 
     346              :     OndemandLogicalSizeCalculation,
     347              : 
     348              :     // Task that flushes frozen in-memory layers to disk
     349              :     LayerFlushTask,
     350              : 
     351              :     // Task that uploads a file to remote storage
     352              :     RemoteUploadTask,
     353              : 
     354              :     // task that handles the initial downloading of all tenants
     355              :     InitialLoad,
     356              : 
     357              :     // task that handles attaching a tenant
     358              :     Attach,
     359              : 
     360              :     // Used mostly for background deletion from s3
     361              :     TimelineDeletionWorker,
     362              : 
      363              :     // task that handles metrics collection
     364              :     MetricsCollection,
     365              : 
     366              :     // task that drives downloading layers
     367              :     DownloadAllRemoteLayers,
      368              :     // Task that calculates synthetic size for all active tenants
     369              :     CalculateSyntheticSize,
     370              : 
     371              :     // A request that comes in via the pageserver HTTP API.
     372              :     MgmtRequest,
     373              : 
     374              :     DebugTool,
     375              : 
     376              :     EphemeralFilePreWarmPageCache,
     377              : 
     378              :     LayerDownload,
     379              : 
     380              :     #[cfg(test)]
     381              :     UnitTest,
     382              : 
     383              :     DetachAncestor,
     384              : 
     385              :     ImportPgdata,
     386              : }
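                      : 
                      : // The strum derives above provide string conversions for TaskKind, e.g.
                      : // (a sketch):
                      : //
                      : //     let s: &'static str = TaskKind::Compaction.into(); // "Compaction"
                      : //     let k: TaskKind = "Compaction".parse().unwrap();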
     387              : 
     388              : #[derive(Default)]
     389              : struct MutableTaskState {
      390              :     /// Handle for waiting for the task to exit. It can be None, if
      391              :     /// the task has already exited.
     392              :     join_handle: Option<JoinHandle<()>>,
     393              : }
     394              : 
     395              : struct PageServerTask {
     396              :     task_id: PageserverTaskId,
     397              : 
     398              :     kind: TaskKind,
     399              : 
     400              :     name: String,
     401              : 
     402              :     // To request task shutdown, just cancel this token.
     403              :     cancel: CancellationToken,
     404              : 
     405              :     /// Tasks may optionally be launched for a particular tenant/timeline, enabling
     406              :     /// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
     407              :     tenant_shard_id: TenantShardId,
     408              :     timeline_id: Option<TimelineId>,
     409              : 
     410              :     mutable: Mutex<MutableTaskState>,
     411              : }
     412              : 
      413              : /// Launch a new task.
      414              : /// Note: errors and panics in the task are caught and logged by the wrapper;
      415              : ///   they do not shut down the process (see `exit_on_panic_or_error` for that).
     416         3603 : pub fn spawn<F>(
     417         3603 :     runtime: &tokio::runtime::Handle,
     418         3603 :     kind: TaskKind,
     419         3603 :     tenant_shard_id: TenantShardId,
     420         3603 :     timeline_id: Option<TimelineId>,
     421         3603 :     name: &str,
     422         3603 :     future: F,
     423         3603 : ) -> PageserverTaskId
     424         3603 : where
     425         3603 :     F: Future<Output = anyhow::Result<()>> + Send + 'static,
     426         3603 : {
     427         3603 :     let cancel = CancellationToken::new();
     428         3603 :     let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
     429         3603 :     let task = Arc::new(PageServerTask {
     430         3603 :         task_id: PageserverTaskId(task_id),
     431         3603 :         kind,
     432         3603 :         name: name.to_string(),
     433         3603 :         cancel: cancel.clone(),
     434         3603 :         tenant_shard_id,
     435         3603 :         timeline_id,
     436         3603 :         mutable: Mutex::new(MutableTaskState { join_handle: None }),
     437         3603 :     });
     438         3603 : 
     439         3603 :     TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
     440         3603 : 
     441         3603 :     let mut task_mut = task.mutable.lock().unwrap();
     442         3603 : 
     443         3603 :     let task_name = name.to_string();
     444         3603 :     let task_cloned = Arc::clone(&task);
     445         3603 :     let join_handle = runtime.spawn(task_wrapper(
     446         3603 :         task_name,
     447         3603 :         task_id,
     448         3603 :         task_cloned,
     449         3603 :         cancel,
     450         3603 :         future,
     451         3603 :     ));
     452         3603 :     task_mut.join_handle = Some(join_handle);
     453         3603 :     drop(task_mut);
     454         3603 : 
     455         3603 :     // The task is now running. Nothing more to do here
     456         3603 :     PageserverTaskId(task_id)
     457         3603 : }
     458              : 
     459              : /// This wrapper function runs in a newly-spawned task. It initializes the
     460              : /// task-local variables and calls the payload function.
     461         3603 : async fn task_wrapper<F>(
     462         3603 :     task_name: String,
     463         3603 :     task_id: u64,
     464         3603 :     task: Arc<PageServerTask>,
     465         3603 :     shutdown_token: CancellationToken,
     466         3603 :     future: F,
     467         3603 : ) where
     468         3603 :     F: Future<Output = anyhow::Result<()>> + Send + 'static,
     469         3603 : {
     470         3500 :     debug!("Starting task '{}'", task_name);
     471              : 
     472              :     // wrap the future so we log panics and errors
     473         3500 :     let tenant_shard_id = task.tenant_shard_id;
     474         3500 :     let timeline_id = task.timeline_id;
     475         3500 :     let fut = async move {
     476              :         // We use AssertUnwindSafe here so that the payload function
     477              :         // doesn't need to be UnwindSafe. We don't do anything after the
     478              :         // unwinding that would expose us to unwind-unsafe behavior.
     479        62340 :         let result = AssertUnwindSafe(future).catch_unwind().await;
     480         3000 :         match result {
     481              :             Ok(Ok(())) => {
     482         3000 :                 debug!("Task '{}' exited normally", task_name);
     483              :             }
     484            0 :             Ok(Err(err)) => {
     485            0 :                 error!(
     486            0 :                     "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
     487              :                     task_name, tenant_shard_id, timeline_id, err
     488              :                 );
     489              :             }
     490            0 :             Err(err) => {
     491            0 :                 error!(
     492            0 :                     "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
     493              :                     task_name, tenant_shard_id, timeline_id, err
     494              :                 );
     495              :             }
     496              :         }
     497         3000 :     };
     498              : 
     499              :     // add the task-locals
     500         3500 :     let fut = CURRENT_TASK.scope(task, fut);
     501         3500 :     let fut = SHUTDOWN_TOKEN.scope(shutdown_token, fut);
     502         3500 : 
     503         3500 :     // poll future to completion
     504        62340 :     fut.await;
     505              : 
     506              :     // Remove our entry from the global hashmap.
     507         3000 :     TASKS
     508         3000 :         .lock()
     509         3000 :         .unwrap()
     510         3000 :         .remove(&task_id)
     511         3000 :         .expect("no task in registry");
     512         3000 : }
     513              : 
     514            0 : pub async fn exit_on_panic_or_error<T, E>(
     515            0 :     task_name: &'static str,
     516            0 :     future: impl Future<Output = Result<T, E>>,
     517            0 : ) -> T
     518            0 : where
     519            0 :     E: std::fmt::Debug,
     520            0 : {
     521              :     // We use AssertUnwindSafe here so that the payload function
     522              :     // doesn't need to be UnwindSafe. We don't do anything after the
     523              :     // unwinding that would expose us to unwind-unsafe behavior.
     524            0 :     let result = AssertUnwindSafe(future).catch_unwind().await;
     525            0 :     match result {
     526            0 :         Ok(Ok(val)) => val,
     527            0 :         Ok(Err(err)) => {
     528            0 :             error!(
     529              :                 task_name,
     530            0 :                 "Task exited with error, exiting process: {err:?}"
     531              :             );
     532            0 :             std::process::exit(1);
     533              :         }
     534            0 :         Err(panic_obj) => {
     535            0 :             error!(task_name, "Task panicked, exiting process: {panic_obj:?}");
     536            0 :             std::process::exit(1);
     537              :         }
     538              :     }
     539            0 : }
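                      : 
                      : // Example: wrapping a long-running task whose failure should take down the
                      : // whole process (a sketch; `listener_task` is a hypothetical future):
                      : //
                      : //     BACKGROUND_RUNTIME.spawn(exit_on_panic_or_error(
                      : //         "http endpoint listener",
                      : //         listener_task(),
                      : //     ));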
     540              : 
     541              : /// Signal and wait for tasks to shut down.
      542              : ///
     544              : /// The arguments are used to select the tasks to kill. Any None arguments are
     545              : /// ignored. For example, to shut down all WalReceiver tasks:
      546              : /// ignored. For example, to shut down all WalReceiverManager tasks:
      547              : ///   shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
     548              : ///
      549              : /// Or to shut down all tasks for a given timeline:
     550              : ///
     551              : ///   shutdown_tasks(None, Some(tenant_shard_id), Some(timeline_id))
     552              : ///
     553           26 : pub async fn shutdown_tasks(
     554           26 :     kind: Option<TaskKind>,
     555           26 :     tenant_shard_id: Option<TenantShardId>,
     556           26 :     timeline_id: Option<TimelineId>,
     557           26 : ) {
     558           26 :     let mut victim_tasks = Vec::new();
     559           26 : 
     560           26 :     {
     561           26 :         let tasks = TASKS.lock().unwrap();
     562           28 :         for task in tasks.values() {
     563           28 :             if (kind.is_none() || Some(task.kind) == kind)
     564           14 :                 && (tenant_shard_id.is_none() || Some(task.tenant_shard_id) == tenant_shard_id)
     565           14 :                 && (timeline_id.is_none() || task.timeline_id == timeline_id)
     566            8 :             {
     567            8 :                 task.cancel.cancel();
     568            8 :                 victim_tasks.push((
     569            8 :                     Arc::clone(task),
     570            8 :                     task.kind,
     571            8 :                     task.tenant_shard_id,
     572            8 :                     task.timeline_id,
     573            8 :                 ));
     574           20 :             }
     575              :         }
     576              :     }
     577              : 
     578           26 :     let log_all = kind.is_none() && tenant_shard_id.is_none() && timeline_id.is_none();
     579              : 
     580           34 :     for (task, task_kind, tenant_shard_id, timeline_id) in victim_tasks {
     581            8 :         let join_handle = {
     582            8 :             let mut task_mut = task.mutable.lock().unwrap();
     583            8 :             task_mut.join_handle.take()
     584              :         };
     585            8 :         if let Some(mut join_handle) = join_handle {
     586            8 :             if log_all {
     587              :                 // warn to catch these in tests; there shouldn't be any
     588            0 :                 warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
     589            8 :             }
     590            8 :             if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
     591            8 :                 .await
     592            8 :                 .is_err()
     593              :             {
     594              :                 // allow some time to elapse before logging to cut down the number of log
     595              :                 // lines.
     596            0 :                 info!("waiting for task {} to shut down", task.name);
     597              :                 // we never handled this return value, but:
     598              :                 // - we don't deschedule which would lead to is_cancelled
     599              :                 // - panics are already logged (is_panicked)
     600              :                 // - task errors are already logged in the wrapper
     601            0 :                 let _ = join_handle.await;
     602            0 :                 info!("task {} completed", task.name);
     603            8 :             }
     604            0 :         } else {
     605            0 :             // Possibly one of:
     606            0 :             //  * The task had not even fully started yet.
     607            0 :             //  * It was shut down concurrently and already exited
     608            0 :         }
     609              :     }
     610           26 : }
     611              : 
     612            0 : pub fn current_task_kind() -> Option<TaskKind> {
     613            0 :     CURRENT_TASK.try_with(|ct| ct.kind).ok()
     614            0 : }
     615              : 
     616            0 : pub fn current_task_id() -> Option<PageserverTaskId> {
     617            0 :     CURRENT_TASK.try_with(|ct| ct.task_id).ok()
     618            0 : }
     619              : 
     620              : /// A Future that can be used to check if the current task has been requested to
     621              : /// shut down.
     622            0 : pub async fn shutdown_watcher() {
     623            0 :     let token = SHUTDOWN_TOKEN
     624            0 :         .try_with(|t| t.clone())
     625            0 :         .expect("shutdown_watcher() called in an unexpected task or thread");
     626            0 : 
     627            0 :     token.cancelled().await;
     628            0 : }
     629              : 
     630              : /// Clone the current task's cancellation token, which can be moved across tasks.
     631              : ///
     632              : /// When the task which is currently executing is shutdown, the cancellation token will be
     633              : /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
     634              : /// `tokio::task::JoinSet::spawn`.
     635         3088 : pub fn shutdown_token() -> CancellationToken {
     636         3088 :     let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
     637         3088 : 
     638         3088 :     if cfg!(test) {
     639              :         // in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
     640         3088 :         res.unwrap_or_default()
     641              :     } else {
     642            0 :         res.expect("shutdown_token() called in an unexpected task or thread")
     643              :     }
     644         3088 : }
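                      : 
                      : // Example: moving the token into a blocking task (a sketch):
                      : //
                      : //     let cancel = task_mgr::shutdown_token();
                      : //     tokio::task::spawn_blocking(move || {
                      : //         while !cancel.is_cancelled() {
                      : //             // do a bounded chunk of blocking work, then re-check
                      : //         }
                      : //     });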
     645              : 
     646              : /// Has the current task been requested to shut down?
     647            0 : pub fn is_shutdown_requested() -> bool {
     648            0 :     if let Ok(true_or_false) = SHUTDOWN_TOKEN.try_with(|t| t.is_cancelled()) {
     649            0 :         true_or_false
     650              :     } else {
     651            0 :         if !cfg!(test) {
     652            0 :             warn!("is_shutdown_requested() called in an unexpected task or thread");
     653            0 :         }
     654            0 :         false
     655              :     }
     656            0 : }
        

Generated by: LCOV version 2.1-beta