Line data Source code
1 : //!
2 : //! This module provides centralized handling of tokio tasks in the Page Server.
3 : //!
4 : //! We provide a few basic facilities:
5 : //! - A global registry of tasks that lists what kind of tasks they are, and
6 : //! which tenant or timeline they are working on
7 : //!
8 : //! - The ability to request a task to shut down.
9 : //!
10 : //!
11 : //! # How it works
12 : //!
13 : //! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
14 : //! task is spawned, a PageServerTask entry is added there, and when a
15 : //! task dies, it removes itself from the hashmap. If you want to kill a
16 : //! task, you can scan the hashmap to find it.
17 : //!
18 : //! # Task shutdown
19 : //!
20 : //! To kill a task, we rely on co-operation from the victim. Each task is
21 : //! expected to periodically call the `is_shutdown_requested()` function, and
22 : //! if it returns true, exit gracefully. In addition to that, when waiting for
23 : //! the network or other long-running operation, you can use
24 : //! `shutdown_watcher()` function to get a Future that will become ready if
25 : //! the current task has been requested to shut down. You can use that with
26 : //! Tokio select!().
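//!
//! As an illustration, here is a minimal sketch of such a cooperative task loop (the
//! `interval` and `do_one_unit_of_work` names are hypothetical placeholders):
//!
//! ```ignore
//! async fn background_loop(mut interval: tokio::time::Interval) -> anyhow::Result<()> {
//!     loop {
//!         tokio::select! {
//!             // Becomes ready once shutdown of the current task is requested.
//!             _ = shutdown_watcher() => return Ok(()),
//!             _ = interval.tick() => {
//!                 do_one_unit_of_work().await?;
//!             }
//!         }
//!     }
//! }
//! ```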
27 : //!
28 : //! TODO: This would be a good place to also handle panics in a somewhat sane way.
29 : //! Depending on what task panics, we might want to kill the whole server, or
30 : //! only a single tenant or timeline.
31 : //!
32 :
33 : // Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
34 : // Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
35 : #![allow(clippy::declare_interior_mutable_const)]
36 :
37 : use std::collections::HashMap;
38 : use std::fmt;
39 : use std::future::Future;
40 : use std::panic::AssertUnwindSafe;
41 : use std::sync::atomic::{AtomicU64, Ordering};
42 : use std::sync::{Arc, Mutex};
43 :
44 : use futures::FutureExt;
45 : use pageserver_api::shard::TenantShardId;
46 : use tokio::runtime::Runtime;
47 : use tokio::task::JoinHandle;
48 : use tokio::task_local;
49 : use tokio_util::sync::CancellationToken;
50 :
51 : use tracing::{debug, error, info, warn};
52 :
53 : use once_cell::sync::Lazy;
54 :
55 : use utils::id::TimelineId;
56 :
57 : use crate::shutdown_pageserver;
58 :
59 : //
60 : // There are four runtimes:
61 : //
62 : // Compute request runtime
63 : // - used to handle connections from compute nodes. Any tasks related to satisfying
64 : // GetPage requests, base backups, import, and other such compute node operations
65 : // are handled by the Compute request runtime
66 : // - page_service.rs
67 : // - this includes layer downloads from remote storage, if a layer is needed to
68 : // satisfy a GetPage request
69 : //
70 : // Management request runtime
71 : // - used to handle HTTP API requests
72 : //
73 : // WAL receiver runtime:
74 : // - used to handle WAL receiver connections.
75 : // - and to receive updates from storage_broker
76 : //
77 : // Background runtime
78 : // - layer flushing
79 : // - garbage collection
80 : // - compaction
81 : // - remote storage uploads
82 : // - initial tenant loading
83 : //
84 : // Everything runs in a tokio task. If you spawn new tasks, spawn them using the correct
85 : // runtime.
86 : //
87 : // There might be situations when one task needs to wait for a task running in another
88 : // runtime to finish. For example, if a background operation needs a layer from remote
89 : // storage, it will start to download it. But if the download of that layer was already
90 : // initiated by a GetPage request, the background task will instead wait for that
91 : // download - running in the compute request runtime - to finish.
92 : // Another example: the initial tenant loading tasks are launched in the background ops
93 : // runtime. If a GetPage request comes in before the load of a tenant has finished, the
94 : // GetPage request will wait for the tenant load to finish.
95 : //
96 : // The core Timeline code is synchronous, and uses a bunch of std Mutexes and RWLocks to
97 : // protect data structures. Let's keep it that way. Synchronous code is easier to debug
98 : // and analyze, and there's a lot of hairy, low-level, performance critical code there.
99 : //
100 : // It's nice to have different runtimes, so that you can quickly eyeball how much CPU
101 : // time each class of operations is taking, with 'top -H' or similar.
102 : //
103 : // It's also good to avoid hogging all threads that would be needed to process
104 : // other operations if, for example, the upload tasks get blocked on locks. It shouldn't
105 : // happen, but still.
106 : //
107 624 : pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
108 624 : tokio::runtime::Builder::new_multi_thread()
109 624 : .thread_name("compute request worker")
110 624 : .enable_all()
111 624 : .build()
112 624 : .expect("Failed to create compute request runtime")
113 624 : });
114 :
115 624 : pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
116 624 : tokio::runtime::Builder::new_multi_thread()
117 624 : .thread_name("mgmt request worker")
118 624 : .enable_all()
119 624 : .build()
120 624 : .expect("Failed to create mgmt request runtime")
121 624 : });
122 :
123 634 : pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
124 634 : tokio::runtime::Builder::new_multi_thread()
125 634 : .thread_name("walreceiver worker")
126 634 : .enable_all()
127 634 : .build()
128 634 : .expect("Failed to create walreceiver runtime")
129 634 : });
130 :
131 696 : pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
132 696 : tokio::runtime::Builder::new_multi_thread()
133 696 : .thread_name("background op worker")
134 696 : // if you change the number of worker threads please change the constant below
135 696 : .enable_all()
136 696 : .build()
137 696 : .expect("Failed to create background op runtime")
138 696 : });
139 :
140 366 : pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
141 366 : // force init now, so that any panic from building the runtime happens here
142 366 : let _ = BACKGROUND_RUNTIME.handle();
143 366 : // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
144 366 : // tokio would have already panicked on parsing errors or NotUnicode
145 366 : //
146 366 : // this will be wrong if any of the runtimes gets their worker threads configured to something
147 366 : // else, but that has not been needed in a long time.
148 366 : std::env::var("TOKIO_WORKER_THREADS")
149 366 : .map(|s| s.parse::<usize>().unwrap())
150 366 : .unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
151 366 : });
152 :
153 0 : #[derive(Debug, Clone, Copy)]
154 : pub struct PageserverTaskId(u64);
155 :
156 : impl fmt::Display for PageserverTaskId {
157 18 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158 18 : self.0.fmt(f)
159 18 : }
160 : }
161 :
162 : /// Each task that we track is associated with a "task ID". It's just an
163 : /// increasing number that we assign. Note that it is different from tokio::task::Id.
164 : static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
165 :
166 : /// Global registry of tasks
167 : static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
168 700 : Lazy::new(|| Mutex::new(HashMap::new()));
169 :
170 150 : task_local! {
171 150 : // This is a cancellation token which will be cancelled when a task needs to shut down. The
172 150 : // root token is kept in the global registry, so that anyone can send the signal to request
173 150 : // task shutdown.
174 150 : static SHUTDOWN_TOKEN: CancellationToken;
175 150 :
176 150 : // Each task holds reference to its own PageServerTask here.
177 150 : static CURRENT_TASK: Arc<PageServerTask>;
178 150 : }
179 :
180 : ///
181 : /// There are many kinds of tasks in the system. Some are associated with a particular
182 : /// tenant or timeline, while others are global.
183 : ///
184 : /// Note that we don't try to limit how many tasks of a certain kind can be running
185 : /// at the same time.
186 : ///
187 : #[derive(
188 8 : Debug,
189 : // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
190 2080 : enumset::EnumSetType,
191 350301774 : enum_map::Enum,
192 0 : serde::Serialize,
193 0 : serde::Deserialize,
194 39817 : strum_macros::IntoStaticStr,
195 : )]
196 : pub enum TaskKind {
197 : // Pageserver startup, i.e., `main`
198 : Startup,
199 :
200 : // libpq listener task. It just accepts connections and spawns a
201 : // PageRequestHandler task for each connection.
202 : LibpqEndpointListener,
203 :
204 : // HTTP endpoint listener.
205 : HttpEndpointListener,
206 :
207 : // Task that handles a single connection. A PageRequestHandler task
208 : // starts detached from any particular tenant or timeline, but it can be
209 : // associated with one later, after receiving a command from the client.
210 : PageRequestHandler,
211 :
212 : /// Manages the WAL receiver connection for one timeline.
213 : /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
214 : /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
215 : /// There is at most one connection at any given time.
216 : ///
217 : /// That `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
218 : /// The `Client` object is what library users use to make requests & get responses.
219 : /// Internally, `Client` hands over requests to the `Connection` object.
220 : /// The `Connection` object is responsible for speaking the wire protocol.
221 : ///
222 : /// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
223 : /// That abstraction doesn't use `task_mgr`.
224 : /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
225 : /// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
226 : ///
227 : /// Once the connection is established, the `TaskHandle` task creates a
228 : /// [`WalReceiverConnectionPoller`] task_mgr task that is responsible for polling
229 : /// the `Connection` object.
230 : /// A `CancellationToken` created by the `TaskHandle` task ensures
231 : /// that the [`WalReceiverConnectionPoller`] task will cancel soon after the `TaskHandle` is dropped.
232 : ///
233 : /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
234 : /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
235 : WalReceiverManager,
236 :
237 : /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
238 : /// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
239 : /// See the comment on [`WalReceiverManager`].
240 : ///
241 : /// [`WalReceiverManager`]: Self::WalReceiverManager
242 : WalReceiverConnectionHandler,
243 :
244 : /// The task that polls the `tokio-postgres::Connection` object.
245 : /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
246 : /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
247 : WalReceiverConnectionPoller,
248 :
249 : // Garbage collection worker. One per tenant
250 : GarbageCollector,
251 :
252 : // Compaction. One per tenant.
253 : Compaction,
254 :
255 : // Eviction. One per timeline.
256 : Eviction,
257 :
258 : /// See [`crate::disk_usage_eviction_task`].
259 : DiskUsageEviction,
260 :
261 : /// See [`crate::tenant::secondary`].
262 : SecondaryDownloads,
263 :
264 : /// See [`crate::tenant::secondary`].
265 : SecondaryUploads,
266 :
267 : // Initial logical size calculation
268 : InitialLogicalSizeCalculation,
269 :
270 : OndemandLogicalSizeCalculation,
271 :
272 : // Task that flushes frozen in-memory layers to disk
273 : LayerFlushTask,
274 :
275 : // Task that uploads a file to remote storage
276 : RemoteUploadTask,
277 :
278 : // Task that downloads a file from remote storage
279 : RemoteDownloadTask,
280 :
281 : // task that handles the initial downloading of all tenants
282 : InitialLoad,
283 :
284 : // task that handles attaching a tenant
285 : Attach,
286 :
287 : // Used mostly for background deletion from s3
288 : TimelineDeletionWorker,
289 :
290 : // task that handhes metrics collection
291 : MetricsCollection,
292 :
293 : // task that drives downloading layers
294 : DownloadAllRemoteLayers,
295 : // Task that calculates synthetis size for all active tenants
296 : CalculateSyntheticSize,
297 :
298 : // A request that comes in via the pageserver HTTP API.
299 : MgmtRequest,
300 :
301 : DebugTool,
302 :
303 : #[cfg(test)]
304 : UnitTest,
305 : }
306 :
307 0 : #[derive(Default)]
308 : struct MutableTaskState {
309 : /// Handle for waiting for the task to exit. It can be None, if the
310 : /// the task has already exited.
311 : join_handle: Option<JoinHandle<()>>,
312 : }
313 :
314 : struct PageServerTask {
315 : #[allow(dead_code)] // unused currently
316 : task_id: PageserverTaskId,
317 :
318 : kind: TaskKind,
319 :
320 : name: String,
321 :
322 : // To request task shutdown, just cancel this token.
323 : cancel: CancellationToken,
324 :
325 : /// Tasks may optionally be launched for a particular tenant/timeline, enabling
326 : /// later cancellation of all tasks for that tenant/timeline via [`shutdown_tasks`]
327 : tenant_shard_id: Option<TenantShardId>,
328 : timeline_id: Option<TimelineId>,
329 :
330 : mutable: Mutex<MutableTaskState>,
331 : }
332 :
333 : /// Launch a new task.
334 : /// Note: if `shutdown_process_on_error` is set to true, a failure of
335 : /// the task will lead to shutdown of the entire process.
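///
/// A minimal usage sketch (the tenant/timeline variables and the task body here
/// are illustrative placeholders, not part of this module):
///
/// ```ignore
/// let task_id = task_mgr::spawn(
///     task_mgr::BACKGROUND_RUNTIME.handle(),
///     TaskKind::Compaction,
///     Some(tenant_shard_id),
///     Some(timeline_id),
///     "compaction loop",
///     false, // a failure should not shut down the whole pageserver
///     async move {
///         // The payload should periodically poll shutdown_watcher() or
///         // is_shutdown_requested() so that shutdown_tasks() can stop it.
///         Ok(())
///     },
/// );
/// ```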
336 63787 : pub fn spawn<F>(
337 63787 : runtime: &tokio::runtime::Handle,
338 63787 : kind: TaskKind,
339 63787 : tenant_shard_id: Option<TenantShardId>,
340 63787 : timeline_id: Option<TimelineId>,
341 63787 : name: &str,
342 63787 : shutdown_process_on_error: bool,
343 63787 : future: F,
344 63787 : ) -> PageserverTaskId
345 63787 : where
346 63787 : F: Future<Output = anyhow::Result<()>> + Send + 'static,
347 63787 : {
348 63787 : let cancel = CancellationToken::new();
349 63787 : let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
350 63787 : let task = Arc::new(PageServerTask {
351 63787 : task_id: PageserverTaskId(task_id),
352 63787 : kind,
353 63787 : name: name.to_string(),
354 63787 : cancel: cancel.clone(),
355 63787 : tenant_shard_id,
356 63787 : timeline_id,
357 63787 : mutable: Mutex::new(MutableTaskState { join_handle: None }),
358 63787 : });
359 63787 :
360 63787 : TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
361 63787 :
362 63787 : let mut task_mut = task.mutable.lock().unwrap();
363 63787 :
364 63787 : let task_name = name.to_string();
365 63787 : let task_cloned = Arc::clone(&task);
366 63787 : let join_handle = runtime.spawn(task_wrapper(
367 63787 : task_name,
368 63787 : task_id,
369 63787 : task_cloned,
370 63787 : cancel,
371 63787 : shutdown_process_on_error,
372 63787 : future,
373 63787 : ));
374 63787 : task_mut.join_handle = Some(join_handle);
375 63787 : drop(task_mut);
376 63787 :
377 63787 : // The task is now running. Nothing more to do here
378 63787 : PageserverTaskId(task_id)
379 63787 : }
380 :
381 : /// This wrapper function runs in a newly-spawned task. It initializes the
382 : /// task-local variables and calls the payload function.
383 63787 : async fn task_wrapper<F>(
384 63787 : task_name: String,
385 63787 : task_id: u64,
386 63787 : task: Arc<PageServerTask>,
387 63787 : shutdown_token: CancellationToken,
388 63787 : shutdown_process_on_error: bool,
389 63787 : future: F,
390 63787 : ) where
391 63787 : F: Future<Output = anyhow::Result<()>> + Send + 'static,
392 63787 : {
393 0 : debug!("Starting task '{}'", task_name);
394 :
395 63777 : let result = SHUTDOWN_TOKEN
396 63777 : .scope(
397 63777 : shutdown_token,
398 63777 : CURRENT_TASK.scope(task, {
399 63777 : // We use AssertUnwindSafe here so that the payload function
400 63777 : // doesn't need to be UnwindSafe. We don't do anything after the
401 63777 : // unwinding that would expose us to unwind-unsafe behavior.
402 63777 : AssertUnwindSafe(future).catch_unwind()
403 63777 : }),
404 63777 : )
405 10142883 : .await;
406 58623 : task_finish(result, task_name, task_id, shutdown_process_on_error).await;
407 58623 : }
408 :
409 58623 : async fn task_finish(
410 58623 : result: std::result::Result<
411 58623 : anyhow::Result<()>,
412 58623 : std::boxed::Box<dyn std::any::Any + std::marker::Send>,
413 58623 : >,
414 58623 : task_name: String,
415 58623 : task_id: u64,
416 58623 : shutdown_process_on_error: bool,
417 58623 : ) {
418 58623 : // Remove our entry from the global hashmap.
419 58623 : let task = TASKS
420 58623 : .lock()
421 58623 : .unwrap()
422 58623 : .remove(&task_id)
423 58623 : .expect("no task in registry");
424 58623 :
425 58623 : let mut shutdown_process = false;
426 : {
427 58623 : match result {
428 : Ok(Ok(())) => {
429 0 : debug!("Task '{}' exited normally", task_name);
430 : }
431 14 : Ok(Err(err)) => {
432 14 : if shutdown_process_on_error {
433 0 : error!(
434 0 : "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
435 0 : task_name, task.tenant_shard_id, task.timeline_id, err
436 0 : );
437 0 : shutdown_process = true;
438 : } else {
439 14 : error!(
440 14 : "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
441 14 : task_name, task.tenant_shard_id, task.timeline_id, err
442 14 : );
443 : }
444 : }
445 0 : Err(err) => {
446 0 : if shutdown_process_on_error {
447 0 : error!(
448 0 : "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
449 0 : task_name, task.tenant_shard_id, task.timeline_id, err
450 0 : );
451 0 : shutdown_process = true;
452 : } else {
453 0 : error!(
454 0 : "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
455 0 : task_name, task.tenant_shard_id, task.timeline_id, err
456 0 : );
457 : }
458 : }
459 : }
460 : }
461 :
462 58623 : if shutdown_process {
463 0 : shutdown_pageserver(None, 1).await;
464 58623 : }
465 58623 : }
466 :
467 : /// Signal and wait for tasks to shut down.
468 : ///
469 : ///
470 : /// The arguments are used to select the tasks to kill. Any None arguments are
471 : /// ignored. For example, to shut down all WalReceiverManager tasks:
472 : ///
473 : /// shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
474 : ///
475 : /// Or to shut down all tasks for a given timeline:
476 : ///
477 : /// shutdown_tasks(None, Some(tenant_shard_id), Some(timeline_id))
478 : ///
479 3188 : pub async fn shutdown_tasks(
480 3188 : kind: Option<TaskKind>,
481 3188 : tenant_shard_id: Option<TenantShardId>,
482 3188 : timeline_id: Option<TimelineId>,
483 3188 : ) {
484 3188 : let mut victim_tasks = Vec::new();
485 3188 :
486 3188 : {
487 3188 : let tasks = TASKS.lock().unwrap();
488 31697 : for task in tasks.values() {
489 31697 : if (kind.is_none() || Some(task.kind) == kind)
490 14662 : && (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id)
491 7794 : && (timeline_id.is_none() || task.timeline_id == timeline_id)
492 4501 : {
493 4501 : task.cancel.cancel();
494 4501 : victim_tasks.push((
495 4501 : Arc::clone(task),
496 4501 : task.kind,
497 4501 : task.tenant_shard_id,
498 4501 : task.timeline_id,
499 4501 : ));
500 27196 : }
501 : }
502 : }
503 :
504 3188 : let log_all = kind.is_none() && tenant_shard_id.is_none() && timeline_id.is_none();
505 :
506 7689 : for (task, task_kind, tenant_shard_id, timeline_id) in victim_tasks {
507 4501 : let join_handle = {
508 4501 : let mut task_mut = task.mutable.lock().unwrap();
509 4501 : task_mut.join_handle.take()
510 : };
511 4501 : if let Some(mut join_handle) = join_handle {
512 4501 : if log_all {
513 6 : if tenant_shard_id.is_none() {
514 : // there are only a few of these
515 6 : info!(name = task.name, kind = ?task_kind, "stopping global task");
516 : } else {
517 : // warn to catch these in tests; there shouldn't be any
518 0 : warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
519 : }
520 4495 : }
521 4501 : if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
522 2287 : .await
523 4501 : .is_err()
524 : {
525 : // allow some time to elapse before logging to cut down the number of log
526 : // lines.
527 6 : info!("waiting for task {} to shut down", task.name);
528 : // we never handled this return value, but:
529 : // - we don't deschedule which would lead to is_cancelled
530 : // - panics are already logged (is_panicked)
531 : // - task errors are already logged in the wrapper
532 6 : let _ = join_handle.await;
533 6 : info!("task {} completed", task.name);
534 4495 : }
535 0 : } else {
536 0 : // Possibly one of:
537 0 : // * The task had not even fully started yet.
538 0 : // * It was shut down concurrently and already exited
539 0 : }
540 : }
541 3188 : }
542 :
543 4581762 : pub fn current_task_kind() -> Option<TaskKind> {
544 4581762 : CURRENT_TASK.try_with(|ct| ct.kind).ok()
545 4581762 : }
546 :
547 16 : pub fn current_task_id() -> Option<PageserverTaskId> {
548 16 : CURRENT_TASK.try_with(|ct| ct.task_id).ok()
549 16 : }
550 :
551 : /// A Future that becomes ready when the current task has been requested to
552 : /// shut down.
553 122424 : pub async fn shutdown_watcher() {
554 116994 : let token = SHUTDOWN_TOKEN
555 116994 : .try_with(|t| t.clone())
556 116994 : .expect("shutdown_watcher() called in an unexpected task or thread");
557 116994 :
558 833202 : token.cancelled().await;
559 404 : }
560 :
561 : /// Clone the current task's cancellation token, which can be moved across tasks.
562 : ///
563 : /// When the task which is currently executing is shut down, the cancellation token will be
564 : /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
565 : /// `tokio::task::JoinSet::spawn`.
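///
/// A sketch of carrying the token into a blocking task (the `scan_directory`
/// helper is a hypothetical placeholder):
///
/// ```ignore
/// let cancel = task_mgr::shutdown_token();
/// let result = tokio::task::spawn_blocking(move || {
///     // The blocking code can check cancel.is_cancelled() between work items.
///     scan_directory(&cancel)
/// })
/// .await;
/// ```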
566 70413 : pub fn shutdown_token() -> CancellationToken {
567 70413 : let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
568 70413 :
569 70413 : if cfg!(test) {
570 : // in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
571 1561 : res.unwrap_or_default()
572 : } else {
573 68852 : res.expect("shutdown_token() called in an unexpected task or thread")
574 : }
575 70413 : }
576 :
577 : /// Has the current task been requested to shut down?
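///
/// A sketch of how a synchronous loop might use this (the `items` and
/// `process` names are placeholders):
///
/// ```ignore
/// for item in items {
///     if task_mgr::is_shutdown_requested() {
///         break; // exit gracefully
///     }
///     process(item);
/// }
/// ```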
578 497 : pub fn is_shutdown_requested() -> bool {
579 497 : if let Ok(true_or_false) = SHUTDOWN_TOKEN.try_with(|t| t.is_cancelled()) {
580 489 : true_or_false
581 : } else {
582 8 : if !cfg!(test) {
583 0 : warn!("is_shutdown_requested() called in an unexpected task or thread");
584 8 : }
585 8 : false
586 : }
587 497 : }