//!
//! This module provides centralized handling of tokio tasks in the Page Server.
//!
//! We provide a few basic facilities:
//! - A global registry of tasks that lists what kind of tasks they are, and
//!   which tenant or timeline they are working on
//!
//! - The ability to request a task to shut down.
//!
//!
//! # How it works
//!
//! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
//! task is spawned, a `PageServerTask` entry is added there, and when a
//! task dies, it removes itself from the hashmap. If you want to kill a
//! task, you can scan the hashmap to find it.
//!
//! # Task shutdown
//!
//! To kill a task, we rely on co-operation from the victim. Each task is
//! expected to periodically call the `is_shutdown_requested()` function, and
//! if it returns true, exit gracefully. In addition to that, when waiting for
//! the network or another long-running operation, you can use the
//! `shutdown_watcher()` function to get a Future that will become ready if
//! the current task has been requested to shut down. You can use that with
//! tokio's `select!()`.
//!
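//! For example, a task's main loop might look like this (a hedged sketch, not
//! code from this module; `do_work()` is a placeholder):
//!
//! ```ignore
//! loop {
//!     tokio::select! {
//!         _ = task_mgr::shutdown_watcher() => {
//!             // Shutdown was requested: exit gracefully.
//!             break;
//!         }
//!         res = do_work() => {
//!             res?;
//!         }
//!     }
//! }
//! ```
//!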
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what task panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
//!

use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::num::NonZeroUsize;
use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::FutureExt;
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use utils::env;
use utils::id::TimelineId;

use crate::metrics::set_tokio_runtime_setup;

//
// There are four runtimes:
//
// Compute request runtime
//  - used to handle connections from compute nodes. Any tasks related to satisfying
//    GetPage requests, base backups, import, and other such compute node operations
//    are handled by the compute request runtime
//  - page_service.rs
//  - this includes layer downloads from remote storage, if a layer is needed to
//    satisfy a GetPage request
//
// Management request runtime
//  - used to handle HTTP API requests
//
// WAL receiver runtime
//  - used to handle WAL receiver connections
//  - and to receive updates from storage_broker
//
// Background runtime
//  - layer flushing
//  - garbage collection
//  - compaction
//  - remote storage uploads
//  - initial tenant loading
//
// Everything runs in a tokio task. If you spawn new tasks, spawn them using the
// correct runtime.
//
// There might be situations when one task needs to wait for a task running in
// another runtime to finish. For example, if a background operation needs a layer
// from remote storage, it will start to download it. If the download was already
// initiated by a GetPage request, the background task will wait for that download
// - running in the compute request runtime - to finish.
// Another example: the initial tenant loading tasks are launched in the background
// runtime. If a GetPage request comes in before the load of a tenant has finished,
// the GetPage request will wait for the tenant load to finish.
//
// The core Timeline code is synchronous, and uses a bunch of std Mutexes and RwLocks to
// protect data structures. Let's keep it that way. Synchronous code is easier to debug
// and analyze, and there's a lot of hairy, low-level, performance-critical code there.
//
// It's nice to have different runtimes, so that you can quickly eyeball how much CPU
// time each class of operations is taking, with 'top -H' or similar.
//
// It's also good to avoid hogging all the threads that would be needed to process
// other operations if, e.g., the upload tasks get blocked on locks. It shouldn't
// happen, but still.
//

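// For example, a background task would be spawned roughly like this from outside
// this module (a hedged sketch; `tenant_shard_id` and `timeline_id` are
// placeholders):
//
//     task_mgr::spawn(
//         BACKGROUND_RUNTIME.handle(),
//         TaskKind::Compaction,
//         tenant_shard_id,
//         Some(timeline_id),
//         "compaction",
//         async move {
//             // ... do the work, checking task_mgr::shutdown_token() ...
//             Ok(())
//         },
//     );
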
pub(crate) static TOKIO_WORKER_THREADS: Lazy<NonZeroUsize> = Lazy::new(|| {
    // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
    // tokio would have already panicked for parsing errors or NotUnicode
    //
    // this will be wrong if any of the runtimes gets its worker threads configured to something
    // else, but that has not been needed in a long time.
    NonZeroUsize::new(
        std::env::var("TOKIO_WORKER_THREADS")
            .map(|s| s.parse::<usize>().unwrap())
            .unwrap_or_else(|_e| usize::max(2, num_cpus::get())),
    )
    .expect("the max() ensures that this is not zero")
});

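/// How the optional single shared runtime is configured, parsed from the
/// `NEON_PAGESERVER_USE_ONE_RUNTIME` environment variable. The accepted values
/// are `current_thread`, `multi_thread:default`, and `multi_thread:<NUM_WORKERS>`
/// (see the `FromStr` impl below).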
enum TokioRuntimeMode {
    SingleThreaded,
    MultiThreaded { num_workers: NonZeroUsize },
}

impl FromStr for TokioRuntimeMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "current_thread" => Ok(TokioRuntimeMode::SingleThreaded),
            s => match s.strip_prefix("multi_thread:") {
                Some("default") => Ok(TokioRuntimeMode::MultiThreaded {
                    num_workers: *TOKIO_WORKER_THREADS,
                }),
                Some(suffix) => {
                    let num_workers = suffix.parse::<NonZeroUsize>().map_err(|e| {
                        format!(
                            "invalid number of multi-threaded runtime workers ({suffix:?}): {e}",
                        )
                    })?;
                    Ok(TokioRuntimeMode::MultiThreaded { num_workers })
                }
                None => Err(format!("invalid runtime config: {s:?}")),
            },
        }
    }
}

static TOKIO_THREAD_STACK_SIZE: Lazy<NonZeroUsize> = Lazy::new(|| {
    env::var("NEON_PAGESERVER_TOKIO_THREAD_STACK_SIZE")
        // the default 2 MiB is insufficient, especially in debug mode
        .unwrap_or_else(|| NonZeroUsize::new(4 * 1024 * 1024).unwrap())
});

static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
    let thread_name = "pageserver-tokio";
    let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else {
        // If the env var is not set, leave this static as None.
        set_tokio_runtime_setup(
            "multiple-runtimes",
            NUM_MULTIPLE_RUNTIMES
                .checked_mul(*TOKIO_WORKER_THREADS)
                .unwrap(),
        );
        return None;
    };
    Some(match mode {
        TokioRuntimeMode::SingleThreaded => {
            set_tokio_runtime_setup("one-runtime-single-threaded", NonZeroUsize::new(1).unwrap());
            tokio::runtime::Builder::new_current_thread()
                .thread_name(thread_name)
                .enable_all()
                .thread_stack_size(TOKIO_THREAD_STACK_SIZE.get())
                .build()
                .expect("failed to create one single runtime")
        }
        TokioRuntimeMode::MultiThreaded { num_workers } => {
            set_tokio_runtime_setup("one-runtime-multi-threaded", num_workers);
            tokio::runtime::Builder::new_multi_thread()
                .thread_name(thread_name)
                .enable_all()
                .worker_threads(num_workers.get())
                .thread_stack_size(TOKIO_THREAD_STACK_SIZE.get())
                .build()
                .expect("failed to create one multi-threaded runtime")
        }
    })
});

/// Declare a lazy static variable named `$varname` that will resolve
/// to a tokio runtime handle. If the env var `NEON_PAGESERVER_USE_ONE_RUNTIME`
/// is set, this will resolve to `ONE_RUNTIME`. Otherwise, the macro invocation
/// declares a separate runtime and the lazy static variable `$varname`
/// will resolve to that separate runtime.
///
/// The result is that `$varname.spawn()` will use `ONE_RUNTIME` if
/// `NEON_PAGESERVER_USE_ONE_RUNTIME` is set, and will use the separate runtime
/// otherwise.
macro_rules! pageserver_runtime {
    ($varname:ident, $name:literal) => {
        pub static $varname: Lazy<&'static tokio::runtime::Runtime> = Lazy::new(|| {
            if let Some(runtime) = &*ONE_RUNTIME {
                return runtime;
            }
            static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .thread_name($name)
                    .worker_threads(TOKIO_WORKER_THREADS.get())
                    .enable_all()
                    .thread_stack_size(TOKIO_THREAD_STACK_SIZE.get())
                    .build()
                    .expect(std::concat!("Failed to create runtime ", $name))
            });
            &*RUNTIME
        });
    };
}

pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
// Bump this number when adding a new pageserver_runtime!
// SAFETY: it's obviously correct
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };

#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

impl fmt::Display for PageserverTaskId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

/// Each task that we track is associated with a "task ID". It's just an
/// increasing number that we assign. Note that it is different from `tokio::task::Id`.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);

/// Global registry of tasks
static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

task_local! {
    // This is a cancellation token which will be cancelled when a task needs to shut down. The
    // root token is kept in the global registry, so that anyone can send the signal to request
    // task shutdown.
    static SHUTDOWN_TOKEN: CancellationToken;

    // Each task holds a reference to its own PageServerTask here.
    static CURRENT_TASK: Arc<PageServerTask>;
}

///
/// There are many kinds of tasks in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many tasks of a certain kind can be running
/// at the same time.
///
#[derive(
    Debug,
    // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
    enumset::EnumSetType,
    enum_map::Enum,
    serde::Serialize,
    serde::Deserialize,
    strum_macros::IntoStaticStr,
    strum_macros::EnumString,
)]
pub enum TaskKind {
    // Pageserver startup, i.e., `main`
    Startup,

    // libpq listener task. It just accepts connections and spawns a
    // PageRequestHandler task for each connection.
    LibpqEndpointListener,

    // HTTP endpoint listener.
    HttpEndpointListener,

    // Task that handles a single connection. A PageRequestHandler task
    // starts detached from any particular tenant or timeline, but it can be
    // associated with one later, after receiving a command from the client.
    PageRequestHandler,

    /// Manages the WAL receiver connection for one timeline.
    /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
    /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
    /// There is at most one connection at any given time.
    ///
    /// The `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
    /// The `Client` object is what library users use to make requests & get responses.
    /// Internally, `Client` hands over requests to the `Connection` object.
    /// The `Connection` object is responsible for speaking the wire protocol.
    ///
    /// Walreceiver uses a legacy abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
    /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
    /// For the `RequestContext` that we hand to the `TaskHandle`, we use the [`WalReceiverConnectionHandler`] task kind.
    ///
    /// Once the connection is established, the `TaskHandle` task spawns a
    /// [`WalReceiverConnectionPoller`] task that is responsible for polling
    /// the `Connection` object.
    /// A `CancellationToken` created by the `TaskHandle` task ensures
    /// that the [`WalReceiverConnectionPoller`] task will be cancelled soon after the `TaskHandle` is dropped.
    ///
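    /// A hedged sketch of that `tokio-postgres` split (illustrative, not this
    /// module's code; `conninfo` is a placeholder connection string):
    ///
    /// ```ignore
    /// let (client, connection) = tokio_postgres::connect(conninfo, tokio_postgres::NoTls).await?;
    /// // `connection` must be polled for `client` to make progress:
    /// tokio::spawn(async move {
    ///     if let Err(e) = connection.await {
    ///         tracing::error!("connection error: {e}");
    ///     }
    /// });
    /// ```
    ///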
    /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
    /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
    WalReceiverManager,

    /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
    /// See the comment on [`WalReceiverManager`].
    ///
    /// [`WalReceiverManager`]: Self::WalReceiverManager
    WalReceiverConnectionHandler,

    /// The task that polls the `tokio-postgres::Connection` object.
    /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
    /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
    WalReceiverConnectionPoller,

    // Garbage collection worker. One per tenant.
    GarbageCollector,

    // Compaction. One per tenant.
    Compaction,

    // Eviction. One per timeline.
    Eviction,

    // Tenant housekeeping (flush idle ephemeral layers, shut down idle walredo, etc.).
    TenantHousekeeping,

    /// See [`crate::disk_usage_eviction_task`].
    DiskUsageEviction,

    /// See [`crate::tenant::secondary`].
    SecondaryDownloads,

    /// See [`crate::tenant::secondary`].
    SecondaryUploads,

    // Initial logical size calculation
    InitialLogicalSizeCalculation,

    OndemandLogicalSizeCalculation,

    // Task that flushes frozen in-memory layers to disk
    LayerFlushTask,

    // Task that uploads a file to remote storage
    RemoteUploadTask,

    // Task that handles the initial downloading of all tenants
    InitialLoad,

    // Task that handles attaching a tenant
    Attach,

    // Used mostly for background deletion from s3
    TimelineDeletionWorker,

    // Task that handles metrics collection
    MetricsCollection,

    // Task that drives downloading layers
    DownloadAllRemoteLayers,

    // Task that calculates synthetic size for all active tenants
    CalculateSyntheticSize,

    // A request that comes in via the pageserver HTTP API.
    MgmtRequest,

    DebugTool,

    EphemeralFilePreWarmPageCache,

    LayerDownload,

    #[cfg(test)]
    UnitTest,

    DetachAncestor,

    ImportPgdata,
}

#[derive(Default)]
struct MutableTaskState {
    /// Handle for waiting for the task to exit. It can be None if the
    /// task has already exited.
    join_handle: Option<JoinHandle<()>>,
}

struct PageServerTask {
    task_id: PageserverTaskId,

    kind: TaskKind,

    name: String,

    // To request task shutdown, just cancel this token.
    cancel: CancellationToken,

    /// Tasks may optionally be launched for a particular tenant/timeline, enabling
    /// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`].
    tenant_shard_id: TenantShardId,
    timeline_id: Option<TimelineId>,

    mutable: Mutex<MutableTaskState>,
}

/// Launch a new task.
/// Note: if `shutdown_process_on_error` is set to true, failure
/// of the task will lead to shutdown of the entire process.
pub fn spawn<F>(
    runtime: &tokio::runtime::Handle,
    kind: TaskKind,
    tenant_shard_id: TenantShardId,
    timeline_id: Option<TimelineId>,
    name: &str,
    future: F,
) -> PageserverTaskId
where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let cancel = CancellationToken::new();
    let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
    let task = Arc::new(PageServerTask {
        task_id: PageserverTaskId(task_id),
        kind,
        name: name.to_string(),
        cancel: cancel.clone(),
        tenant_shard_id,
        timeline_id,
        mutable: Mutex::new(MutableTaskState { join_handle: None }),
    });

    TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));

    let mut task_mut = task.mutable.lock().unwrap();

    let task_name = name.to_string();
    let task_cloned = Arc::clone(&task);
    let join_handle = runtime.spawn(task_wrapper(
        task_name,
        task_id,
        task_cloned,
        cancel,
        future,
    ));
    task_mut.join_handle = Some(join_handle);
    drop(task_mut);

    // The task is now running. Nothing more to do here.
    PageserverTaskId(task_id)
}

/// This wrapper function runs in a newly-spawned task. It initializes the
/// task-local variables and calls the payload function.
async fn task_wrapper<F>(
    task_name: String,
    task_id: u64,
    task: Arc<PageServerTask>,
    shutdown_token: CancellationToken,
    future: F,
) where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    debug!("Starting task '{}'", task_name);

    // wrap the future so we log panics and errors
    let tenant_shard_id = task.tenant_shard_id;
    let timeline_id = task.timeline_id;
    let fut = async move {
        // We use AssertUnwindSafe here so that the payload function
        // doesn't need to be UnwindSafe. We don't do anything after the
        // unwinding that would expose us to unwind-unsafe behavior.
        let result = AssertUnwindSafe(future).catch_unwind().await;
        match result {
            Ok(Ok(())) => {
                debug!("Task '{}' exited normally", task_name);
            }
            Ok(Err(err)) => {
                error!(
                    "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
                    task_name, tenant_shard_id, timeline_id, err
                );
            }
            Err(err) => {
                error!(
                    "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
                    task_name, tenant_shard_id, timeline_id, err
                );
            }
        }
    };

    // add the task-locals
    let fut = CURRENT_TASK.scope(task, fut);
    let fut = SHUTDOWN_TOKEN.scope(shutdown_token, fut);

    // poll future to completion
    fut.await;

    // Remove our entry from the global hashmap.
    TASKS
        .lock()
        .unwrap()
        .remove(&task_id)
        .expect("no task in registry");
}

pub async fn exit_on_panic_or_error<T, E>(
    task_name: &'static str,
    future: impl Future<Output = Result<T, E>>,
) -> T
where
    E: std::fmt::Debug,
{
    // We use AssertUnwindSafe here so that the payload function
    // doesn't need to be UnwindSafe. We don't do anything after the
    // unwinding that would expose us to unwind-unsafe behavior.
    let result = AssertUnwindSafe(future).catch_unwind().await;
    match result {
        Ok(Ok(val)) => val,
        Ok(Err(err)) => {
            error!(
                task_name,
                "Task exited with error, exiting process: {err:?}"
            );
            std::process::exit(1);
        }
        Err(panic_obj) => {
            error!(task_name, "Task panicked, exiting process: {panic_obj:?}");
            std::process::exit(1);
        }
    }
}
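
// A hedged sketch of how a top-level caller might use this (the runtime choice
// and the future body are illustrative, not prescribed by this module):
//
//     BACKGROUND_RUNTIME.block_on(exit_on_panic_or_error("main", async {
//         // ... run until shutdown ...
//         anyhow::Ok(())
//     }));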

/// Signal and wait for tasks to shut down.
///
/// The arguments are used to select the tasks to kill. Any None arguments are
/// ignored. For example, to shut down all WalReceiverManager tasks:
///
///   shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
///
/// Or to shut down all tasks for a given timeline:
///
///   shutdown_tasks(None, Some(tenant_shard_id), Some(timeline_id))
///
pub async fn shutdown_tasks(
    kind: Option<TaskKind>,
    tenant_shard_id: Option<TenantShardId>,
    timeline_id: Option<TimelineId>,
) {
    let mut victim_tasks = Vec::new();

    {
        let tasks = TASKS.lock().unwrap();
        for task in tasks.values() {
            if (kind.is_none() || Some(task.kind) == kind)
                && (tenant_shard_id.is_none() || Some(task.tenant_shard_id) == tenant_shard_id)
                && (timeline_id.is_none() || task.timeline_id == timeline_id)
            {
                task.cancel.cancel();
                victim_tasks.push((
                    Arc::clone(task),
                    task.kind,
                    task.tenant_shard_id,
                    task.timeline_id,
                ));
            }
        }
    }

    let log_all = kind.is_none() && tenant_shard_id.is_none() && timeline_id.is_none();

    for (task, task_kind, tenant_shard_id, timeline_id) in victim_tasks {
        let join_handle = {
            let mut task_mut = task.mutable.lock().unwrap();
            task_mut.join_handle.take()
        };
        if let Some(mut join_handle) = join_handle {
            if log_all {
                // warn to catch these in tests; there shouldn't be any
                warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
            }
            const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
            const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
            if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
                .await
                .is_err()
            {
                // allow some time to elapse before logging to cut down the number of log
                // lines.
                info!("waiting for task {} to shut down", task.name);
                loop {
                    tokio::select! {
                        // we never handled this return value, but:
                        // - we don't deschedule, which would lead to is_cancelled
                        // - panics are already logged (is_panicked)
                        // - task errors are already logged in the wrapper
                        _ = &mut join_handle => break,
                        _ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
                    }
                }
                info!("task {} completed", task.name);
            }
        } else {
            // Possibly one of:
            // * The task had not even fully started yet.
            // * It was shut down concurrently and already exited.
        }
    }
}

pub fn current_task_kind() -> Option<TaskKind> {
    CURRENT_TASK.try_with(|ct| ct.kind).ok()
}

pub fn current_task_id() -> Option<PageserverTaskId> {
    CURRENT_TASK.try_with(|ct| ct.task_id).ok()
}

/// A Future that can be used to check if the current task has been requested to
/// shut down.
pub async fn shutdown_watcher() {
    let token = SHUTDOWN_TOKEN
        .try_with(|t| t.clone())
        .expect("shutdown_watcher() called in an unexpected task or thread");

    token.cancelled().await;
}

/// Clone the current task's cancellation token, which can be moved across tasks.
///
/// When the task which is currently executing is shut down, the cancellation token will be
/// cancelled. It can, however, be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`.
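///
/// For example (a hedged sketch; `do_blocking_chunk()` is a placeholder for a
/// bounded piece of blocking work):
///
/// ```ignore
/// let cancel = task_mgr::shutdown_token();
/// tokio::task::spawn_blocking(move || {
///     while !cancel.is_cancelled() {
///         do_blocking_chunk();
///     }
/// });
/// ```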
pub fn shutdown_token() -> CancellationToken {
    let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());

    if cfg!(test) {
        // in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
        res.unwrap_or_default()
    } else {
        res.expect("shutdown_token() called in an unexpected task or thread")
    }
}

/// Has the current task been requested to shut down?
pub fn is_shutdown_requested() -> bool {
    if let Ok(true_or_false) = SHUTDOWN_TOKEN.try_with(|t| t.is_cancelled()) {
        true_or_false
    } else {
        if !cfg!(test) {
            warn!("is_shutdown_requested() called in an unexpected task or thread");
        }
        false
    }
}