Line data Source code
1 : //!
2 : //! This module provides centralized handling of tokio tasks in the Page Server.
3 : //!
4 : //! We provide a few basic facilities:
5 : //! - A global registry of tasks that lists what kind of tasks they are, and
6 : //! which tenant or timeline they are working on
7 : //!
8 : //! - The ability to request a task to shut down.
9 : //!
10 : //!
11 : //! # How it works
12 : //!
13 : //! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
14 : //! task is spawned, a PageServerTask entry is added there, and when a
15 : //! task dies, it removes itself from the hashmap. If you want to kill a
16 : //! task, you can scan the hashmap to find it.
17 : //!
18 : //! # Task shutdown
19 : //!
20 : //! To kill a task, we rely on co-operation from the victim. Each task is
21 : //! expected to periodically call the `is_shutdown_requested()` function, and
22 : //! if it returns true, exit gracefully. In addition to that, when waiting for
23 : //! the network or another long-running operation, you can use the
24 : //! `shutdown_watcher()` function to get a Future that will become ready if
25 : //! the current task has been requested to shut down. You can use that with
26 : //! Tokio's select!().
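//!
//! As a rough sketch, a task's main loop might look like this (where
//! `read_request()` and `handle_request()` are hypothetical placeholders for the
//! task's real work):
//!
//! ```ignore
//! loop {
//!     tokio::select! {
//!         _ = task_mgr::shutdown_watcher() => break,
//!         req = read_request() => handle_request(req).await,
//!     }
//! }
//! ```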
27 : //!
28 : //! TODO: This would be a good place to also handle panics in a somewhat sane way.
29 : //! Depending on what task panics, we might want to kill the whole server, or
30 : //! only a single tenant or timeline.
31 : //!
32 :
33 : use std::collections::HashMap;
34 : use std::fmt;
35 : use std::future::Future;
36 : use std::panic::AssertUnwindSafe;
37 : use std::sync::atomic::{AtomicU64, Ordering};
38 : use std::sync::{Arc, Mutex};
39 :
40 : use futures::FutureExt;
41 : use pageserver_api::shard::TenantShardId;
42 : use tokio::runtime::Runtime;
43 : use tokio::task::JoinHandle;
44 : use tokio::task_local;
45 : use tokio_util::sync::CancellationToken;
46 :
47 : use tracing::{debug, error, info, warn};
48 :
49 : use once_cell::sync::Lazy;
50 :
51 : use utils::id::TimelineId;
52 :
53 : use crate::shutdown_pageserver;
54 :
55 : //
56 : // There are four runtimes:
57 : //
58 : // Compute request runtime
59 : // - used to handle connections from compute nodes. Any tasks related to satisfying
60 : // GetPage requests, base backups, import, and other such compute node operations
61 : // are handled by the Compute request runtime
62 : // - page_service.rs
63 : // - this includes layer downloads from remote storage, if a layer is needed to
64 : // satisfy a GetPage request
65 : //
66 : // Management request runtime
67 : // - used to handle HTTP API requests
68 : //
69 : // WAL receiver runtime:
70 : // - used to handle WAL receiver connections.
71 : // - and to receive updates from storage_broker
72 : //
73 : // Background runtime
74 : // - layer flushing
75 : // - garbage collection
76 : // - compaction
77 : // - remote storage uploads
78 : // - initial tenant loading
79 : //
80 : // Everything runs in a tokio task. If you spawn new tasks, spawn them using the correct
81 : // runtime.
82 : //
83 : // There might be situations where one task needs to wait for a task running in another
84 : // runtime to finish. For example, if a background operation needs a layer from remote
85 : // storage, it will start downloading it. But if the download was already initiated
86 : // by a GetPage request, the background task will wait for that download - running in
87 : // the compute request runtime - to finish.
88 : // Another example: the initial tenant loading tasks are launched in the background ops
89 : // runtime. If a GetPage request comes in before the load of a tenant has finished, the
90 : // GetPage request will wait for the tenant load to finish.
91 : //
92 : // The core Timeline code is synchronous, and uses a bunch of std Mutexes and RWLocks to
93 : // protect data structures. Let's keep it that way. Synchronous code is easier to debug
94 : // and analyze, and there's a lot of hairy, low-level, performance critical code there.
95 : //
96 : // It's nice to have different runtimes, so that you can quickly eyeball how much CPU
97 : // time each class of operations is taking, with 'top -H' or similar.
98 : //
99 : // It's also good to avoid hogging all the threads that would be needed to process
100 : // other operations, if e.g. the upload tasks get blocked on locks. That shouldn't
101 : // happen, but still.
102 : //
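// As a rough sketch, from a caller's point of view this might look like the
// following (using the `spawn` function defined later in this file; here
// `compaction_loop()` is a hypothetical future returning `anyhow::Result<()>`):
//
//     task_mgr::spawn(
//         task_mgr::BACKGROUND_RUNTIME.handle(),
//         TaskKind::Compaction,
//         Some(tenant_shard_id),
//         None,
//         "compaction loop",
//         false, // don't shut down the whole pageserver if this task fails
//         compaction_loop(),
//     );
//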
103 0 : pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
104 0 : tokio::runtime::Builder::new_multi_thread()
105 0 : .thread_name("compute request worker")
106 0 : .enable_all()
107 0 : .build()
108 0 : .expect("Failed to create compute request runtime")
109 0 : });
110 :
111 0 : pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
112 0 : tokio::runtime::Builder::new_multi_thread()
113 0 : .thread_name("mgmt request worker")
114 0 : .enable_all()
115 0 : .build()
116 0 : .expect("Failed to create mgmt request runtime")
117 0 : });
118 :
119 10 : pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
120 10 : tokio::runtime::Builder::new_multi_thread()
121 10 : .thread_name("walreceiver worker")
122 10 : .enable_all()
123 10 : .build()
124 10 : .expect("Failed to create walreceiver runtime")
125 10 : });
126 :
127 76 : pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
128 76 : tokio::runtime::Builder::new_multi_thread()
129 76 : .thread_name("background op worker")
130 76 : // if you change the number of worker threads, please change the constant below
131 76 : .enable_all()
132 76 : .build()
133 76 : .expect("Failed to create background op runtime")
134 76 : });
135 :
136 10 : pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
137 10 : // force initialization now (and thus panic now if building the runtime fails)
138 10 : let _ = BACKGROUND_RUNTIME.handle();
139 10 : // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
140 10 : // tokio would have already panicked for parsing errors or NotUnicode
141 10 : //
142 10 : // this will be wrong if any of the runtimes has its worker threads configured to something
143 10 : // else, but that has not been needed in a long time.
144 10 : std::env::var("TOKIO_WORKER_THREADS")
145 10 : .map(|s| s.parse::<usize>().unwrap())
146 10 : .unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
147 10 : });
148 :
149 0 : #[derive(Debug, Clone, Copy)]
150 : pub struct PageserverTaskId(u64);
151 :
152 : impl fmt::Display for PageserverTaskId {
153 0 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 0 : self.0.fmt(f)
155 0 : }
156 : }
157 :
158 : /// Each task that we track is associated with a "task ID". It's just an
159 : /// increasing number that we assign. Note that it is different from tokio::task::Id.
160 : static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
161 :
162 : /// Global registry of tasks
163 : static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
164 80 : Lazy::new(|| Mutex::new(HashMap::new()));
165 :
166 152 : task_local! {
167 152 : // This is a cancellation token which will be cancelled when a task needs to shut down. The
168 152 : // root token is kept in the global registry, so that anyone can send the signal to request
169 152 : // task shutdown.
170 152 : static SHUTDOWN_TOKEN: CancellationToken;
171 152 :
172 152 : // Each task holds reference to its own PageServerTask here.
173 152 : static CURRENT_TASK: Arc<PageServerTask>;
174 152 : }
175 :
176 : ///
177 : /// There are many kinds of tasks in the system. Some are associated with a particular
178 : /// tenant or timeline, while others are global.
179 : ///
180 : /// Note that we don't try to limit how many tasks of a certain kind can be running
181 : /// at the same time.
182 : ///
183 : #[derive(
184 0 : Debug,
185 : // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
186 0 : enumset::EnumSetType,
187 10223608 : enum_map::Enum,
188 0 : serde::Serialize,
189 0 : serde::Deserialize,
190 2308 : strum_macros::IntoStaticStr,
191 0 : strum_macros::EnumString,
192 : )]
193 : pub enum TaskKind {
194 : // Pageserver startup, i.e., `main`
195 : Startup,
196 :
197 : // libpq listener task. It just accepts connections and spawns a
198 : // PageRequestHandler task for each connection.
199 : LibpqEndpointListener,
200 :
201 : // HTTP endpoint listener.
202 : HttpEndpointListener,
203 :
204 : // Task that handles a single connection. A PageRequestHandler task
205 : // starts detached from any particular tenant or timeline, but it can be
206 : // associated with one later, after receiving a command from the client.
207 : PageRequestHandler,
208 :
209 : /// Manages the WAL receiver connection for one timeline.
210 : /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
211 : /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
212 : /// There is at most one connection at any given time.
213 : ///
214 : /// That `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
215 : /// The `Client` object is what library users use to make requests & get responses.
216 : /// Internally, `Client` hands over requests to the `Connection` object.
217 : /// The `Connection` object is responsible for speaking the wire protocol.
218 : ///
219 : /// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
220 : /// That abstraction doesn't use `task_mgr`.
221 : /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
222 : /// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
223 : ///
224 : /// Once the connection is established, the `TaskHandle` task creates a
225 : /// [`WalReceiverConnectionPoller`] task_mgr task that is responsible for polling
226 : /// the `Connection` object.
227 : /// A `CancellationToken` created by the `TaskHandle` task ensures
228 : /// that the [`WalReceiverConnectionPoller`] task will cancel soon after the `TaskHandle` is dropped.
229 : ///
230 : /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
231 : /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
232 : WalReceiverManager,
233 :
234 : /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
235 : /// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
236 : /// See the comment on [`WalReceiverManager`].
237 : ///
238 : /// [`WalReceiverManager`]: Self::WalReceiverManager
239 : WalReceiverConnectionHandler,
240 :
241 : /// The task that polls the `tokio-postgres::Connection` object.
242 : /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
243 : /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
244 : WalReceiverConnectionPoller,
245 :
246 : // Garbage collection worker. One per tenant
247 : GarbageCollector,
248 :
249 : // Compaction. One per tenant.
250 : Compaction,
251 :
252 : // Eviction. One per timeline.
253 : Eviction,
254 :
255 : /// See [`crate::disk_usage_eviction_task`].
256 : DiskUsageEviction,
257 :
258 : /// See [`crate::tenant::secondary`].
259 : SecondaryDownloads,
260 :
261 : /// See [`crate::tenant::secondary`].
262 : SecondaryUploads,
263 :
264 : // Initial logical size calculation
265 : InitialLogicalSizeCalculation,
266 :
267 : OndemandLogicalSizeCalculation,
268 :
269 : // Task that flushes frozen in-memory layers to disk
270 : LayerFlushTask,
271 :
272 : // Task that uploads a file to remote storage
273 : RemoteUploadTask,
274 :
275 : // Task that downloads a file from remote storage
276 : RemoteDownloadTask,
277 :
278 : // task that handles the initial downloading of all tenants
279 : InitialLoad,
280 :
281 : // task that handles attaching a tenant
282 : Attach,
283 :
284 : // Used mostly for background deletion from s3
285 : TimelineDeletionWorker,
286 :
287 : // task that handhes metrics collection
288 : MetricsCollection,
289 :
290 : // task that drives downloading layers
291 : DownloadAllRemoteLayers,
292 : // Task that calculates synthetic size for all active tenants
293 : CalculateSyntheticSize,
294 :
295 : // A request that comes in via the pageserver HTTP API.
296 : MgmtRequest,
297 :
298 : DebugTool,
299 :
300 : #[cfg(test)]
301 : UnitTest,
302 : }
303 :
304 0 : #[derive(Default)]
305 : struct MutableTaskState {
306 : /// Handle for waiting for the task to exit. It can be None if the
307 : /// task has already exited.
308 : join_handle: Option<JoinHandle<()>>,
309 : }
310 :
311 : struct PageServerTask {
312 : task_id: PageserverTaskId,
313 :
314 : kind: TaskKind,
315 :
316 : name: String,
317 :
318 : // To request task shutdown, just cancel this token.
319 : cancel: CancellationToken,
320 :
321 : /// Tasks may optionally be launched for a particular tenant/timeline, enabling
322 : /// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
323 : tenant_shard_id: Option<TenantShardId>,
324 : timeline_id: Option<TimelineId>,
325 :
326 : mutable: Mutex<MutableTaskState>,
327 : }
328 :
329 : /// Launch a new task.
330 : /// Note: if `shutdown_process_on_error` is set to true, failure
331 : /// of the task will lead to shutdown of the entire process.
332 1845 : pub fn spawn<F>(
333 1845 : runtime: &tokio::runtime::Handle,
334 1845 : kind: TaskKind,
335 1845 : tenant_shard_id: Option<TenantShardId>,
336 1845 : timeline_id: Option<TimelineId>,
337 1845 : name: &str,
338 1845 : shutdown_process_on_error: bool,
339 1845 : future: F,
340 1845 : ) -> PageserverTaskId
341 1845 : where
342 1845 : F: Future<Output = anyhow::Result<()>> + Send + 'static,
343 1845 : {
344 1845 : let cancel = CancellationToken::new();
345 1845 : let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
346 1845 : let task = Arc::new(PageServerTask {
347 1845 : task_id: PageserverTaskId(task_id),
348 1845 : kind,
349 1845 : name: name.to_string(),
350 1845 : cancel: cancel.clone(),
351 1845 : tenant_shard_id,
352 1845 : timeline_id,
353 1845 : mutable: Mutex::new(MutableTaskState { join_handle: None }),
354 1845 : });
355 1845 :
356 1845 : TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
357 1845 :
358 1845 : let mut task_mut = task.mutable.lock().unwrap();
359 1845 :
360 1845 : let task_name = name.to_string();
361 1845 : let task_cloned = Arc::clone(&task);
362 1845 : let join_handle = runtime.spawn(task_wrapper(
363 1845 : task_name,
364 1845 : task_id,
365 1845 : task_cloned,
366 1845 : cancel,
367 1845 : shutdown_process_on_error,
368 1845 : future,
369 1845 : ));
370 1845 : task_mut.join_handle = Some(join_handle);
371 1845 : drop(task_mut);
372 1845 :
373 1845 : // The task is now running. Nothing more to do here
374 1845 : PageserverTaskId(task_id)
375 1845 : }
376 :
377 : /// This wrapper function runs in a newly-spawned task. It initializes the
378 : /// task-local variables and calls the payload function.
379 1845 : async fn task_wrapper<F>(
380 1845 : task_name: String,
381 1845 : task_id: u64,
382 1845 : task: Arc<PageServerTask>,
383 1845 : shutdown_token: CancellationToken,
384 1845 : shutdown_process_on_error: bool,
385 1845 : future: F,
386 1845 : ) where
387 1845 : F: Future<Output = anyhow::Result<()>> + Send + 'static,
388 1845 : {
389 0 : debug!("Starting task '{}'", task_name);
390 :
391 1838 : let result = SHUTDOWN_TOKEN
392 1838 : .scope(
393 1838 : shutdown_token,
394 1838 : CURRENT_TASK.scope(task, {
395 1838 : // We use AssertUnwindSafe here so that the payload function
396 1838 : // doesn't need to be UnwindSafe. We don't do anything after the
397 1838 : // unwinding that would expose us to unwind-unsafe behavior.
398 1838 : AssertUnwindSafe(future).catch_unwind()
399 1838 : }),
400 1838 : )
401 24492 : .await;
402 1540 : task_finish(result, task_name, task_id, shutdown_process_on_error).await;
403 1540 : }
404 :
405 1540 : async fn task_finish(
406 1540 : result: std::result::Result<
407 1540 : anyhow::Result<()>,
408 1540 : std::boxed::Box<dyn std::any::Any + std::marker::Send>,
409 1540 : >,
410 1540 : task_name: String,
411 1540 : task_id: u64,
412 1540 : shutdown_process_on_error: bool,
413 1540 : ) {
414 1540 : // Remove our entry from the global hashmap.
415 1540 : let task = TASKS
416 1540 : .lock()
417 1540 : .unwrap()
418 1540 : .remove(&task_id)
419 1540 : .expect("no task in registry");
420 1540 :
421 1540 : let mut shutdown_process = false;
422 : {
423 1540 : match result {
424 : Ok(Ok(())) => {
425 0 : debug!("Task '{}' exited normally", task_name);
426 : }
427 0 : Ok(Err(err)) => {
428 0 : if shutdown_process_on_error {
429 0 : error!(
430 0 : "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
431 0 : task_name, task.tenant_shard_id, task.timeline_id, err
432 0 : );
433 0 : shutdown_process = true;
434 : } else {
435 0 : error!(
436 0 : "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
437 0 : task_name, task.tenant_shard_id, task.timeline_id, err
438 0 : );
439 : }
440 : }
441 0 : Err(err) => {
442 0 : if shutdown_process_on_error {
443 0 : error!(
444 0 : "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
445 0 : task_name, task.tenant_shard_id, task.timeline_id, err
446 0 : );
447 0 : shutdown_process = true;
448 : } else {
449 0 : error!(
450 0 : "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
451 0 : task_name, task.tenant_shard_id, task.timeline_id, err
452 0 : );
453 : }
454 : }
455 : }
456 : }
457 :
458 1540 : if shutdown_process {
459 0 : shutdown_pageserver(None, 1).await;
460 1540 : }
461 1540 : }
462 :
463 : /// Signal and wait for tasks to shut down.
464 : ///
465 : ///
466 : /// The arguments are used to select the tasks to kill. Any None arguments are
467 : /// ignored. For example, to shut down all WalReceiver tasks:
468 : ///
469 : /// shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
470 : ///
471 : /// Or to shut down all tasks for given timeline:
472 : ///
473 : /// shutdown_tasks(None, Some(tenant_shard_id), Some(timeline_id))
474 : ///
475 28 : pub async fn shutdown_tasks(
476 28 : kind: Option<TaskKind>,
477 28 : tenant_shard_id: Option<TenantShardId>,
478 28 : timeline_id: Option<TimelineId>,
479 28 : ) {
480 28 : let mut victim_tasks = Vec::new();
481 28 :
482 28 : {
483 28 : let tasks = TASKS.lock().unwrap();
484 30 : for task in tasks.values() {
485 30 : if (kind.is_none() || Some(task.kind) == kind)
486 12 : && (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id)
487 12 : && (timeline_id.is_none() || task.timeline_id == timeline_id)
488 6 : {
489 6 : task.cancel.cancel();
490 6 : victim_tasks.push((
491 6 : Arc::clone(task),
492 6 : task.kind,
493 6 : task.tenant_shard_id,
494 6 : task.timeline_id,
495 6 : ));
496 24 : }
497 : }
498 : }
499 :
500 28 : let log_all = kind.is_none() && tenant_shard_id.is_none() && timeline_id.is_none();
501 :
502 34 : for (task, task_kind, tenant_shard_id, timeline_id) in victim_tasks {
503 6 : let join_handle = {
504 6 : let mut task_mut = task.mutable.lock().unwrap();
505 6 : task_mut.join_handle.take()
506 : };
507 6 : if let Some(mut join_handle) = join_handle {
508 6 : if log_all {
509 0 : if tenant_shard_id.is_none() {
510 : // there are quite few of these
511 0 : info!(name = task.name, kind = ?task_kind, "stopping global task");
512 : } else {
513 : // warn to catch these in tests; there shouldn't be any
514 0 : warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
515 : }
516 6 : }
517 6 : if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
518 6 : .await
519 6 : .is_err()
520 : {
521 : // allow some time to elapse before logging to cut down the number of log
522 : // lines.
523 0 : info!("waiting for task {} to shut down", task.name);
524 : // we never handled this return value, but:
525 : // - we don't deschedule which would lead to is_cancelled
526 : // - panics are already logged (is_panicked)
527 : // - task errors are already logged in the wrapper
528 0 : let _ = join_handle.await;
529 0 : info!("task {} completed", task.name);
530 6 : }
531 0 : } else {
532 0 : // Possibly one of:
533 0 : // * The task had not even fully started yet.
534 0 : // * It was shut down concurrently and already exited
535 0 : }
536 : }
537 28 : }
538 :
539 680301 : pub fn current_task_kind() -> Option<TaskKind> {
540 680301 : CURRENT_TASK.try_with(|ct| ct.kind).ok()
541 680301 : }
542 :
543 0 : pub fn current_task_id() -> Option<PageserverTaskId> {
544 0 : CURRENT_TASK.try_with(|ct| ct.task_id).ok()
545 0 : }
546 :
547 : /// A Future that can be used to check if the current task has been requested to
548 : /// shut down.
549 822 : pub async fn shutdown_watcher() {
550 801 : let token = SHUTDOWN_TOKEN
551 801 : .try_with(|t| t.clone())
552 801 : .expect("shutdown_watcher() called in an unexpected task or thread");
553 801 :
554 801 : token.cancelled().await;
555 4 : }
556 :
557 : /// Clone the current task's cancellation token, which can be moved across tasks.
558 : ///
559 : /// When the currently executing task is shut down, the cancellation token will be
560 : /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
561 : /// `tokio::task::JoinSet::spawn`.
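///
/// As a rough sketch (where `compress_in_background()` is a hypothetical
/// placeholder for real blocking work):
///
/// ```ignore
/// let cancel = task_mgr::shutdown_token();
/// tokio::task::spawn_blocking(move || {
///     while !cancel.is_cancelled() {
///         compress_in_background();
///     }
/// });
/// ```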
562 1954 : pub fn shutdown_token() -> CancellationToken {
563 1954 : let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
564 1954 :
565 1954 : if cfg!(test) {
566 : // in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
567 1954 : res.unwrap_or_default()
568 : } else {
569 0 : res.expect("shutdown_token() called in an unexpected task or thread")
570 : }
571 1954 : }
572 :
573 : /// Has the current task been requested to shut down?
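///
/// As a rough sketch of the periodic check described in the module docs
/// (`do_one_unit_of_work()` is a hypothetical placeholder):
///
/// ```ignore
/// while !task_mgr::is_shutdown_requested() {
///     do_one_unit_of_work();
/// }
/// ```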
574 8 : pub fn is_shutdown_requested() -> bool {
575 8 : if let Ok(true_or_false) = SHUTDOWN_TOKEN.try_with(|t| t.is_cancelled()) {
576 0 : true_or_false
577 : } else {
578 8 : if !cfg!(test) {
579 0 : warn!("is_shutdown_requested() called in an unexpected task or thread");
580 8 : }
581 8 : false
582 : }
583 8 : }