LCOV - differential code coverage report
Current view: top level - pageserver/src - task_mgr.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 88.3 % 257 227 30 227
Current Date: 2023-10-19 02:04:12 Functions: 52.8 % 163 86 77 86
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! This module provides centralized handling of tokio tasks in the Page Server.
       3                 : //!
       4                 : //! We provide a few basic facilities:
       5                 : //! - A global registry of tasks that lists what kind of tasks they are, and
       6                 : //!   which tenant or timeline they are working on
       7                 : //!
       8                 : //! - The ability to request a task to shut down.
       9                 : //!
      10                 : //!
      11                 : //! # How it works?
      12                 : //!
      13                 : //! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
      14                 : //! task is spawned, a PageServerTask entry is added there, and when a
      15                 : //! task dies, it removes itself from the hashmap. If you want to kill a
      16                 : //! task, you can scan the hashmap to find it.
      17                 : //!
      18                 : //! # Task shutdown
      19                 : //!
      20                 : //! To kill a task, we rely on co-operation from the victim. Each task is
      21                 : //! expected to periodically call the `is_shutdown_requested()` function, and
      22                 : //! if it returns true, exit gracefully. In addition to that, when waiting for
      23                 : //! the network or other long-running operation, you can use
      24                 : //! `shutdown_watcher()` function to get a Future that will become ready if
      25                 : //! the current task has been requested to shut down. You can use that with
      26                 : //! Tokio select!().
      27                 : //!
      28                 : //! TODO: This would be a good place to also handle panics in a somewhat sane way.
      29                 : //! Depending on what task panics, we might want to kill the whole server, or
      30                 : //! only a single tenant or timeline.
      31                 : //!
      32                 : 
      33                 : // Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
      34                 : // Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
      35                 : #![allow(clippy::declare_interior_mutable_const)]
      36                 : 
      37                 : use std::collections::HashMap;
      38                 : use std::fmt;
      39                 : use std::future::Future;
      40                 : use std::panic::AssertUnwindSafe;
      41                 : use std::sync::atomic::{AtomicU64, Ordering};
      42                 : use std::sync::{Arc, Mutex};
      43                 : 
      44                 : use futures::FutureExt;
      45                 : use tokio::runtime::Runtime;
      46                 : use tokio::task::JoinHandle;
      47                 : use tokio::task_local;
      48                 : use tokio_util::sync::CancellationToken;
      49                 : 
      50                 : use tracing::{debug, error, info, warn};
      51                 : 
      52                 : use once_cell::sync::Lazy;
      53                 : 
      54                 : use utils::id::{TenantId, TimelineId};
      55                 : 
      56                 : use crate::shutdown_pageserver;
      57                 : 
      58                 : //
      59                 : // There are four runtimes:
      60                 : //
      61                 : // Compute request runtime
      62                 : //  - used to handle connections from compute nodes. Any tasks related to satisfying
      63                 : //    GetPage requests, base backups, import, and other such compute node operations
      64                 : //    are handled by the Compute request runtime
      65                 : //  - page_service.rs
      66                 : //  - this includes layer downloads from remote storage, if a layer is needed to
      67                 : //    satisfy a GetPage request
      68                 : //
      69                 : // Management request runtime
      70                 : //  - used to handle HTTP API requests
      71                 : //
      72                 : // WAL receiver runtime:
      73                 : //  - used to handle WAL receiver connections.
      74                 : //  - and to receive updates from storage_broker
      75                 : //
      76                 : // Background runtime
      77                 : //  - layer flushing
      78                 : //  - garbage collection
      79                 : //  - compaction
      80                 : //  - remote storage uploads
      81                 : //  - initial tenant loading
      82                 : //
      83                 : // Everything runs in a tokio task. If you spawn new tasks, spawn it using the correct
      84                 : // runtime.
      85                 : //
      86                 : // There might be situations when one task needs to wait for a task running in another
      87                 : // Runtime to finish. For example, if a background operation needs a layer from remote
      88                 : // storage, it will start to download it. If a background operation needs a remote layer,
      89                 : // and the download was already initiated by a GetPage request, the background task
      90                 : // will wait for the download - running in the Compute request runtime - to finish.
      91                 : // Another example: the initial tenant loading tasks are launched in the background ops
      92                 : // runtime. If a GetPage request comes in before the load of a tenant has finished, the
      93                 : // GetPage request will wait for the tenant load to finish.
      94                 : //
      95                 : // The core Timeline code is synchronous, and uses a bunch of std Mutexes and RWLocks to
      96                 : // protect data structures. Let's keep it that way. Synchronous code is easier to debug
      97                 : // and analyze, and there's a lot of hairy, low-level, performance critical code there.
      98                 : //
      99                 : // It's nice to have different runtimes, so that you can quickly eyeball how much CPU
     100                 : // time each class of operations is taking, with 'top -H' or similar.
     101                 : //
     102                 : // It's also good to avoid hogging all threads that would be needed to process
     103                 : // other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
     104                 : // happen, but still.
     105                 : //
     106 CBC         560 : pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     107             560 :     tokio::runtime::Builder::new_multi_thread()
     108             560 :         .thread_name("compute request worker")
     109             560 :         .enable_all()
     110             560 :         .build()
     111             560 :         .expect("Failed to create compute request runtime")
     112             560 : });
     113                 : 
     114             560 : pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     115             560 :     tokio::runtime::Builder::new_multi_thread()
     116             560 :         .thread_name("mgmt request worker")
     117             560 :         .enable_all()
     118             560 :         .build()
     119             560 :         .expect("Failed to create mgmt request runtime")
     120             560 : });
     121                 : 
     122             561 : pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     123             561 :     tokio::runtime::Builder::new_multi_thread()
     124             561 :         .thread_name("walreceiver worker")
     125             561 :         .enable_all()
     126             561 :         .build()
     127             561 :         .expect("Failed to create walreceiver runtime")
     128             561 : });
     129                 : 
     130             561 : pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     131             561 :     tokio::runtime::Builder::new_multi_thread()
     132             561 :         .thread_name("background op worker")
     133             561 :         // if you change the number of worker threads please change the constant below
     134             561 :         .enable_all()
     135             561 :         .build()
     136             561 :         .expect("Failed to create background op runtime")
     137             561 : });
     138                 : 
     139             243 : pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
     140             243 :     // force initialization of the runtime, so that a panic from creating it happens here
     141             243 :     let _ = BACKGROUND_RUNTIME.handle();
     142             243 :     // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
     143             243 :     // tokio would have already panicked for parsing errors or NotUnicode
     144             243 :     //
     145             243 :     // this will be wrong if any of the runtimes gets their worker threads configured to something
     146             243 :     // else, but that has not been needed in a long time.
     147             243 :     std::env::var("TOKIO_WORKER_THREADS")
     148             243 :         .map(|s| s.parse::<usize>().unwrap())
     149             243 :         .unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
     150             243 : });
     151                 : 
     152 UBC           0 : #[derive(Debug, Clone, Copy)]
     153                 : pub struct PageserverTaskId(u64);
     154                 : 
     155                 : impl fmt::Display for PageserverTaskId {
     156 CBC          23 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     157              23 :         self.0.fmt(f)
     158              23 :     }
     159                 : }
     160                 : 
     161                 : /// Each task that we track is associated with a "task ID". It's just an
     162                 : /// increasing number that we assign. Note that it is different from tokio::task::Id.
     163                 : static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
     164                 : 
     165                 : /// Global registry of tasks
     166                 : static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
     167             561 :     Lazy::new(|| Mutex::new(HashMap::new()));
     168                 : 
     169              86 : task_local! {
     170              86 :     // This is a cancellation token which will be cancelled when a task needs to shut down. The
     171              86 :     // root token is kept in the global registry, so that anyone can send the signal to request
     172              86 :     // task shutdown.
     173              86 :     static SHUTDOWN_TOKEN: CancellationToken;
     174              86 : 
     175              86 :     // Each task holds reference to its own PageServerTask here.
     176              86 :     static CURRENT_TASK: Arc<PageServerTask>;
     177              86 : }
     178                 : 
     179                 : ///
     180                 : /// There are many kinds of tasks in the system. Some are associated with a particular
     181                 : /// tenant or timeline, while others are global.
     182                 : ///
     183                 : /// Note that we don't try to limit how many tasks of a certain kind can be running
     184                 : /// at the same time.
     185                 : ///
     186                 : #[derive(
     187            3227 :     Debug,
     188                 :     // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
     189             962 :     enumset::EnumSetType,
     190       592224994 :     enum_map::Enum,
     191 UBC           0 :     serde::Serialize,
     192               0 :     serde::Deserialize,
     193 CBC       21159 :     strum_macros::IntoStaticStr,
     194                 : )]
     195                 : pub enum TaskKind {
     196                 :     // Pageserver startup, i.e., `main`
     197                 :     Startup,
     198                 : 
     199                 :     // libpq listener task. It just accepts connections and spawns a
     200                 :     // PageRequestHandler task for each connection.
     201                 :     LibpqEndpointListener,
     202                 : 
     203                 :     // HTTP endpoint listener.
     204                 :     HttpEndpointListener,
     205                 : 
     206                 :     // Task that handles a single connection. A PageRequestHandler task
     207                 :     // starts detached from any particular tenant or timeline, but it can be
     208                 :     // associated with one later, after receiving a command from the client.
     209                 :     PageRequestHandler,
     210                 : 
     211                 :     /// Manages the WAL receiver connection for one timeline.
     212                 :     /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
     213                 :     /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
     214                 :     /// There is at most one connection at any given time.
     215                 :     ///
     216                 :     /// That `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
     217                 :     /// The `Client` object is what library users use to make requests & get responses.
     218                 :     /// Internally, `Client` hands over requests to the `Connection` object.
     219                 :     /// The `Connection` object is responsible for speaking the wire protocol.
     220                 :     ///
     221                 :     /// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
     222                 :     /// That abstraction doesn't use `task_mgr`.
     223                 :     /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
     224                 :     /// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
     225                 :     ///
     226                 :     /// Once the connection is established, the `TaskHandle` task creates a
     227                 :     /// [`WalReceiverConnectionPoller`] task_mgr task that is responsible for polling
     228                 :     /// the `Connection` object.
     229                 :     /// A `CancellationToken` created by the `TaskHandle` task ensures
     230                 :     /// that the [`WalReceiverConnectionPoller`] task will cancel soon after the `TaskHandle` is dropped.
     231                 :     ///
     232                 :     /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
     233                 :     /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
     234                 :     WalReceiverManager,
     235                 : 
     236                 :     /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
     237                 :     /// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
     238                 :     /// See the comment on [`WalReceiverManager`].
     239                 :     ///
     240                 :     /// [`WalReceiverManager`]: Self::WalReceiverManager
     241                 :     WalReceiverConnectionHandler,
     242                 : 
     243                 :     /// The task that polls the `tokio-postgres::Connection` object.
     244                 :     /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
     245                 :     /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
     246                 :     WalReceiverConnectionPoller,
     247                 : 
     248                 :     // Garbage collection worker. One per tenant
     249                 :     GarbageCollector,
     250                 : 
     251                 :     // Compaction. One per tenant.
     252                 :     Compaction,
     253                 : 
     254                 :     // Eviction. One per timeline.
     255                 :     Eviction,
     256                 : 
     257                 :     /// See [`crate::disk_usage_eviction_task`].
     258                 :     DiskUsageEviction,
     259                 : 
     260                 :     // Initial logical size calculation
     261                 :     InitialLogicalSizeCalculation,
     262                 : 
     263                 :     OndemandLogicalSizeCalculation,
     264                 : 
     265                 :     // Task that flushes frozen in-memory layers to disk
     266                 :     LayerFlushTask,
     267                 : 
     268                 :     // Task that uploads a file to remote storage
     269                 :     RemoteUploadTask,
     270                 : 
     271                 :     // Task that downloads a file from remote storage
     272                 :     RemoteDownloadTask,
     273                 : 
     274                 :     // task that handles the initial downloading of all tenants
     275                 :     InitialLoad,
     276                 : 
     277                 :     // task that handles attaching a tenant
     278                 :     Attach,
     279                 : 
     280                 :     // Used mostly for background deletion from s3
     281                 :     TimelineDeletionWorker,
     282                 : 
     283                 :     // task that handles metrics collection
     284                 :     MetricsCollection,
     285                 : 
     286                 :     // task that drives downloading layers
     287                 :     DownloadAllRemoteLayers,
     288                 :     // Task that calculates synthetic size for all active tenants
     289                 :     CalculateSyntheticSize,
     290                 : 
     291                 :     // A request that comes in via the pageserver HTTP API.
     292                 :     MgmtRequest,
     293                 : 
     294                 :     DebugTool,
     295                 : 
     296                 :     #[cfg(test)]
     297                 :     UnitTest,
     298                 : }
     299                 : 
     300 UBC           0 : #[derive(Default)]
     301                 : struct MutableTaskState {
     302                 :     /// Tenant and timeline that this task is associated with.
     303                 :     tenant_id: Option<TenantId>,
     304                 :     timeline_id: Option<TimelineId>,
     305                 : 
     306                 :     /// Handle for waiting for the task to exit. It can be None if the
     307                 :     /// task has already exited.
     308                 :     join_handle: Option<JoinHandle<()>>,
     309                 : }
     310                 : 
     311                 : struct PageServerTask {
     312                 :     #[allow(dead_code)] // unused currently
     313                 :     task_id: PageserverTaskId,
     314                 : 
     315                 :     kind: TaskKind,
     316                 : 
     317                 :     name: String,
     318                 : 
     319                 :     // To request task shutdown, just cancel this token.
     320                 :     cancel: CancellationToken,
     321                 : 
     322                 :     mutable: Mutex<MutableTaskState>,
     323                 : }
     324                 : 
     325                 : /// Launch a new task
     326                 : /// Note: if shutdown_process_on_error is set to true, failure
     327                 : ///   of the task will lead to shutdown of the entire process
     328 CBC       40029 : pub fn spawn<F>(
     329           40029 :     runtime: &tokio::runtime::Handle,
     330           40029 :     kind: TaskKind,
     331           40029 :     tenant_id: Option<TenantId>,
     332           40029 :     timeline_id: Option<TimelineId>,
     333           40029 :     name: &str,
     334           40029 :     shutdown_process_on_error: bool,
     335           40029 :     future: F,
     336           40029 : ) -> PageserverTaskId
     337           40029 : where
     338           40029 :     F: Future<Output = anyhow::Result<()>> + Send + 'static,
     339           40029 : {
     340           40029 :     let cancel = CancellationToken::new();
     341           40029 :     let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
     342           40029 :     let task = Arc::new(PageServerTask {
     343           40029 :         task_id: PageserverTaskId(task_id),
     344           40029 :         kind,
     345           40029 :         name: name.to_string(),
     346           40029 :         cancel: cancel.clone(),
     347           40029 :         mutable: Mutex::new(MutableTaskState {
     348           40029 :             tenant_id,
     349           40029 :             timeline_id,
     350           40029 :             join_handle: None,
     351           40029 :         }),
     352           40029 :     });
     353           40029 : 
     354           40029 :     TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
     355           40029 : 
     356           40029 :     let mut task_mut = task.mutable.lock().unwrap();
     357           40029 : 
     358           40029 :     let task_name = name.to_string();
     359           40029 :     let task_cloned = Arc::clone(&task);
     360           40029 :     let join_handle = runtime.spawn(task_wrapper(
     361           40029 :         task_name,
     362           40029 :         task_id,
     363           40029 :         task_cloned,
     364           40029 :         cancel,
     365           40029 :         shutdown_process_on_error,
     366           40029 :         future,
     367           40029 :     ));
     368           40029 :     task_mut.join_handle = Some(join_handle);
     369           40029 :     drop(task_mut);
     370           40029 : 
     371           40029 :     // The task is now running. Nothing more to do here
     372           40029 :     PageserverTaskId(task_id)
     373           40029 : }
     374                 : 
     375                 : /// This wrapper function runs in a newly-spawned task. It initializes the
     376                 : /// task-local variables and calls the payload function.
     377           40029 : async fn task_wrapper<F>(
     378           40029 :     task_name: String,
     379           40029 :     task_id: u64,
     380           40029 :     task: Arc<PageServerTask>,
     381           40029 :     shutdown_token: CancellationToken,
     382           40029 :     shutdown_process_on_error: bool,
     383           40029 :     future: F,
     384           40029 : ) where
     385           40029 :     F: Future<Output = anyhow::Result<()>> + Send + 'static,
     386           40029 : {
     387 UBC           0 :     debug!("Starting task '{}'", task_name);
     388                 : 
     389 CBC       40021 :     let result = SHUTDOWN_TOKEN
     390           40021 :         .scope(
     391           40021 :             shutdown_token,
     392           40021 :             CURRENT_TASK.scope(task, {
     393           40021 :                 // We use AssertUnwindSafe here so that the payload function
     394           40021 :                 // doesn't need to be UnwindSafe. We don't do anything after the
     395           40021 :                 // unwinding that would expose us to unwind-unsafe behavior.
     396           40021 :                 AssertUnwindSafe(future).catch_unwind()
     397           40021 :             }),
     398           40021 :         )
     399        16545635 :         .await;
     400           35843 :     task_finish(result, task_name, task_id, shutdown_process_on_error).await;
     401           35843 : }
     402                 : 
     403           35843 : async fn task_finish(
     404           35843 :     result: std::result::Result<
     405           35843 :         anyhow::Result<()>,
     406           35843 :         std::boxed::Box<dyn std::any::Any + std::marker::Send>,
     407           35843 :     >,
     408           35843 :     task_name: String,
     409           35843 :     task_id: u64,
     410           35843 :     shutdown_process_on_error: bool,
     411           35843 : ) {
     412           35843 :     // Remove our entry from the global hashmap.
     413           35843 :     let task = TASKS
     414           35843 :         .lock()
     415           35843 :         .unwrap()
     416           35843 :         .remove(&task_id)
     417           35843 :         .expect("no task in registry");
     418           35843 : 
     419           35843 :     let mut shutdown_process = false;
     420           35843 :     {
     421           35843 :         let task_mut = task.mutable.lock().unwrap();
     422                 : 
     423           35843 :         match result {
     424                 :             Ok(Ok(())) => {
     425 UBC           0 :                 debug!("Task '{}' exited normally", task_name);
     426                 :             }
     427 CBC           4 :             Ok(Err(err)) => {
     428               4 :                 if shutdown_process_on_error {
     429 UBC           0 :                     error!(
     430               0 :                         "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
     431               0 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     432               0 :                     );
     433               0 :                     shutdown_process = true;
     434                 :                 } else {
     435 CBC           4 :                     error!(
     436               4 :                         "Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
     437               4 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     438               4 :                     );
     439                 :                 }
     440                 :             }
     441 UBC           0 :             Err(err) => {
     442               0 :                 if shutdown_process_on_error {
     443               0 :                     error!(
     444               0 :                         "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
     445               0 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     446               0 :                     );
     447               0 :                     shutdown_process = true;
     448                 :                 } else {
     449               0 :                     error!(
     450               0 :                         "Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
     451               0 :                         task_name, task_mut.tenant_id, task_mut.timeline_id, err
     452               0 :                     );
     453                 :                 }
     454                 :             }
     455                 :         }
     456                 :     }
     457                 : 
     458 CBC       35843 :     if shutdown_process {
     459 UBC           0 :         shutdown_pageserver(None, 1).await;
     460 CBC       35843 :     }
     461           35843 : }
     462                 : 
     463                 : // Expected to be called from within the task to be updated: it changes the task-local CURRENT_TASK.
     464            4443 : pub fn associate_with(tenant_id: Option<TenantId>, timeline_id: Option<TimelineId>) {
     465            4443 :     CURRENT_TASK.with(|ct| {
     466            4443 :         let mut task_mut = ct.mutable.lock().unwrap();
     467            4443 :         task_mut.tenant_id = tenant_id;
     468            4443 :         task_mut.timeline_id = timeline_id;
     469            4443 :     });
     470            4443 : }
     471                 : 
     472                 : /// Is there a task running that matches the criteria
     473                 : 
     474                 : /// Signal and wait for tasks to shut down.
     475                 : ///
     476                 : ///
     477                 : /// The arguments are used to select the tasks to kill. Any None arguments are
     478                 : /// ignored. For example, to shut down all WalReceiverManager tasks:
     479                 : ///
     480                 : ///   shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
     481                 : ///
     482                 : /// Or to shut down all tasks for given timeline:
     483                 : ///
     484                 : ///   shutdown_tasks(None, Some(tenant_id), Some(timeline_id))
     485                 : ///
     486            1692 : pub async fn shutdown_tasks(
     487            1692 :     kind: Option<TaskKind>,
     488            1692 :     tenant_id: Option<TenantId>,
     489            1692 :     timeline_id: Option<TimelineId>,
     490            1692 : ) {
     491            1692 :     let mut victim_tasks = Vec::new(); // matching tasks; they are awaited only after the TASKS lock is released
     492            1692 : 
     493            1692 :     {
     494            1692 :         let tasks = TASKS.lock().unwrap(); // guard is dropped at the end of this scope, before any .await below
     495           19149 :         for task in tasks.values() {
     496           19149 :             let task_mut = task.mutable.lock().unwrap();
     497           19149 :             if (kind.is_none() || Some(task.kind) == kind) // a None filter argument matches every task
     498            7493 :                 && (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
     499            6103 :                 && (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
     500            5099 :             {
     501            5099 :                 task.cancel.cancel(); // request shutdown now; the task itself is awaited in the loop below
     502            5099 :                 victim_tasks.push(( // copy the ids read under the per-task lock; they are used for logging later
     503            5099 :                     Arc::clone(task),
     504            5099 :                     task.kind,
     505            5099 :                     task_mut.tenant_id,
     506            5099 :                     task_mut.timeline_id,
     507            5099 :                 ));
     508           14050 :             }
     509                 :         }
     510                 :     }
     511                 : 
     512            1692 :     let log_all = kind.is_none() && tenant_id.is_none() && timeline_id.is_none(); // true only for a completely unfiltered call
     513                 : 
     514            6791 :     for (task, task_kind, tenant_id, timeline_id) in victim_tasks {
     515            5099 :         let join_handle = {
     516            5099 :             let mut task_mut = task.mutable.lock().unwrap(); // guard is scoped to this block, so it is not held across the .await
     517            5099 :             task_mut.join_handle.take() // leaves None behind, so a later taker of this handle ends up in the else branch
     518                 :         };
     519            5099 :         if let Some(mut join_handle) = join_handle {
     520            5099 :             if log_all {
     521               6 :                 if tenant_id.is_none() {
     522                 :                     // there are quite few of these
     523               6 :                     info!(name = task.name, kind = ?task_kind, "stopping global task");
     524                 :                 } else {
     525                 :                     // warn to catch these in tests; there shouldn't be any
     526 UBC           0 :                     warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
     527                 :                 }
     528 CBC        5093 :             }
     529            5099 :             if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
     530            1509 :                 .await
     531            5099 :                 .is_err()
     532                 :             {
     533                 :                 // allow some time to elapse before logging to cut down the number of log
     534                 :                 // lines: only tasks that need more than one second to exit are reported.
     535               4 :                 info!("waiting for {} to shut down", task.name);
     536                 :                 // we never handled this return value, but:
     537                 :                 // - we don't deschedule which would lead to is_cancelled
     538                 :                 // - panics are already logged (is_panicked)
     539                 :                 // - task errors are already logged in the wrapper
     540               4 :                 let _ = join_handle.await; // no timeout here: wait until the task has actually finished
     541            5095 :             }
     542 UBC           0 :         } else {
     543               0 :             // Possibly one of:
     544               0 :             //  * The task had not even fully started yet.
     545               0 :             //  * It was shut down concurrently and already exited
     546               0 :         }
     547                 :     }
     548 CBC        1692 : }
     549                 : 
     550         3835860 : pub fn current_task_kind() -> Option<TaskKind> { // kind of the current task, or None if CURRENT_TASK is not accessible here
     551         3835860 :     CURRENT_TASK.try_with(|ct| ct.kind).ok() // try_with's error is mapped to None instead of panicking
     552         3835860 : }
     553                 : 
     554              20 : pub fn current_task_id() -> Option<PageserverTaskId> { // id of the current task, or None if CURRENT_TASK is not accessible here
     555              20 :     CURRENT_TASK.try_with(|ct| ct.task_id).ok() // try_with's error is mapped to None instead of panicking
     556              20 : }
     557                 : 
     558                 : /// A Future that completes once the current task's shutdown token has been
     559                 : /// cancelled. Panics if `SHUTDOWN_TOKEN` cannot be accessed from the caller.
     560           37931 : pub async fn shutdown_watcher() {
     561           37712 :     let token = SHUTDOWN_TOKEN
     562           37712 :         .try_with(|t| t.clone())
     563           37712 :         .expect("shutdown_watcher() called in an unexpected task or thread");
     564           37712 : 
     565          953836 :     token.cancelled().await; // awaits on the cloned token, not on SHUTDOWN_TOKEN itself
     566             771 : }
     567                 : 
     568                 : /// Clone the current task's cancellation token, which can be moved across tasks.
     569                 : ///
     570                 : /// When the task which is currently executing is shut down, the cancellation token will be
     571                 : /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
     572                 : /// `tokio::task::JoinSet::spawn`. Panics if `SHUTDOWN_TOKEN` cannot be accessed from the caller.
     573           12474 : pub fn shutdown_token() -> CancellationToken {
     574           12474 :     SHUTDOWN_TOKEN
     575           12474 :         .try_with(|t| t.clone())
     576           12474 :         .expect("shutdown_token() called in an unexpected task or thread")
     577           12474 : }
     578                 : 
     579                 : /// Has the current task been requested to shut down? Returns false if `SHUTDOWN_TOKEN` cannot be accessed from the caller.
     580                 : pub fn is_shutdown_requested() -> bool {
     581           28363 :     if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
     582           28359 :         cancel.is_cancelled()
     583                 :     } else {
     584               4 :         if !cfg!(test) { // the warning is suppressed in test builds; NOTE(review): presumably tests call this outside a task - confirm
     585 UBC           0 :             warn!("is_shutdown_requested() called in an unexpected task or thread");
     586 CBC           4 :         }
     587               4 :         false // unlike shutdown_watcher()/shutdown_token(), an inaccessible token does not panic here
     588                 :     }
     589           28363 : }
        

Generated by: LCOV version 2.1-beta