//!
//! This module provides centralized handling of tokio tasks in the Page Server.
//!
//! We provide a few basic facilities:
//! - A global registry of tasks that lists what kind of tasks they are, and
//!   which tenant or timeline they are working on
//!
//! - The ability to request a task to shut down.
//!
//!
//! # How it works
//!
//! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
//! task is spawned, a PageServerTask entry is added there, and when a
//! task dies, it removes itself from the hashmap. If you want to kill a
//! task, you can scan the hashmap to find it.
//!
//! # Task shutdown
//!
//! To kill a task, we rely on co-operation from the victim. Each task is
//! expected to periodically call the `is_shutdown_requested()` function, and
//! if it returns true, exit gracefully. In addition to that, when waiting for
//! the network or another long-running operation, you can use the
//! `shutdown_watcher()` function to get a Future that will become ready if
//! the current task has been requested to shut down. You can use that with
//! tokio's `select!`.
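//!
//! For example, a minimal sketch of a task loop driven by `shutdown_watcher()`
//! and `select!` (here `do_work()` is a placeholder for the task's actual
//! work, not a function in this crate):
//!
//! ```ignore
//! async fn my_task() -> anyhow::Result<()> {
//!     loop {
//!         tokio::select! {
//!             // Becomes ready when this task is asked to shut down.
//!             _ = task_mgr::shutdown_watcher() => {
//!                 // Exit gracefully.
//!                 return Ok(());
//!             }
//!             res = do_work() => {
//!                 res?;
//!             }
//!         }
//!     }
//! }
//! ```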
//!
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what task panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
//!

use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::num::NonZeroUsize;
use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use futures::FutureExt;
use pageserver_api::shard::TenantShardId;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::CancellationToken;

use tracing::{debug, error, info, warn};

use once_cell::sync::Lazy;

use utils::env;
use utils::id::TimelineId;

use crate::metrics::set_tokio_runtime_setup;

//
// There are four runtimes:
//
// Compute request runtime
//  - used to handle connections from compute nodes. Any tasks related to satisfying
//    GetPage requests, base backups, import, and other such compute node operations
//    are handled by the Compute request runtime
//  - page_service.rs
//  - this includes layer downloads from remote storage, if a layer is needed to
//    satisfy a GetPage request
//
// Management request runtime
//  - used to handle HTTP API requests
//
// WAL receiver runtime
//  - used to handle WAL receiver connections
//  - and to receive updates from storage_broker
//
// Background runtime
//  - layer flushing
//  - garbage collection
//  - compaction
//  - remote storage uploads
//  - initial tenant loading
//
// Everything runs in a tokio task. If you spawn new tasks, spawn them using the
// correct runtime.
//
// There might be situations when one task needs to wait for a task running in another
// runtime to finish. For example, if a background operation needs a layer from remote
// storage, it will start to download it. If a background operation needs a remote layer,
// and the download was already initiated by a GetPage request, the background task
// will wait for the download - running in the compute request runtime - to finish.
// Another example: the initial tenant loading tasks are launched in the background
// runtime. If a GetPage request comes in before the load of a tenant has finished, the
// GetPage request will wait for the tenant load to finish.
//
// The core Timeline code is synchronous, and uses a bunch of std Mutexes and RwLocks to
// protect data structures. Let's keep it that way. Synchronous code is easier to debug
// and analyze, and there's a lot of hairy, low-level, performance-critical code there.
//
// It's nice to have different runtimes, so that you can quickly eyeball how much CPU
// time each class of operations is taking, with 'top -H' or similar.
//
// It's also good to avoid hogging all threads that would be needed to process
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
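//
// For example (a sketch, not a verbatim call site from this file): a
// fire-and-forget future could be spawned directly on one of the runtime
// statics declared below,
//
//   WALRECEIVER_RUNTIME.spawn(async move {
//       // ... handle a walreceiver connection ...
//   });
//
// though most tasks should instead go through `task_mgr::spawn` below, so that
// they get registered in TASKS and can be asked to shut down.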
//

pub(crate) static TOKIO_WORKER_THREADS: Lazy<NonZeroUsize> = Lazy::new(|| {
    // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
    // tokio would have already panicked for parsing errors or NotUnicode
    //
    // this will be wrong if any of the runtimes gets their worker threads configured to something
    // else, but that has not been needed in a long time.
    NonZeroUsize::new(
        std::env::var("TOKIO_WORKER_THREADS")
            .map(|s| s.parse::<usize>().unwrap())
            .unwrap_or_else(|_e| usize::max(2, num_cpus::get())),
    )
    .expect("the max() ensures that this is not zero")
});

enum TokioRuntimeMode {
    SingleThreaded,
    MultiThreaded { num_workers: NonZeroUsize },
}

impl FromStr for TokioRuntimeMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "current_thread" => Ok(TokioRuntimeMode::SingleThreaded),
            s => match s.strip_prefix("multi_thread:") {
                Some("default") => Ok(TokioRuntimeMode::MultiThreaded {
                    num_workers: *TOKIO_WORKER_THREADS,
                }),
                Some(suffix) => {
                    let num_workers = suffix.parse::<NonZeroUsize>().map_err(|e| {
                        format!(
                            "invalid number of multi-threaded runtime workers ({suffix:?}): {e}",
                        )
                    })?;
                    Ok(TokioRuntimeMode::MultiThreaded { num_workers })
                }
                None => Err(format!("invalid runtime config: {s:?}")),
            },
        }
    }
}
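
// For reference, the values accepted by the parser above for the
// `NEON_PAGESERVER_USE_ONE_RUNTIME` environment variable look like:
//
//   NEON_PAGESERVER_USE_ONE_RUNTIME=current_thread
//   NEON_PAGESERVER_USE_ONE_RUNTIME=multi_thread:default
//   NEON_PAGESERVER_USE_ONE_RUNTIME=multi_thread:4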
static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
    let thread_name = "pageserver-tokio";
    let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else {
        // If the env var is not set, leave this static as None.
        set_tokio_runtime_setup(
            "multiple-runtimes",
            NUM_MULTIPLE_RUNTIMES
                .checked_mul(*TOKIO_WORKER_THREADS)
                .unwrap(),
        );
        return None;
    };
    Some(match mode {
        TokioRuntimeMode::SingleThreaded => {
            set_tokio_runtime_setup("one-runtime-single-threaded", NonZeroUsize::new(1).unwrap());
            tokio::runtime::Builder::new_current_thread()
                .thread_name(thread_name)
                .enable_all()
                .build()
                .expect("failed to create one single runtime")
        }
        TokioRuntimeMode::MultiThreaded { num_workers } => {
            set_tokio_runtime_setup("one-runtime-multi-threaded", num_workers);
            tokio::runtime::Builder::new_multi_thread()
                .thread_name(thread_name)
                .enable_all()
                .worker_threads(num_workers.get())
                .build()
                .expect("failed to create one multi-threaded runtime")
        }
    })
});

/// Declare a lazy static variable named `$varname` that will resolve
/// to a tokio runtime handle. If the env var `NEON_PAGESERVER_USE_ONE_RUNTIME`
/// is set, this will resolve to `ONE_RUNTIME`. Otherwise, the macro invocation
/// declares a separate runtime and the lazy static variable `$varname`
/// will resolve to that separate runtime.
///
/// The result is that `$varname.spawn()` will use `ONE_RUNTIME` if
/// `NEON_PAGESERVER_USE_ONE_RUNTIME` is set, and will use the separate runtime
/// otherwise.
macro_rules! pageserver_runtime {
    ($varname:ident, $name:literal) => {
        pub static $varname: Lazy<&'static tokio::runtime::Runtime> = Lazy::new(|| {
            if let Some(runtime) = &*ONE_RUNTIME {
                return runtime;
            }
            static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .thread_name($name)
                    .worker_threads(TOKIO_WORKER_THREADS.get())
                    .enable_all()
                    .build()
                    .expect(std::concat!("Failed to create runtime ", $name))
            });
            &*RUNTIME
        });
    };
}

pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
// Bump this number when adding a new pageserver_runtime!
// SAFETY: it's obviously correct
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };

#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

impl fmt::Display for PageserverTaskId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

/// Each task that we track is associated with a "task ID". It's just an
/// increasing number that we assign. Note that it is different from `tokio::task::Id`.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);

/// Global registry of tasks
static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

task_local! {
    // This is a cancellation token which will be cancelled when a task needs to shut down. The
    // root token is kept in the global registry, so that anyone can send the signal to request
    // task shutdown.
    static SHUTDOWN_TOKEN: CancellationToken;

    // Each task holds a reference to its own PageServerTask here.
    static CURRENT_TASK: Arc<PageServerTask>;
}

///
/// There are many kinds of tasks in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many tasks of a certain kind can be running
/// at the same time.
///
#[derive(
    Debug,
    // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
    enumset::EnumSetType,
    enum_map::Enum,
    serde::Serialize,
    serde::Deserialize,
    strum_macros::IntoStaticStr,
    strum_macros::EnumString,
)]
pub enum TaskKind {
    // Pageserver startup, i.e., `main`
    Startup,

    // libpq listener task. It just accepts connections and spawns a
    // PageRequestHandler task for each connection.
    LibpqEndpointListener,

    // HTTP endpoint listener.
    HttpEndpointListener,

    // Task that handles a single connection. A PageRequestHandler task
    // starts detached from any particular tenant or timeline, but it can be
    // associated with one later, after receiving a command from the client.
    PageRequestHandler,

    /// Manages the WAL receiver connection for one timeline.
    /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
    /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
    /// There is at most one connection at any given time.
    ///
    /// The `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
    /// The `Client` object is what library users use to make requests & get responses.
    /// Internally, `Client` hands over requests to the `Connection` object.
    /// The `Connection` object is responsible for speaking the wire protocol.
    ///
    /// Walreceiver uses a legacy abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
    /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
    /// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
    ///
    /// Once the connection is established, the `TaskHandle` task spawns a
    /// [`WalReceiverConnectionPoller`] task that is responsible for polling
    /// the `Connection` object.
    /// A `CancellationToken` created by the `TaskHandle` task ensures
    /// that the [`WalReceiverConnectionPoller`] task will cancel soon after the `TaskHandle` is dropped.
    ///
    /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
    /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
    WalReceiverManager,

    /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
    /// See the comment on [`WalReceiverManager`].
    ///
    /// [`WalReceiverManager`]: Self::WalReceiverManager
    WalReceiverConnectionHandler,

    /// The task that polls the `tokio-postgres::Connection` object.
    /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
    /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
    WalReceiverConnectionPoller,

    // Garbage collection worker. One per tenant
    GarbageCollector,

    // Compaction. One per tenant.
    Compaction,

    // Eviction. One per timeline.
    Eviction,

    /// See [`crate::disk_usage_eviction_task`].
    DiskUsageEviction,

    /// See [`crate::tenant::secondary`].
    SecondaryDownloads,

    /// See [`crate::tenant::secondary`].
    SecondaryUploads,

    // Initial logical size calculation
    InitialLogicalSizeCalculation,

    OndemandLogicalSizeCalculation,

    // Task that flushes frozen in-memory layers to disk
    LayerFlushTask,

    // Task that uploads a file to remote storage
    RemoteUploadTask,

    // Task that handles the initial downloading of all tenants
    InitialLoad,

    // Task that handles attaching a tenant
    Attach,

    // Used mostly for background deletion from s3
    TimelineDeletionWorker,

    // Task that handles metrics collection
    MetricsCollection,

    // Task that drives downloading layers
    DownloadAllRemoteLayers,

    // Task that calculates synthetic size for all active tenants
    CalculateSyntheticSize,

    // A request that comes in via the pageserver HTTP API.
    MgmtRequest,

    DebugTool,

    #[cfg(test)]
    UnitTest,
}

#[derive(Default)]
struct MutableTaskState {
    /// Handle for waiting for the task to exit. It can be None if the
    /// task has already exited.
    join_handle: Option<JoinHandle<()>>,
}

struct PageServerTask {
    task_id: PageserverTaskId,

    kind: TaskKind,

    name: String,

    // To request task shutdown, just cancel this token.
    cancel: CancellationToken,

    /// Tasks may optionally be launched for a particular tenant/timeline, enabling
    /// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
    tenant_shard_id: Option<TenantShardId>,
    timeline_id: Option<TimelineId>,

    mutable: Mutex<MutableTaskState>,
}

/// Launch a new task.
/// Note: if `shutdown_process_on_error` is set to true, failure
/// of the task will lead to shutdown of the entire process.
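///
/// A sketch of a typical call (the payload future and the IDs are
/// placeholders, not taken from a real call site):
///
/// ```ignore
/// task_mgr::spawn(
///     BACKGROUND_RUNTIME.handle(),
///     TaskKind::Compaction,
///     Some(tenant_shard_id),
///     None, // not tied to a single timeline
///     "compaction loop",
///     false, // a task failure should not take down the whole pageserver
///     async move {
///         // ... the task's payload future ...
///         Ok(())
///     },
/// )
/// ```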
pub fn spawn<F>(
    runtime: &tokio::runtime::Handle,
    kind: TaskKind,
    tenant_shard_id: Option<TenantShardId>,
    timeline_id: Option<TimelineId>,
    name: &str,
    shutdown_process_on_error: bool,
    future: F,
) -> PageserverTaskId
where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let cancel = CancellationToken::new();
    let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
    let task = Arc::new(PageServerTask {
        task_id: PageserverTaskId(task_id),
        kind,
        name: name.to_string(),
        cancel: cancel.clone(),
        tenant_shard_id,
        timeline_id,
        mutable: Mutex::new(MutableTaskState { join_handle: None }),
    });

    TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));

    let mut task_mut = task.mutable.lock().unwrap();

    let task_name = name.to_string();
    let task_cloned = Arc::clone(&task);
    let join_handle = runtime.spawn(task_wrapper(
        task_name,
        task_id,
        task_cloned,
        cancel,
        shutdown_process_on_error,
        future,
    ));
    task_mut.join_handle = Some(join_handle);
    drop(task_mut);

    // The task is now running. Nothing more to do here.
    PageserverTaskId(task_id)
}

/// This wrapper function runs in a newly-spawned task. It initializes the
/// task-local variables and calls the payload function.
async fn task_wrapper<F>(
    task_name: String,
    task_id: u64,
    task: Arc<PageServerTask>,
    shutdown_token: CancellationToken,
    shutdown_process_on_error: bool,
    future: F,
) where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    debug!("Starting task '{}'", task_name);

    let result = SHUTDOWN_TOKEN
        .scope(
            shutdown_token,
            CURRENT_TASK.scope(task, {
                // We use AssertUnwindSafe here so that the payload function
                // doesn't need to be UnwindSafe. We don't do anything after the
                // unwinding that would expose us to unwind-unsafe behavior.
                AssertUnwindSafe(future).catch_unwind()
            }),
        )
        .await;
    task_finish(result, task_name, task_id, shutdown_process_on_error).await;
}

async fn task_finish(
    result: std::result::Result<
        anyhow::Result<()>,
        std::boxed::Box<dyn std::any::Any + std::marker::Send>,
    >,
    task_name: String,
    task_id: u64,
    shutdown_process_on_error: bool,
) {
    // Remove our entry from the global hashmap.
    let task = TASKS
        .lock()
        .unwrap()
        .remove(&task_id)
        .expect("no task in registry");

    let mut shutdown_process = false;
    {
        match result {
            Ok(Ok(())) => {
                debug!("Task '{}' exited normally", task_name);
            }
            Ok(Err(err)) => {
                if shutdown_process_on_error {
                    error!(
                        "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
                        task_name, task.tenant_shard_id, task.timeline_id, err
                    );
                    shutdown_process = true;
                } else {
                    error!(
                        "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
                        task_name, task.tenant_shard_id, task.timeline_id, err
                    );
                }
            }
            Err(err) => {
                if shutdown_process_on_error {
                    error!(
                        "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
                        task_name, task.tenant_shard_id, task.timeline_id, err
                    );
                    shutdown_process = true;
                } else {
                    error!(
                        "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
                        task_name, task.tenant_shard_id, task.timeline_id, err
                    );
                }
            }
        }
    }

    if shutdown_process {
        std::process::exit(1);
    }
}

/// Signal and wait for tasks to shut down.
///
/// The arguments are used to select the tasks to kill. Any None arguments are
/// ignored. For example, to shut down all WalReceiverManager tasks:
///
///   shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
///
/// Or to shut down all tasks for a given timeline:
///
///   shutdown_tasks(None, Some(tenant_shard_id), Some(timeline_id))
///
pub async fn shutdown_tasks(
    kind: Option<TaskKind>,
    tenant_shard_id: Option<TenantShardId>,
    timeline_id: Option<TimelineId>,
) {
    let mut victim_tasks = Vec::new();

    {
        let tasks = TASKS.lock().unwrap();
        for task in tasks.values() {
            if (kind.is_none() || Some(task.kind) == kind)
                && (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id)
                && (timeline_id.is_none() || task.timeline_id == timeline_id)
            {
                task.cancel.cancel();
                victim_tasks.push((
                    Arc::clone(task),
                    task.kind,
                    task.tenant_shard_id,
                    task.timeline_id,
                ));
            }
        }
    }

    let log_all = kind.is_none() && tenant_shard_id.is_none() && timeline_id.is_none();

    for (task, task_kind, tenant_shard_id, timeline_id) in victim_tasks {
        let join_handle = {
            let mut task_mut = task.mutable.lock().unwrap();
            task_mut.join_handle.take()
        };
        if let Some(mut join_handle) = join_handle {
            if log_all {
                if tenant_shard_id.is_none() {
                    // there are quite few of these
                    info!(name = task.name, kind = ?task_kind, "stopping global task");
                } else {
                    // warn to catch these in tests; there shouldn't be any
                    warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
                }
            }
            if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
                .await
                .is_err()
            {
                // allow some time to elapse before logging to cut down the number of log
                // lines.
                info!("waiting for task {} to shut down", task.name);
                // we never handled this return value, but:
                // - we don't deschedule which would lead to is_cancelled
                // - panics are already logged (is_panicked)
                // - task errors are already logged in the wrapper
                let _ = join_handle.await;
                info!("task {} completed", task.name);
            }
        } else {
            // Possibly one of:
            // * The task had not even fully started yet.
            // * It was shut down concurrently and already exited.
        }
    }
}

pub fn current_task_kind() -> Option<TaskKind> {
    CURRENT_TASK.try_with(|ct| ct.kind).ok()
}

pub fn current_task_id() -> Option<PageserverTaskId> {
    CURRENT_TASK.try_with(|ct| ct.task_id).ok()
}

/// A Future that can be used to check if the current task has been requested to
/// shut down.
pub async fn shutdown_watcher() {
    let token = SHUTDOWN_TOKEN
        .try_with(|t| t.clone())
        .expect("shutdown_watcher() called in an unexpected task or thread");

    token.cancelled().await;
}

/// Clone the current task's cancellation token, which can be moved across tasks.
///
/// When the task which is currently executing is shut down, the cancellation token will be
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`.
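///
/// A minimal sketch of moving the token into a blocking task:
///
/// ```ignore
/// let cancel = task_mgr::shutdown_token();
/// tokio::task::spawn_blocking(move || {
///     while !cancel.is_cancelled() {
///         // ... do one bounded chunk of blocking work ...
///     }
/// });
/// ```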
pub fn shutdown_token() -> CancellationToken {
    let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());

    if cfg!(test) {
        // in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
        res.unwrap_or_default()
    } else {
        res.expect("shutdown_token() called in an unexpected task or thread")
    }
}

/// Has the current task been requested to shut down?
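///
/// A sketch of the intended polling pattern (`do_one_unit_of_work` is a
/// placeholder):
///
/// ```ignore
/// while !task_mgr::is_shutdown_requested() {
///     do_one_unit_of_work();
/// }
/// ```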
pub fn is_shutdown_requested() -> bool {
    if let Ok(true_or_false) = SHUTDOWN_TOKEN.try_with(|t| t.is_cancelled()) {
        true_or_false
    } else {
        if !cfg!(test) {
            warn!("is_shutdown_requested() called in an unexpected task or thread");
        }
        false
    }
}