//!
//! This module provides centralized handling of tokio tasks in the Page Server.
//!
//! We provide a few basic facilities:
//! - A global registry of tasks that lists what kind of tasks they are, and
//!   which tenant or timeline they are working on
//!
//! - The ability to request a task to shut down.
//!
//!
//! # How it works
//!
//! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
//! task is spawned, a PageServerTask entry is added there, and when a
//! task dies, it removes itself from the hashmap. If you want to kill a
//! task, you can scan the hashmap to find it.
//!
//! # Task shutdown
//!
//! To kill a task, we rely on co-operation from the victim. Each task is
//! expected to periodically call the `is_shutdown_requested()` function and,
//! if it returns true, exit gracefully. In addition, when waiting for the
//! network or another long-running operation, you can use the
//! `shutdown_watcher()` function to get a Future that becomes ready when
//! the current task has been requested to shut down. You can use that with
//! tokio's `select!`.
//!
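//! A minimal sketch of that pattern (assuming a hypothetical `do_work()` payload
//! function; the task's future returns `anyhow::Result<()>`):
//!
//! ```ignore
//! loop {
//!     tokio::select! {
//!         _ = task_mgr::shutdown_watcher() => break,
//!         res = do_work() => res?,
//!     }
//! }
//! ```
//!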
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on which task panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
//!

// Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
// Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
#![allow(clippy::declare_interior_mutable_const)]

use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use futures::FutureExt;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::CancellationToken;

use tracing::{debug, error, info, warn};

use once_cell::sync::Lazy;

use utils::id::{TenantId, TimelineId};

use crate::shutdown_pageserver;

//
// There are four runtimes:
//
// Compute request runtime
//  - used to handle connections from compute nodes. Any tasks related to satisfying
//    GetPage requests, base backups, import, and other such compute node operations
//    are handled by the Compute request runtime
//  - page_service.rs
//  - this includes layer downloads from remote storage, if a layer is needed to
//    satisfy a GetPage request
//
// Management request runtime
//  - used to handle HTTP API requests
//
// WAL receiver runtime
//  - used to handle WAL receiver connections
//  - and to receive updates from storage_broker
//
// Background runtime
//  - layer flushing
//  - garbage collection
//  - compaction
//  - remote storage uploads
//  - initial tenant loading
//
// Everything runs in a tokio task. If you spawn new tasks, spawn them using the
// correct runtime (see the sketch at the end of this comment).
//
// There might be situations when one task needs to wait for a task running in another
// runtime to finish. For example, a background operation that needs a layer from remote
// storage will start to download it. If the download was already initiated by a GetPage
// request, the background task will wait for that download - running in the compute
// request runtime - to finish.
// Another example: the initial tenant loading tasks are launched in the background
// runtime. If a GetPage request comes in before the load of a tenant has finished, the
// GetPage request will wait for the tenant load to finish.
//
// The core Timeline code is synchronous, and uses a bunch of std Mutexes and RwLocks to
// protect data structures. Let's keep it that way. Synchronous code is easier to debug
// and analyze, and there's a lot of hairy, low-level, performance-critical code there.
//
// It's nice to have different runtimes, so that you can quickly eyeball how much CPU
// time each class of operations is taking, with 'top -H' or similar.
//
// It's also good to avoid hogging all the threads that would be needed to process
// other operations, if e.g. the upload tasks get blocked on locks. It shouldn't
// happen, but still.
//
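// For example, a sketch of spawning a connection handler onto the compute request
// runtime (`handle_page_service_connection` and `socket` are placeholders, not part of
// this module):
//
//     task_mgr::spawn(
//         COMPUTE_REQUEST_RUNTIME.handle(),
//         TaskKind::PageRequestHandler,
//         None,
//         None,
//         "serving compute connection",
//         false,
//         handle_page_service_connection(socket),
//     );
//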
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("compute request worker")
        .enable_all()
        .build()
        .expect("Failed to create compute request runtime")
});

pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("mgmt request worker")
        .enable_all()
        .build()
        .expect("Failed to create mgmt request runtime")
});

pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("walreceiver worker")
        .enable_all()
        .build()
        .expect("Failed to create walreceiver runtime")
});

pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("background op worker")
        // if you change the number of worker threads please change the constant below
        .enable_all()
        .build()
        .expect("Failed to create background op runtime")
});

pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
    // Force initialization of the runtime, and thus panic early if it fails.
    let _ = BACKGROUND_RUNTIME.handle();
    // This replicates tokio-1.28.1::loom::sys::num_cpus, which is not available publicly.
    // tokio would have already panicked for parsing errors or NotUnicode.
    //
    // This will be wrong if any of the runtimes gets its worker threads configured to
    // something else, but that has not been needed in a long time.
    std::env::var("TOKIO_WORKER_THREADS")
        .map(|s| s.parse::<usize>().unwrap())
        .unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
});

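// A sketch of how a caller might use this constant to bound the concurrency of
// background work (the semaphore below is hypothetical, not something this module
// provides):
//
//     static CONCURRENT_BACKGROUND_TASKS: Lazy<tokio::sync::Semaphore> =
//         Lazy::new(|| tokio::sync::Semaphore::new(*BACKGROUND_RUNTIME_WORKER_THREADS));
//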
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

impl fmt::Display for PageserverTaskId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

/// Each task that we track is associated with a "task ID". It's just an
/// increasing number that we assign. Note that it is different from tokio::task::Id.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);

/// Global registry of tasks
static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

task_local! {
    // This is a cancellation token which will be cancelled when a task needs to shut down. The
    // root token is kept in the global registry, so that anyone can send the signal to request
    // task shutdown.
    static SHUTDOWN_TOKEN: CancellationToken;

    // Each task holds a reference to its own PageServerTask here.
    static CURRENT_TASK: Arc<PageServerTask>;
}

///
/// There are many kinds of tasks in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many tasks of a certain kind can be running
/// at the same time.
///
#[derive(
    Debug,
    // NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
    enumset::EnumSetType,
    enum_map::Enum,
    serde::Serialize,
    serde::Deserialize,
    strum_macros::IntoStaticStr,
)]
pub enum TaskKind {
    // Pageserver startup, i.e., `main`
    Startup,

    // libpq listener task. It just accepts connections and spawns a
    // PageRequestHandler task for each connection.
    LibpqEndpointListener,

    // HTTP endpoint listener.
    HttpEndpointListener,

    // Task that handles a single connection. A PageRequestHandler task
    // starts detached from any particular tenant or timeline, but it can be
    // associated with one later, after receiving a command from the client.
    PageRequestHandler,

    /// Manages the WAL receiver connection for one timeline.
    /// It subscribes to events from storage_broker and decides which safekeeper to connect to.
    /// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
    /// There is at most one connection at any given time.
    ///
    /// The `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
    /// The `Client` object is what library users use to make requests & get responses.
    /// Internally, `Client` hands over requests to the `Connection` object.
    /// The `Connection` object is responsible for speaking the wire protocol.
    ///
    /// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
    /// That abstraction doesn't use `task_mgr`.
    /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
    /// For the `RequestContext` that we hand to the `TaskHandle`, we use the [`WalReceiverConnectionHandler`] task kind.
    ///
    /// Once the connection is established, the `TaskHandle` task creates a
    /// [`WalReceiverConnectionPoller`] task_mgr task that is responsible for polling
    /// the `Connection` object.
    /// A `CancellationToken` created by the `TaskHandle` task ensures
    /// that the [`WalReceiverConnectionPoller`] task is cancelled soon after the `TaskHandle` is dropped.
    ///
    /// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
    /// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
    WalReceiverManager,

    /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
    /// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
    /// See the comment on [`WalReceiverManager`].
    ///
    /// [`WalReceiverManager`]: Self::WalReceiverManager
    WalReceiverConnectionHandler,

    /// The task that polls the `tokio-postgres::Connection` object.
    /// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
    /// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
    WalReceiverConnectionPoller,

    // Garbage collection worker. One per tenant
    GarbageCollector,

    // Compaction. One per tenant.
    Compaction,

    // Eviction. One per timeline.
    Eviction,

    /// See [`crate::disk_usage_eviction_task`].
    DiskUsageEviction,

    // Initial logical size calculation
    InitialLogicalSizeCalculation,

    OndemandLogicalSizeCalculation,

    // Task that flushes frozen in-memory layers to disk
    LayerFlushTask,

    // Task that uploads a file to remote storage
    RemoteUploadTask,

    // Task that downloads a file from remote storage
    RemoteDownloadTask,

    // Task that handles the initial downloading of all tenants
    InitialLoad,

    // Task that handles attaching a tenant
    Attach,

    // Used mostly for background deletion from s3
    TimelineDeletionWorker,

    // Task that handles metrics collection
    MetricsCollection,

    // Task that drives downloading layers
    DownloadAllRemoteLayers,

    // Task that calculates synthetic size for all active tenants
    CalculateSyntheticSize,

    // A request that comes in via the pageserver HTTP API.
    MgmtRequest,

    DebugTool,

    #[cfg(test)]
    UnitTest,
}

#[derive(Default)]
struct MutableTaskState {
    /// Tenant and timeline that this task is associated with.
    tenant_id: Option<TenantId>,
    timeline_id: Option<TimelineId>,

    /// Handle for waiting for the task to exit. It can be None, if the
    /// task has already exited.
    join_handle: Option<JoinHandle<()>>,
}

struct PageServerTask {
    #[allow(dead_code)] // unused currently
    task_id: PageserverTaskId,

    kind: TaskKind,

    name: String,

    // To request task shutdown, just cancel this token.
    cancel: CancellationToken,

    mutable: Mutex<MutableTaskState>,
}

/// Launch a new task.
///
/// Note: if `shutdown_process_on_error` is set to true, a failure of the task
/// will lead to shutdown of the entire process.
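///
/// A sketch of a typical call site (`tenant_id`, `timeline_id` and the body of the
/// async block are placeholders):
///
/// ```ignore
/// task_mgr::spawn(
///     BACKGROUND_RUNTIME.handle(),
///     TaskKind::Compaction,
///     Some(tenant_id),
///     Some(timeline_id),
///     "compaction",
///     false, // don't shut down the whole pageserver if this task fails
///     async move {
///         // ... do the work, checking is_shutdown_requested() periodically ...
///         Ok(())
///     },
/// );
/// ```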
pub fn spawn<F>(
    runtime: &tokio::runtime::Handle,
    kind: TaskKind,
    tenant_id: Option<TenantId>,
    timeline_id: Option<TimelineId>,
    name: &str,
    shutdown_process_on_error: bool,
    future: F,
) -> PageserverTaskId
where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let cancel = CancellationToken::new();
    let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
    let task = Arc::new(PageServerTask {
        task_id: PageserverTaskId(task_id),
        kind,
        name: name.to_string(),
        cancel: cancel.clone(),
        mutable: Mutex::new(MutableTaskState {
            tenant_id,
            timeline_id,
            join_handle: None,
        }),
    });

    TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));

    let mut task_mut = task.mutable.lock().unwrap();

    let task_name = name.to_string();
    let task_cloned = Arc::clone(&task);
    let join_handle = runtime.spawn(task_wrapper(
        task_name,
        task_id,
        task_cloned,
        cancel,
        shutdown_process_on_error,
        future,
    ));
    task_mut.join_handle = Some(join_handle);
    drop(task_mut);

    // The task is now running. Nothing more to do here.
    PageserverTaskId(task_id)
}

/// This wrapper function runs in a newly-spawned task. It initializes the
/// task-local variables and calls the payload function.
async fn task_wrapper<F>(
    task_name: String,
    task_id: u64,
    task: Arc<PageServerTask>,
    shutdown_token: CancellationToken,
    shutdown_process_on_error: bool,
    future: F,
) where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    debug!("Starting task '{}'", task_name);

    let result = SHUTDOWN_TOKEN
        .scope(
            shutdown_token,
            CURRENT_TASK.scope(task, {
                // We use AssertUnwindSafe here so that the payload function
                // doesn't need to be UnwindSafe. We don't do anything after the
                // unwinding that would expose us to unwind-unsafe behavior.
                AssertUnwindSafe(future).catch_unwind()
            }),
        )
        .await;
    task_finish(result, task_name, task_id, shutdown_process_on_error).await;
}

async fn task_finish(
    result: std::result::Result<
        anyhow::Result<()>,
        std::boxed::Box<dyn std::any::Any + std::marker::Send>,
    >,
    task_name: String,
    task_id: u64,
    shutdown_process_on_error: bool,
) {
    // Remove our entry from the global hashmap.
    let task = TASKS
        .lock()
        .unwrap()
        .remove(&task_id)
        .expect("no task in registry");

    let mut shutdown_process = false;
    {
        let task_mut = task.mutable.lock().unwrap();

        match result {
            Ok(Ok(())) => {
                debug!("Task '{}' exited normally", task_name);
            }
            Ok(Err(err)) => {
                if shutdown_process_on_error {
                    error!(
                        "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
                        task_name, task_mut.tenant_id, task_mut.timeline_id, err
                    );
                    shutdown_process = true;
                } else {
                    error!(
                        "Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
                        task_name, task_mut.tenant_id, task_mut.timeline_id, err
                    );
                }
            }
            Err(err) => {
                if shutdown_process_on_error {
                    error!(
                        "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
                        task_name, task_mut.tenant_id, task_mut.timeline_id, err
                    );
                    shutdown_process = true;
                } else {
                    error!(
                        "Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
                        task_name, task_mut.tenant_id, task_mut.timeline_id, err
                    );
                }
            }
        }
    }

    if shutdown_process {
        shutdown_pageserver(None, 1).await;
    }
}

// Expected to be called from within the task that is being associated.
pub fn associate_with(tenant_id: Option<TenantId>, timeline_id: Option<TimelineId>) {
    CURRENT_TASK.with(|ct| {
        let mut task_mut = ct.mutable.lock().unwrap();
        task_mut.tenant_id = tenant_id;
        task_mut.timeline_id = timeline_id;
    });
}

/// Signal and wait for tasks to shut down.
///
/// The arguments are used to select the tasks to kill. Any None arguments are
/// ignored. For example, to shut down all WAL receiver manager tasks:
///
///   shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None)
///
/// Or to shut down all tasks for a given timeline:
///
///   shutdown_tasks(None, Some(tenant_id), Some(timeline_id))
///
pub async fn shutdown_tasks(
    kind: Option<TaskKind>,
    tenant_id: Option<TenantId>,
    timeline_id: Option<TimelineId>,
) {
    let mut victim_tasks = Vec::new();

    {
        let tasks = TASKS.lock().unwrap();
        for task in tasks.values() {
            let task_mut = task.mutable.lock().unwrap();
            if (kind.is_none() || Some(task.kind) == kind)
                && (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
                && (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
            {
                task.cancel.cancel();
                victim_tasks.push((
                    Arc::clone(task),
                    task.kind,
                    task_mut.tenant_id,
                    task_mut.timeline_id,
                ));
            }
        }
    }

    let log_all = kind.is_none() && tenant_id.is_none() && timeline_id.is_none();

    for (task, task_kind, tenant_id, timeline_id) in victim_tasks {
        let join_handle = {
            let mut task_mut = task.mutable.lock().unwrap();
            task_mut.join_handle.take()
        };
        if let Some(mut join_handle) = join_handle {
            if log_all {
                if tenant_id.is_none() {
                    // there are only a few of these
                    info!(name = task.name, kind = ?task_kind, "stopping global task");
                } else {
                    // warn to catch these in tests; there shouldn't be any
                    warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
                }
            }
            if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
                .await
                .is_err()
            {
                // allow some time to elapse before logging to cut down the number of log
                // lines.
                info!("waiting for {} to shut down", task.name);
                // we never handled this return value, but:
                // - we don't deschedule which would lead to is_cancelled
                // - panics are already logged (is_panicked)
                // - task errors are already logged in the wrapper
                let _ = join_handle.await;
            }
        } else {
            // Possibly one of:
            // * The task had not even fully started yet.
            // * It was shut down concurrently and already exited.
        }
    }
}

pub fn current_task_kind() -> Option<TaskKind> {
    CURRENT_TASK.try_with(|ct| ct.kind).ok()
}

pub fn current_task_id() -> Option<PageserverTaskId> {
    CURRENT_TASK.try_with(|ct| ct.task_id).ok()
}

/// A Future that completes when the current task has been requested to shut
/// down.
pub async fn shutdown_watcher() {
    let token = SHUTDOWN_TOKEN
        .try_with(|t| t.clone())
        .expect("shutdown_watcher() called in an unexpected task or thread");

    token.cancelled().await;
}

/// Clone the current task's cancellation token, which can be moved across tasks.
///
/// When the task that is currently executing is shut down, the cancellation token will be
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`.
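///
/// A minimal sketch of moving the token into a blocking task (the loop body is a
/// placeholder):
///
/// ```ignore
/// let cancel = task_mgr::shutdown_token();
/// tokio::task::spawn_blocking(move || {
///     while !cancel.is_cancelled() {
///         // ... do a bounded chunk of blocking work ...
///     }
/// });
/// ```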
pub fn shutdown_token() -> CancellationToken {
    SHUTDOWN_TOKEN
        .try_with(|t| t.clone())
        .expect("shutdown_token() called in an unexpected task or thread")
}

/// Has the current task been requested to shut down?
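///
/// A sketch of a periodic check inside a long synchronous loop (`work_items` and
/// `process` are placeholders):
///
/// ```ignore
/// for item in work_items {
///     if task_mgr::is_shutdown_requested() {
///         return Ok(());
///     }
///     process(item)?;
/// }
/// ```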
pub fn is_shutdown_requested() -> bool {
    if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
        cancel.is_cancelled()
    } else {
        if !cfg!(test) {
            warn!("is_shutdown_requested() called in an unexpected task or thread");
        }
        false
    }
}