LCOV - code coverage report
Current view: top level - pageserver/src - task_mgr.rs (source / functions)
Test:      8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01
Coverage:  Lines: 87.9 % (217 of 247)    Functions: 51.6 % (80 of 155)

            Line data    Source code
       1              : //!
       2              : //! This module provides centralized handling of tokio tasks in the Page Server.
       3              : //!
       4              : //! We provide a few basic facilities:
       5              : //! - A global registry of tasks that lists what kind of tasks they are, and
       6              : //!   which tenant or timeline they are working on
       7              : //!
       8              : //! - The ability to request a task to shut down.
       9              : //!
      10              : //!
       11              : //! # How it works
      12              : //!
      13              : //! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
      14              : //! task is spawned, a PageServerTask entry is added there, and when a
      15              : //! task dies, it removes itself from the hashmap. If you want to kill a
      16              : //! task, you can scan the hashmap to find it.
      17              : //!
      18              : //! # Task shutdown
      19              : //!
      20              : //! To kill a task, we rely on co-operation from the victim. Each task is
      21              : //! expected to periodically call the `is_shutdown_requested()` function, and
       22              : //! if it returns true, exit gracefully. In addition, when waiting for the
       23              : //! network or another long-running operation, you can use the
       24              : //! `shutdown_watcher()` function to get a Future that becomes ready when
       25              : //! the current task has been requested to shut down. You can use that with
       26              : //! Tokio's `select!()`, as sketched below.
      27              : //!
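//! For example, a long-running loop could combine both mechanisms roughly like
//! this (a minimal sketch, not code from this module; `do_one_work_item()` is a
//! hypothetical placeholder):
//!
//! ```ignore
//! async fn background_loop() -> anyhow::Result<()> {
//!     loop {
//!         tokio::select! {
//!             // Becomes ready once shutdown of this task has been requested.
//!             _ = task_mgr::shutdown_watcher() => break,
//!             res = do_one_work_item() => {
//!                 res?;
//!                 // Cheap synchronous check between work items.
//!                 if task_mgr::is_shutdown_requested() {
//!                     break;
//!                 }
//!             }
//!         }
//!     }
//!     Ok(())
//! }
//! ```
//!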
      28              : //! TODO: This would be a good place to also handle panics in a somewhat sane way.
      29              : //! Depending on what task panics, we might want to kill the whole server, or
      30              : //! only a single tenant or timeline.
      31              : //!
      32              : 
      33              : // Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
      34              : // Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
      35              : #![allow(clippy::declare_interior_mutable_const)]
      36              : 
      37              : use std::collections::HashMap;
      38              : use std::fmt;
      39              : use std::future::Future;
      40              : use std::panic::AssertUnwindSafe;
      41              : use std::sync::atomic::{AtomicU64, Ordering};
      42              : use std::sync::{Arc, Mutex};
      43              : 
      44              : use futures::FutureExt;
      45              : use tokio::runtime::Runtime;
      46              : use tokio::task::JoinHandle;
      47              : use tokio::task_local;
      48              : use tokio_util::sync::CancellationToken;
      49              : 
      50              : use tracing::{debug, error, info, warn};
      51              : 
      52              : use once_cell::sync::Lazy;
      53              : 
      54              : use utils::id::{TenantId, TimelineId};
      55              : 
      56              : use crate::shutdown_pageserver;
      57              : 
      58              : //
      59              : // There are four runtimes:
      60              : //
      61              : // Compute request runtime
      62              : //  - used to handle connections from compute nodes. Any tasks related to satisfying
      63              : //    GetPage requests, base backups, import, and other such compute node operations
      64              : //    are handled by the Compute request runtime
      65              : //  - page_service.rs
      66              : //  - this includes layer downloads from remote storage, if a layer is needed to
      67              : //    satisfy a GetPage request
      68              : //
      69              : // Management request runtime
      70              : //  - used to handle HTTP API requests
      71              : //
      72              : // WAL receiver runtime:
      73              : //  - used to handle WAL receiver connections.
       74              : //  - and to receive updates from storage_broker
      75              : //
      76              : // Background runtime
      77              : //  - layer flushing
      78              : //  - garbage collection
      79              : //  - compaction
      80              : //  - remote storage uploads
      81              : //  - initial tenant loading
      82              : //
       83              : // Everything runs in a tokio task. If you spawn new tasks, spawn them on the
       84              : // correct runtime (see the example at the end of this comment block).
      85              : //
      86              : // There might be situations when one task needs to wait for a task running in another
      87              : // Runtime to finish. For example, if a background operation needs a layer from remote
       88              : // storage, it will start to download it. If the download was already initiated
       89              : // by a GetPage request, the background task will instead wait for that download
       90              : // - running in the compute request runtime - to finish.
      91              : // Another example: the initial tenant loading tasks are launched in the background ops
      92              : // runtime. If a GetPage request comes in before the load of a tenant has finished, the
      93              : // GetPage request will wait for the tenant load to finish.
      94              : //
      95              : // The core Timeline code is synchronous, and uses a bunch of std Mutexes and RWLocks to
      96              : // protect data structures. Let's keep it that way. Synchronous code is easier to debug
      97              : // and analyze, and there's a lot of hairy, low-level, performance critical code there.
      98              : //
      99              : // It's nice to have different runtimes, so that you can quickly eyeball how much CPU
     100              : // time each class of operations is taking, with 'top -H' or similar.
     101              : //
      102              : // It is also good that one class of tasks cannot hog all the threads that would
      103              : // be needed to process other operations, e.g. if the upload tasks get blocked
      104              : // on locks. That shouldn't happen, but still.
     105              : //
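// For example (a hedged sketch, not code from this file; `tenant_id` and the
// compaction body are placeholders), other pageserver code would spawn a
// per-tenant compaction task on the background runtime roughly like this:
//
//     task_mgr::spawn(
//         BACKGROUND_RUNTIME.handle(),
//         TaskKind::Compaction,
//         Some(tenant_id),
//         None, // not tied to a single timeline
//         "compaction loop",
//         false, // don't shut down the whole pageserver if this task fails
//         async move {
//             // ... compact until done or until shutdown is requested ...
//             Ok(())
//         },
//     );
//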
     106          575 : pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     107          575 :     tokio::runtime::Builder::new_multi_thread()
     108          575 :         .thread_name("compute request worker")
     109          575 :         .enable_all()
     110          575 :         .build()
     111          575 :         .expect("Failed to create compute request runtime")
     112          575 : });
     113              : 
     114          575 : pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     115          575 :     tokio::runtime::Builder::new_multi_thread()
     116          575 :         .thread_name("mgmt request worker")
     117          575 :         .enable_all()
     118          575 :         .build()
     119          575 :         .expect("Failed to create mgmt request runtime")
     120          575 : });
     121              : 
     122          576 : pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     123          576 :     tokio::runtime::Builder::new_multi_thread()
     124          576 :         .thread_name("walreceiver worker")
     125          576 :         .enable_all()
     126          576 :         .build()
     127          576 :         .expect("Failed to create walreceiver runtime")
     128          576 : });
     129              : 
     130          576 : pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     131          576 :     tokio::runtime::Builder::new_multi_thread()
     132          576 :         .thread_name("background op worker")
     133          576 :         // if you change the number of worker threads please change the constant below
     134          576 :         .enable_all()
     135          576 :         .build()
     136          576 :         .expect("Failed to create background op runtime")
     137          576 : });
     138              : 
     139          238 : pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
      140          238 :     // force init now, so any panic from building the runtime happens here
     141          238 :     let _ = BACKGROUND_RUNTIME.handle();
     142          238 :     // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
      143          238 :     // tokio would have already panicked on parsing errors or NotUnicode
     144          238 :     //
     145          238 :     // this will be wrong if any of the runtimes gets their worker threads configured to something
     146          238 :     // else, but that has not been needed in a long time.
     147          238 :     std::env::var("TOKIO_WORKER_THREADS")
     148          238 :         .map(|s| s.parse::<usize>().unwrap())
     149          238 :         .unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
     150          238 : });
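
// As a hedged illustration (the exact invocation is a placeholder), starting the
// pageserver with e.g.
//
//     TOKIO_WORKER_THREADS=4 ./pageserver ...
//
// makes the multi-threaded runtimes above use 4 worker threads each, and
// BACKGROUND_RUNTIME_WORKER_THREADS (above) will evaluate to 4 as well; without
// the variable, both fall back to the number of CPUs.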
     151              : 
     152            0 : #[derive(Debug, Clone, Copy)]
     153              : pub struct PageserverTaskId(u64);
     154              : 
     155              : impl fmt::Display for PageserverTaskId {
     156           75 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     157           75 :         self.0.fmt(f)
     158           75 :     }
     159              : }
     160              : 
     161              : /// Each task that we track is associated with a "task ID". It's just an
     162              : /// increasing number that we assign. Note that it is different from tokio::task::Id.
     163              : static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
     164              : 
     165              : /// Global registry of tasks
     166              : static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
     167          576 :     Lazy::new(|| Mutex::new(HashMap::new()));
     168              : 
     169              : task_local! {
     170              :     // This is a cancellation token which will be cancelled when a task needs to shut down. The
     171              :     // root token is kept in the global registry, so that anyone can send the signal to request
     172              :     // task shutdown.
     173              :     static SHUTDOWN_TOKEN: CancellationToken;
     174              : 
     175              :     // Each task holds reference to its own PageServerTask here.
     176              :     static CURRENT_TASK: Arc<PageServerTask>;
     177              : }
     178              : 
     179              : ///
     180              : /// There are many kinds of tasks in the system. Some are associated with a particular
     181              : /// tenant or timeline, while others are global.
     182              : ///
      183              : /// Note that we don't try to limit how many tasks of a certain kind can be running
     184              : /// at the same time.
     185              : ///
     186              : #[derive(
     187         2358 :     Debug,
     188              :     // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
     189          969 :     enumset::EnumSetType,
     190            0 :     serde::Serialize,
     191            0 :     serde::Deserialize,
     192         9554 :     strum_macros::IntoStaticStr,
     193              : )]
     194              : pub enum TaskKind {
     195              :     // Pageserver startup, i.e., `main`
     196              :     Startup,
     197              : 
      198              :     // libpq listener task. It just accepts connections and spawns a
     199              :     // PageRequestHandler task for each connection.
     200              :     LibpqEndpointListener,
     201              : 
     202              :     // HTTP endpoint listener.
     203              :     HttpEndpointListener,
     204              : 
     205              :     // Task that handles a single connection. A PageRequestHandler task
     206              :     // starts detached from any particular tenant or timeline, but it can be
     207              :     // associated with one later, after receiving a command from the client.
     208              :     PageRequestHandler,
     209              : 
     210              :     /// Manages the WAL receiver connection for one timeline.
     211              :     /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
     212              :     /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
     213              :     /// There is at most one connection at any given time.
     214              :     ///
     215              :     /// That `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
     216              :     /// The `Client` object is what library users use to make requests & get responses.
     217              :     /// Internally, `Client` hands over requests to the `Connection` object.
     218              :     /// The `Connection` object is responsible for speaking the wire protocol.
     219              :     ///
     220              :     /// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
     221              :     /// That abstraction doesn't use `task_mgr`.
     222              :     /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
     223              :     /// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
     224              :     ///
     225              :     /// Once the connection is established, the `TaskHandle` task creates a
     226              :     /// [`WalReceiverConnectionPoller`] task_mgr task that is responsible for polling
     227              :     /// the `Connection` object.
     228              :     /// A `CancellationToken` created by the `TaskHandle` task ensures
      229              :     /// that the [`WalReceiverConnectionPoller`] task will cancel soon after the `TaskHandle` is dropped.
     230              :     ///
     231              :     /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
     232              :     /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
     233              :     WalReceiverManager,
     234              : 
     235              :     /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
     236              :     /// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
     237              :     /// See the comment on [`WalReceiverManager`].
     238              :     ///
     239              :     /// [`WalReceiverManager`]: Self::WalReceiverManager
     240              :     WalReceiverConnectionHandler,
     241              : 
     242              :     /// The task that polls the `tokio-postgres::Connection` object.
     243              :     /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
     244              :     /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
     245              :     WalReceiverConnectionPoller,
     246              : 
     247              :     // Garbage collection worker. One per tenant
     248              :     GarbageCollector,
     249              : 
     250              :     // Compaction. One per tenant.
     251              :     Compaction,
     252              : 
     253              :     // Eviction. One per timeline.
     254              :     Eviction,
     255              : 
     256              :     /// See [`crate::disk_usage_eviction_task`].
     257              :     DiskUsageEviction,
     258              : 
     259              :     // Initial logical size calculation
     260              :     InitialLogicalSizeCalculation,
     261              : 
     262              :     OndemandLogicalSizeCalculation,
     263              : 
     264              :     // Task that flushes frozen in-memory layers to disk
     265              :     LayerFlushTask,
     266              : 
     267              :     // Task that uploads a file to remote storage
     268              :     RemoteUploadTask,
     269              : 
     270              :     // Task that downloads a file from remote storage
     271              :     RemoteDownloadTask,
     272              : 
     273              :     // task that handles the initial downloading of all tenants
     274              :     InitialLoad,
     275              : 
     276              :     // task that handles attaching a tenant
     277              :     Attach,
     278              : 
     279              :     // Used mostly for background deletion from s3
     280              :     TimelineDeletionWorker,
     281              : 
      282              :     // task that handles metrics collection
     283              :     MetricsCollection,
     284              : 
     285              :     // task that drives downloading layers
     286              :     DownloadAllRemoteLayers,
      287              :     // Task that calculates synthetic size for all active tenants
     288              :     CalculateSyntheticSize,
     289              : 
     290              :     // A request that comes in via the pageserver HTTP API.
     291              :     MgmtRequest,
     292              : 
     293              :     DebugTool,
     294              : 
     295              :     #[cfg(test)]
     296              :     UnitTest,
     297              : }
     298              : 
     299            0 : #[derive(Default)]
     300              : struct MutableTaskState {
     301              :     /// Tenant and timeline that this task is associated with.
     302              :     tenant_id: Option<TenantId>,
     303              :     timeline_id: Option<TimelineId>,
     304              : 
      305              :     /// Handle for waiting for the task to exit. It can be None, if the
      306              :     /// task has already exited.
     307              :     join_handle: Option<JoinHandle<()>>,
     308              : }
     309              : 
     310              : struct PageServerTask {
     311              :     #[allow(dead_code)] // unused currently
     312              :     task_id: PageserverTaskId,
     313              : 
     314              :     kind: TaskKind,
     315              : 
     316              :     name: String,
     317              : 
     318              :     // To request task shutdown, just cancel this token.
     319              :     cancel: CancellationToken,
     320              : 
     321              :     mutable: Mutex<MutableTaskState>,
     322              : }
     323              : 
     324              : /// Launch a new task
      325              : /// Note: if shutdown_process_on_error is set to true, failure
      326              : ///   of the task will lead to shutdown of the entire process
     327        39898 : pub fn spawn<F>(
     328        39898 :     runtime: &tokio::runtime::Handle,
     329        39898 :     kind: TaskKind,
     330        39898 :     tenant_id: Option<TenantId>,
     331        39898 :     timeline_id: Option<TimelineId>,
     332        39898 :     name: &str,
     333        39898 :     shutdown_process_on_error: bool,
     334        39898 :     future: F,
     335        39898 : ) -> PageserverTaskId
     336        39898 : where
     337        39898 :     F: Future<Output = anyhow::Result<()>> + Send + 'static,
     338        39898 : {
     339        39898 :     let cancel = CancellationToken::new();
     340        39898 :     let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
     341        39898 :     let task = Arc::new(PageServerTask {
     342        39898 :         task_id: PageserverTaskId(task_id),
     343        39898 :         kind,
     344        39898 :         name: name.to_string(),
     345        39898 :         cancel: cancel.clone(),
     346        39898 :         mutable: Mutex::new(MutableTaskState {
     347        39898 :             tenant_id,
     348        39898 :             timeline_id,
     349        39898 :             join_handle: None,
     350        39898 :         }),
     351        39898 :     });
     352        39898 : 
     353        39898 :     TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
     354        39898 : 
     355        39898 :     let mut task_mut = task.mutable.lock().unwrap();
     356        39898 : 
     357        39898 :     let task_name = name.to_string();
     358        39898 :     let task_cloned = Arc::clone(&task);
     359        39898 :     let join_handle = runtime.spawn(task_wrapper(
     360        39898 :         task_name,
     361        39898 :         task_id,
     362        39898 :         task_cloned,
     363        39898 :         cancel,
     364        39898 :         shutdown_process_on_error,
     365        39898 :         future,
     366        39898 :     ));
     367        39898 :     task_mut.join_handle = Some(join_handle);
     368        39898 :     drop(task_mut);
     369        39898 : 
     370        39898 :     // The task is now running. Nothing more to do here
     371        39898 :     PageserverTaskId(task_id)
     372        39898 : }
     373              : 
     374              : /// This wrapper function runs in a newly-spawned task. It initializes the
     375              : /// task-local variables and calls the payload function.
     376        39898 : async fn task_wrapper<F>(
     377        39898 :     task_name: String,
     378        39898 :     task_id: u64,
     379        39898 :     task: Arc<PageServerTask>,
     380        39898 :     shutdown_token: CancellationToken,
     381        39898 :     shutdown_process_on_error: bool,
     382        39898 :     future: F,
     383        39898 : ) where
     384        39898 :     F: Future<Output = anyhow::Result<()>> + Send + 'static,
     385        39898 : {
     386            0 :     debug!("Starting task '{}'", task_name);
     387              : 
     388        39897 :     let result = SHUTDOWN_TOKEN
     389        39897 :         .scope(
     390        39897 :             shutdown_token,
     391        39897 :             CURRENT_TASK.scope(task, {
     392        39897 :                 // We use AssertUnwindSafe here so that the payload function
     393        39897 :                 // doesn't need to be UnwindSafe. We don't do anything after the
     394        39897 :                 // unwinding that would expose us to unwind-unsafe behavior.
     395        39897 :                 AssertUnwindSafe(future).catch_unwind()
     396        39897 :             }),
     397        39897 :         )
     398     18019897 :         .await;
     399        35767 :     task_finish(result, task_name, task_id, shutdown_process_on_error).await;
     400        35767 : }
     401              : 
     402        35767 : async fn task_finish(
     403        35767 :     result: std::result::Result<
     404        35767 :         anyhow::Result<()>,
     405        35767 :         std::boxed::Box<dyn std::any::Any + std::marker::Send>,
     406        35767 :     >,
     407        35767 :     task_name: String,
     408        35767 :     task_id: u64,
     409        35767 :     shutdown_process_on_error: bool,
     410        35767 : ) {
     411        35767 :     // Remove our entry from the global hashmap.
     412        35767 :     let task = TASKS
     413        35767 :         .lock()
     414        35767 :         .unwrap()
     415        35767 :         .remove(&task_id)
     416        35767 :         .expect("no task in registry");
     417        35767 : 
     418        35767 :     let mut shutdown_process = false;
     419        35767 :     {
     420        35767 :         let task_mut = task.mutable.lock().unwrap();
     421              : 
     422        35767 :         match result {
     423              :             Ok(Ok(())) => {
     424            0 :                 debug!("Task '{}' exited normally", task_name);
     425              :             }
     426            4 :             Ok(Err(err)) => {
     427            4 :                 if shutdown_process_on_error {
     428            0 :                     error!(
     429            0 :                         "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
     430            0 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     431            0 :                     );
     432            0 :                     shutdown_process = true;
     433              :                 } else {
     434            4 :                     error!(
     435            4 :                         "Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
     436            4 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     437            4 :                     );
     438              :                 }
     439              :             }
     440            0 :             Err(err) => {
     441            0 :                 if shutdown_process_on_error {
     442            0 :                     error!(
     443            0 :                         "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
     444            0 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     445            0 :                     );
     446            0 :                     shutdown_process = true;
     447              :                 } else {
     448            0 :                     error!(
     449            0 :                         "Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
     450            0 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     451            0 :                     );
     452              :                 }
     453              :             }
     454              :         }
     455              :     }
     456              : 
     457        35767 :     if shutdown_process {
     458            0 :         shutdown_pageserver(1).await;
     459        35767 :     }
     460        35767 : }
     461              : 
      462              : // Expected to be called from within the task that is being associated.
     463         4629 : pub fn associate_with(tenant_id: Option<TenantId>, timeline_id: Option<TimelineId>) {
     464         4629 :     CURRENT_TASK.with(|ct| {
     465         4629 :         let mut task_mut = ct.mutable.lock().unwrap();
     466         4629 :         task_mut.tenant_id = tenant_id;
     467         4629 :         task_mut.timeline_id = timeline_id;
     468         4629 :     });
     469         4629 : }
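
// For example (a hedged sketch; the surrounding page_service code is not shown
// here), a PageRequestHandler task that started out detached would call
// `associate_with` once the client has told it which tenant and timeline it
// wants to operate on:
//
//     task_mgr::associate_with(Some(tenant_id), Some(timeline_id));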
     470              : 
     471              : /// Is there a task running that matches the criteria
     472              : 
     473              : /// Signal and wait for tasks to shut down.
     474              : ///
     475              : ///
     476              : /// The arguments are used to select the tasks to kill. Any None arguments are
     477              : /// ignored. For example, to shut down all WalReceiver tasks:
      478              : /// ignored. For example, to shut down all WAL receiver manager tasks:
      479              : ///   shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
     480              : ///
     481              : /// Or to shut down all tasks for given timeline:
     482              : ///
     483              : ///   shutdown_tasks(None, Some(tenant_id), Some(timeline_id))
     484              : ///
     485         1989 : pub async fn shutdown_tasks(
     486         1989 :     kind: Option<TaskKind>,
     487         1989 :     tenant_id: Option<TenantId>,
     488         1989 :     timeline_id: Option<TimelineId>,
     489         1989 : ) {
     490         1989 :     let mut victim_tasks = Vec::new();
     491         1989 : 
     492         1989 :     {
     493         1989 :         let tasks = TASKS.lock().unwrap();
     494        22159 :         for task in tasks.values() {
     495        22159 :             let task_mut = task.mutable.lock().unwrap();
     496        22159 :             if (kind.is_none() || Some(task.kind) == kind)
     497         8952 :                 && (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
     498         7240 :                 && (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
     499         5770 :             {
     500         5770 :                 task.cancel.cancel();
     501         5770 :                 victim_tasks.push((
     502         5770 :                     Arc::clone(task),
     503         5770 :                     task.kind,
     504         5770 :                     task_mut.tenant_id,
     505         5770 :                     task_mut.timeline_id,
     506         5770 :                 ));
     507        16389 :             }
     508              :         }
     509              :     }
     510              : 
     511         1989 :     let log_all = kind.is_none() && tenant_id.is_none() && timeline_id.is_none();
     512              : 
     513         7759 :     for (task, task_kind, tenant_id, timeline_id) in victim_tasks {
     514         5770 :         let join_handle = {
     515         5770 :             let mut task_mut = task.mutable.lock().unwrap();
     516         5770 :             task_mut.join_handle.take()
     517              :         };
     518         5770 :         if let Some(mut join_handle) = join_handle {
     519         5770 :             if log_all {
     520            2 :                 if tenant_id.is_none() {
     521              :                     // there are quite few of these
     522            2 :                     info!(name = task.name, kind = ?task_kind, "stopping global task");
     523              :                 } else {
     524              :                     // warn to catch these in tests; there shouldn't be any
     525            0 :                     warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
     526              :                 }
     527         5768 :             }
     528         5770 :             if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
     529         1677 :                 .await
     530         5770 :                 .is_err()
     531              :             {
     532              :                 // allow some time to elapse before logging to cut down the number of log
     533              :                 // lines.
     534            3 :                 info!("waiting for {} to shut down", task.name);
     535              :                 // we never handled this return value, but:
     536              :                 // - we don't deschedule which would lead to is_cancelled
     537              :                 // - panics are already logged (is_panicked)
     538              :                 // - task errors are already logged in the wrapper
     539            3 :                 let _ = join_handle.await;
     540         5767 :             }
     541            0 :         } else {
     542            0 :             // Possibly one of:
     543            0 :             //  * The task had not even fully started yet.
     544            0 :             //  * It was shut down concurrently and already exited
     545            0 :         }
     546              :     }
     547         1989 : }
     548              : 
     549      3887235 : pub fn current_task_kind() -> Option<TaskKind> {
     550      3887235 :     CURRENT_TASK.try_with(|ct| ct.kind).ok()
     551      3887235 : }
     552              : 
     553           72 : pub fn current_task_id() -> Option<PageserverTaskId> {
     554           72 :     CURRENT_TASK.try_with(|ct| ct.task_id).ok()
     555           72 : }
     556              : 
      557              : /// A Future that completes when the current task has been requested to
      558              : /// shut down.
     559      4666539 : pub async fn shutdown_watcher() {
     560      4666334 :     let token = SHUTDOWN_TOKEN
     561      4666334 :         .try_with(|t| t.clone())
     562      4666334 :         .expect("shutdown_watcher() called in an unexpected task or thread");
     563      4666334 : 
     564      5111348 :     token.cancelled().await;
     565         1115 : }
     566              : 
     567              : /// Clone the current task's cancellation token, which can be moved across tasks.
     568              : ///
     569              : /// When the task which is currently executing is shutdown, the cancellation token will be
     570              : /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
     571              : /// `tokio::task::JoinSet::spawn`.
     572         9228 : pub fn shutdown_token() -> CancellationToken {
     573         9228 :     SHUTDOWN_TOKEN
     574         9228 :         .try_with(|t| t.clone())
     575         9228 :         .expect("shutdown_token() called in an unexpected task or thread")
     576         9228 : }
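
// A hedged sketch of the intended use (the blocking work itself is a placeholder):
//
//     let cancel = task_mgr::shutdown_token();
//     tokio::task::spawn_blocking(move || {
//         while !cancel.is_cancelled() {
//             // ... do one chunk of blocking work ...
//         }
//     });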
     577              : 
     578              : /// Has the current task been requested to shut down?
     579              : pub fn is_shutdown_requested() -> bool {
     580        29909 :     if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
     581        29905 :         cancel.is_cancelled()
     582              :     } else {
     583            4 :         if !cfg!(test) {
     584            0 :             warn!("is_shutdown_requested() called in an unexpected task or thread");
     585            4 :         }
     586            4 :         false
     587              :     }
     588        29909 : }
        

Generated by: LCOV version 2.1-beta