Line data Source code
1 : //! Defines [`RequestContext`].
2 : //!
3 : //! It is a structure that we use throughout the pageserver to propagate
4 : //! high-level context from places that _originate_ activity down to the
5 : //! shared code paths at the heart of the pageserver. It's inspired by
6 : //! Golang's `context.Context`.
7 : //!
8 : //! For example, in `Timeline::get(page_nr, lsn)` we need to answer the following questions:
9 : //! 1. What high-level activity ([`TaskKind`]) needs this page?
10 : //! We need that information as a categorical dimension for page access
11 : //! statistics, which we, in turn, need to guide layer eviction policy design.
12 : //! 2. How should we behave if, to produce the page image, we need to
13 : //! on-demand download a layer file ([`DownloadBehavior`])?
14 : //!
15 : //! [`RequestContext`] satisfies those needs.
16 : //! The current implementation is a small `struct` that is passed through
17 : //! the call chain by reference.
18 : //!
19 : //! ### Future Work
20 : //!
21 : //! However, we do not intend to stop here, since there are other needs that
22 : //! require carrying information from high to low levels of the app.
23 : //!
24 : //! Most importantly, **cancellation signaling** in response to
25 : //! 1. timeouts (page_service max response time) and
26 : //! 2. lifecycle requests (detach tenant, delete timeline).
27 : //!
28 : //! Related to that, there is sometimes a need to ensure that all tokio tasks spawned
29 : //! by the transitive callees of a request have finished. The keyword here
30 : //! is **Structured Concurrency**, and right now, we use `task_mgr` in most places,
31 : //! `TaskHandle` in some places, and careful code review around `FuturesUnordered`
32 : //! or `JoinSet` in other places.
33 : //!
34 : //! We do not yet have a systematic cancellation story in pageserver, and it is
35 : //! pretty clear that [`RequestContext`] will be responsible for that.
36 : //! So, the API already prepares for this role through the
37 : //! [`RequestContext::detached_child`] and [`RequestContext::attached_child`] methods.
38 : //! See their doc comments for details on how we will use them in the future.
39 : //!
40 : //! It is not clear whether or how we will enforce Structured Concurrency, and
41 : //! what role [`RequestContext`] will play there.
42 : //! So, the API doesn't prepare us for this topic.
43 : //!
44 : //! Other future uses of `RequestContext`:
45 : //! - Communicate compute & IO priorities (user-initiated request vs. background-loop)
46 : //! - Request IDs for distributed tracing
47 : //! - Request/Timeline/Tenant-scoped log levels
48 : //!
49 : //! RequestContext might look quite different once it supports those features.
50 : //! Likely, it will have a shape similar to Golang's `context.Context`.
51 : //!
52 : //! ### Why A Struct Instead Of Method Parameters
53 : //!
54 : //! What's typical about such information is that it needs to be passed down
55 : //! along the call chain from high level to low level, but few of the functions
56 : //! in the middle need to understand it.
57 : //! Further, it is to be expected that we will need to propagate more data
58 : //! in the future (see the earlier section on future work).
59 : //! Hence, for functions in the middle of the call chain, we have the following
60 : //! requirements:
61 : //! 1. It should be easy to forward the context to callees.
62 : //! 2. To propagate more data from high-level to low-level code, the functions in
63 : //! the middle should not need to be modified.
64 : //!
65 : //! The solution is to have a container structure ([`RequestContext`]) that
66 : //! carries the information. Functions that don't care about what's in it
67 : //! pass it along to callees.
68 : //!
69 : //! ### Why Not Task-Local Variables
70 : //!
71 : //! One could use task-local variables (the equivalent of thread-local variables)
72 : //! to address the immediate needs outlined above.
73 : //! However, we reject task-local variables because:
74 : //! 1. they are implicit, thereby making it harder to trace the data flow in code
75 : //! reviews and during debugging,
76 : //! 2. they can be mutable, which enables implicit return data flow,
77 : //! 3. they are restrictive in that code which fans out into multiple tasks,
78 : //! or even threads, needs to carefully propagate the state.
79 : //!
80 : //! In contrast, information flow with [`RequestContext`] is
81 : //! 1. always explicit,
82 : //! 2. strictly uni-directional because RequestContext is immutable,
83 : //! 3. tangible because a [`RequestContext`] is just a value.
84 : //! When creating child activities, regardless of whether it's a task,
85 : //! thread, or even an RPC to another service, the value can
86 : //! be used like any other argument.
87 : //!
88 : //! The solution is that all code paths are infected with precisely one
89 : //! [`RequestContext`] argument. Functions in the middle of the call chain
90 : //! only need to pass it on.
91 :
92 : use std::{sync::Arc, time::Duration};
93 :
94 : use once_cell::sync::Lazy;
95 : use tracing::warn;
96 : use utils::{id::TimelineId, shard::TenantShardId};
97 :
98 : use crate::{
99 : metrics::{StorageIoSizeMetrics, TimelineMetrics},
100 : task_mgr::TaskKind,
101 : tenant::Timeline,
102 : };
103 : use futures::FutureExt;
104 : use futures::future::BoxFuture;
105 : use std::future::Future;
106 : use tracing_utils::perf_span::{PerfInstrument, PerfSpan};
107 :
108 : use tracing::{Dispatch, Span};
109 :
110 : // The main structure of this module, see module-level comment.
111 : pub struct RequestContext {
112 : task_kind: TaskKind,
113 : download_behavior: DownloadBehavior,
114 : access_stats_behavior: AccessStatsBehavior,
115 : page_content_kind: PageContentKind,
116 : read_path_debug: bool,
117 : scope: Scope,
118 : perf_span: Option<PerfSpan>,
119 : perf_span_dispatch: Option<Dispatch>,
120 : }
121 :
122 : #[derive(Clone)]
123 : pub(crate) enum Scope {
124 : Global {
125 : io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
126 : },
127 : SecondaryTenant {
128 : io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
129 : },
130 : SecondaryTimeline {
131 : io_size_metrics: crate::metrics::StorageIoSizeMetrics,
132 : },
133 : Timeline {
134 : // We wrap the `Arc<TimelineMetrics>`s inside another Arc to avoid child
135 : // context creation contending for the ref counters of the Arc<TimelineMetrics>,
136 : // which are shared among all tasks that operate on the timeline, especially
137 : // concurrent page_service connections.
138 : #[allow(clippy::redundant_allocation)]
139 : arc_arc: Arc<Arc<TimelineMetrics>>,
140 : },
141 : #[cfg(test)]
142 : UnitTest {
143 : io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
144 : },
145 : DebugTools {
146 : io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics,
147 : },
148 : }
149 :
150 : static GLOBAL_IO_SIZE_METRICS: Lazy<crate::metrics::StorageIoSizeMetrics> =
151 556 : Lazy::new(|| crate::metrics::StorageIoSizeMetrics::new("*", "*", "*"));
152 :
153 : impl Scope {
154 1560 : pub(crate) fn new_global() -> Self {
155 1560 : Scope::Global {
156 1560 : io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
157 1560 : }
158 1560 : }
159 : /// NB: this allocates, so, use only at relatively long-lived roots, e.g., at start
160 : /// of a compaction iteration.
161 1936 : pub(crate) fn new_timeline(timeline: &Timeline) -> Self {
162 1936 : Scope::Timeline {
163 1936 : arc_arc: Arc::new(Arc::clone(&timeline.metrics)),
164 1936 : }
165 1936 : }
166 0 : pub(crate) fn new_page_service_pagestream(
167 0 : timeline_handle: &crate::tenant::timeline::handle::Handle<
168 0 : crate::page_service::TenantManagerTypes,
169 0 : >,
170 0 : ) -> Self {
171 0 : Scope::Timeline {
172 0 : arc_arc: Arc::clone(&timeline_handle.metrics),
173 0 : }
174 0 : }
175 0 : pub(crate) fn new_secondary_timeline(
176 0 : tenant_shard_id: &TenantShardId,
177 0 : timeline_id: &TimelineId,
178 0 : ) -> Self {
179 0 : // TODO(https://github.com/neondatabase/neon/issues/11156): secondary timelines have no infrastructure for metrics lifecycle.
180 0 :
181 0 : let tenant_id = tenant_shard_id.tenant_id.to_string();
182 0 : let shard_id = tenant_shard_id.shard_slug().to_string();
183 0 : let timeline_id = timeline_id.to_string();
184 0 :
185 0 : let io_size_metrics =
186 0 : crate::metrics::StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id);
187 0 : Scope::SecondaryTimeline { io_size_metrics }
188 0 : }
189 0 : pub(crate) fn new_secondary_tenant(_tenant_shard_id: &TenantShardId) -> Self {
190 0 : // Before propagating metrics via RequestContext, the labels were inferred from file path.
191 0 : // The only user of VirtualFile at tenant scope is the heatmap download & read.
192 0 : // The inferred labels for the path of the heatmap file on local disk were that of the global metric (*,*,*).
193 0 : // Thus, we do the same here, and extend that for anything secondary-tenant scoped.
194 0 : //
195 0 : // If we want to have (tenant_id, shard_id, '*') labels for secondary tenants in the future,
196 0 : // we will need to think about the metric lifecycle, i.e., remove them during secondary tenant shutdown,
197 0 : // like we do for attached timelines. (We don't have attached-tenant-scoped usage of VirtualFile
198 0 : // at this point, so, we were able to completely side-step tenant-scoped stuff there).
199 0 : Scope::SecondaryTenant {
200 0 : io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
201 0 : }
202 0 : }
203 : #[cfg(test)]
204 588 : pub(crate) fn new_unit_test() -> Self {
205 588 : Scope::UnitTest {
206 588 : io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
207 588 : }
208 588 : }
209 :
210 0 : pub(crate) fn new_debug_tools() -> Self {
211 0 : Scope::DebugTools {
212 0 : io_size_metrics: &GLOBAL_IO_SIZE_METRICS,
213 0 : }
214 0 : }
215 : }
216 :
217 : /// The kind of access to the page cache.
218 : #[derive(Clone, Copy, PartialEq, Eq, Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
219 : pub enum PageContentKind {
220 : Unknown,
221 : DeltaLayerSummary,
222 : DeltaLayerBtreeNode,
223 : DeltaLayerValue,
224 : ImageLayerSummary,
225 : ImageLayerBtreeNode,
226 : ImageLayerValue,
227 : InMemoryLayer,
228 : }
229 :
/// What an operation should do when it can only proceed after an on-demand
/// download of a layer file.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum DownloadBehavior {
    /// Perform the download, even though it may take a while.
    Download,
    /// Perform the download, but log a warning. Meant for code paths where the
    /// layer file is expected to already be present locally.
    Warn,
    /// Return a `PageReconstructError::NeedsDownload` error instead.
    Error,
}
244 :
/// Whether a request should update the layer access times that LRU eviction relies on.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum AccessStatsBehavior {
    /// Update access times: treat this access as a hint that the layer is likely
    /// to be accessed again.
    Update,
    /// Leave access times untouched: the requestor reads the layer but does not
    /// want that to count towards keeping it cached (for example, a compaction
    /// routine that will soon cover the layer with another one).
    Skip,
}
258 :
259 : pub struct RequestContextBuilder {
260 : inner: RequestContext,
261 : }
262 :
263 : impl RequestContextBuilder {
264 : /// A new builder with default settings
265 1560 : pub fn new(task_kind: TaskKind) -> Self {
266 1560 : Self {
267 1560 : inner: RequestContext {
268 1560 : task_kind,
269 1560 : download_behavior: DownloadBehavior::Download,
270 1560 : access_stats_behavior: AccessStatsBehavior::Update,
271 1560 : page_content_kind: PageContentKind::Unknown,
272 1560 : read_path_debug: false,
273 1560 : scope: Scope::new_global(),
274 1560 : perf_span: None,
275 1560 : perf_span_dispatch: None,
276 1560 : },
277 1560 : }
278 1560 : }
279 :
280 11967552 : pub fn from(original: &RequestContext) -> Self {
281 11967552 : Self {
282 11967552 : inner: original.clone(),
283 11967552 : }
284 11967552 : }
285 :
286 1044 : pub fn task_kind(mut self, k: TaskKind) -> Self {
287 1044 : self.inner.task_kind = k;
288 1044 : self
289 1044 : }
290 :
291 : /// Configure the DownloadBehavior of the context: whether to
292 : /// download missing layers, and/or warn on the download.
293 2044 : pub fn download_behavior(mut self, b: DownloadBehavior) -> Self {
294 2044 : self.inner.download_behavior = b;
295 2044 : self
296 2044 : }
297 :
298 : /// Configure the AccessStatsBehavior of the context: whether layer
299 : /// accesses should update the access time of the layer.
300 728 : pub(crate) fn access_stats_behavior(mut self, b: AccessStatsBehavior) -> Self {
301 728 : self.inner.access_stats_behavior = b;
302 728 : self
303 728 : }
304 :
305 1697625 : pub(crate) fn page_content_kind(mut self, k: PageContentKind) -> Self {
306 1697625 : self.inner.page_content_kind = k;
307 1697625 : self
308 1697625 : }
309 :
310 0 : pub(crate) fn read_path_debug(mut self, b: bool) -> Self {
311 0 : self.inner.read_path_debug = b;
312 0 : self
313 0 : }
314 :
315 2524 : pub(crate) fn scope(mut self, s: Scope) -> Self {
316 2524 : self.inner.scope = s;
317 2524 : self
318 2524 : }
319 :
320 0 : pub(crate) fn perf_span_dispatch(mut self, dispatch: Option<Dispatch>) -> Self {
321 0 : self.inner.perf_span_dispatch = dispatch;
322 0 : self
323 0 : }
324 :
325 0 : pub fn root_perf_span<Fn>(mut self, make_span: Fn) -> Self
326 0 : where
327 0 : Fn: FnOnce() -> Span,
328 0 : {
329 0 : assert!(self.inner.perf_span.is_none());
330 0 : assert!(self.inner.perf_span_dispatch.is_some());
331 :
332 0 : let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
333 0 : let new_span = tracing::dispatcher::with_default(dispatcher, make_span);
334 0 :
335 0 : self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
336 0 :
337 0 : self
338 0 : }
339 :
340 8661037 : pub fn perf_span<Fn>(mut self, make_span: Fn) -> Self
341 8661037 : where
342 8661037 : Fn: FnOnce(&Span) -> Span,
343 8661037 : {
344 8661037 : if let Some(ref perf_span) = self.inner.perf_span {
345 0 : assert!(self.inner.perf_span_dispatch.is_some());
346 0 : let dispatcher = self.inner.perf_span_dispatch.as_ref().unwrap();
347 0 :
348 0 : let new_span =
349 0 : tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
350 0 :
351 0 : self.inner.perf_span = Some(PerfSpan::new(new_span, dispatcher.clone()));
352 8661037 : }
353 :
354 8661037 : self
355 8661037 : }
356 :
357 1560 : pub fn root(self) -> RequestContext {
358 1560 : self.inner
359 1560 : }
360 :
361 11966172 : pub fn attached_child(self) -> RequestContext {
362 11966172 : self.inner
363 11966172 : }
364 :
365 1380 : pub fn detached_child(self) -> RequestContext {
366 1380 : self.inner
367 1380 : }
368 : }
369 :
370 : impl RequestContext {
371 : /// Private clone implementation
372 : ///
373 : /// Callers should use the [`RequestContextBuilder`] or child spaning APIs of
374 : /// [`RequestContext`].
375 11967552 : fn clone(&self) -> Self {
376 11967552 : Self {
377 11967552 : task_kind: self.task_kind,
378 11967552 : download_behavior: self.download_behavior,
379 11967552 : access_stats_behavior: self.access_stats_behavior,
380 11967552 : page_content_kind: self.page_content_kind,
381 11967552 : read_path_debug: self.read_path_debug,
382 11967552 : scope: self.scope.clone(),
383 11967552 : perf_span: self.perf_span.clone(),
384 11967552 : perf_span_dispatch: self.perf_span_dispatch.clone(),
385 11967552 : }
386 11967552 : }
387 :
388 : /// Create a new RequestContext that has no parent.
389 : ///
390 : /// The function is called `new` because, once we add children
391 : /// to it using `detached_child` or `attached_child`, the context
392 : /// form a tree (not implemented yet since cancellation will be
393 : /// the first feature that requires a tree).
394 : ///
395 : /// # Future: Cancellation
396 : ///
397 : /// The only reason why a context like this one can be canceled is
398 : /// because someone explicitly canceled it.
399 : /// It has no parent, so it cannot inherit cancellation from there.
400 1560 : pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
401 1560 : RequestContextBuilder::new(task_kind)
402 1560 : .download_behavior(download_behavior)
403 1560 : .root()
404 1560 : }
405 :
406 : /// Create a detached child context for a task that may outlive `self`.
407 : ///
408 : /// Use this when spawning new background activity that should complete
409 : /// even if the current request is canceled.
410 : ///
411 : /// # Future: Cancellation
412 : ///
413 : /// Cancellation of `self` will not propagate to the child context returned
414 : /// by this method.
415 : ///
416 : /// # Future: Structured Concurrency
417 : ///
418 : /// We could add the Future as a parameter to this function, spawn it as a task,
419 : /// and pass to the new task the child context as an argument.
420 : /// That would be an ergonomic improvement.
421 : ///
422 : /// We could make new calls to this function fail if `self` is already canceled.
423 456 : pub fn detached_child(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
424 456 : RequestContextBuilder::from(self)
425 456 : .task_kind(task_kind)
426 456 : .download_behavior(download_behavior)
427 456 : .detached_child()
428 456 : }
429 :
430 : /// Create a child of context `self` for a task that shall not outlive `self`.
431 : ///
432 : /// Use this when fanning-out work to other async tasks.
433 : ///
434 : /// # Future: Cancellation
435 : ///
436 : /// Cancelling a context will propagate to its attached children.
437 : ///
438 : /// # Future: Structured Concurrency
439 : ///
440 : /// We could add the Future as a parameter to this function, spawn it as a task,
441 : /// and track its `JoinHandle` inside the `RequestContext`.
442 : ///
443 : /// We could then provide another method to allow waiting for all child tasks
444 : /// to finish.
445 : ///
446 : /// We could make new calls to this function fail if `self` is already canceled.
447 : /// Alternatively, we could allow the creation but not spawn the task.
448 : /// The method to wait for child tasks would return an error, indicating
449 : /// that the child task was not started because the context was canceled.
450 1605154 : pub fn attached_child(&self) -> Self {
451 1605154 : RequestContextBuilder::from(self).attached_child()
452 1605154 : }
453 :
454 : /// Use this function when you should be creating a child context using
455 : /// [`attached_child`] or [`detached_child`], but your caller doesn't provide
456 : /// a context and you are unwilling to change all callers to provide one.
457 : ///
458 : /// Before we add cancellation, we should get rid of this method.
459 : ///
460 : /// [`attached_child`]: Self::attached_child
461 : /// [`detached_child`]: Self::detached_child
462 912 : pub fn todo_child(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
463 912 : Self::new(task_kind, download_behavior)
464 912 : }
465 :
466 1012 : pub fn with_scope_timeline(&self, timeline: &Arc<Timeline>) -> Self {
467 1012 : RequestContextBuilder::from(self)
468 1012 : .scope(Scope::new_timeline(timeline))
469 1012 : .attached_child()
470 1012 : }
471 :
472 0 : pub(crate) fn with_scope_page_service_pagestream(
473 0 : &self,
474 0 : timeline_handle: &crate::tenant::timeline::handle::Handle<
475 0 : crate::page_service::TenantManagerTypes,
476 0 : >,
477 0 : ) -> Self {
478 0 : RequestContextBuilder::from(self)
479 0 : .scope(Scope::new_page_service_pagestream(timeline_handle))
480 0 : .attached_child()
481 0 : }
482 :
483 0 : pub fn with_scope_secondary_timeline(
484 0 : &self,
485 0 : tenant_shard_id: &TenantShardId,
486 0 : timeline_id: &TimelineId,
487 0 : ) -> Self {
488 0 : RequestContextBuilder::from(self)
489 0 : .scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id))
490 0 : .attached_child()
491 0 : }
492 :
493 0 : pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self {
494 0 : RequestContextBuilder::from(self)
495 0 : .scope(Scope::new_secondary_tenant(tenant_shard_id))
496 0 : .attached_child()
497 0 : }
498 :
499 : #[cfg(test)]
500 588 : pub fn with_scope_unit_test(&self) -> Self {
501 588 : RequestContextBuilder::from(self)
502 588 : .task_kind(TaskKind::UnitTest)
503 588 : .scope(Scope::new_unit_test())
504 588 : .attached_child()
505 588 : }
506 :
507 0 : pub fn with_scope_debug_tools(&self) -> Self {
508 0 : RequestContextBuilder::from(self)
509 0 : .task_kind(TaskKind::DebugTool)
510 0 : .scope(Scope::new_debug_tools())
511 0 : .attached_child()
512 0 : }
513 :
514 3688763 : pub fn task_kind(&self) -> TaskKind {
515 3688763 : self.task_kind
516 3688763 : }
517 :
518 60 : pub fn download_behavior(&self) -> DownloadBehavior {
519 60 : self.download_behavior
520 60 : }
521 :
522 480385 : pub(crate) fn access_stats_behavior(&self) -> AccessStatsBehavior {
523 480385 : self.access_stats_behavior
524 480385 : }
525 :
526 1941870 : pub(crate) fn page_content_kind(&self) -> PageContentKind {
527 1941870 : self.page_content_kind
528 1941870 : }
529 :
530 0 : pub(crate) fn read_path_debug(&self) -> bool {
531 0 : self.read_path_debug
532 0 : }
533 :
534 1273840 : pub(crate) fn io_size_metrics(&self) -> &StorageIoSizeMetrics {
535 1273840 : match &self.scope {
536 0 : Scope::Global { io_size_metrics } => {
537 0 : let is_unit_test = cfg!(test);
538 0 : let is_regress_test_build = cfg!(feature = "testing");
539 0 : if is_unit_test || is_regress_test_build {
540 0 : panic!("all VirtualFile instances are timeline-scoped");
541 : } else {
542 1273840 : use once_cell::sync::Lazy;
543 1273840 : use std::sync::Mutex;
544 1273840 : use std::time::Duration;
545 1273840 : use utils::rate_limit::RateLimit;
546 1273840 : static LIMIT: Lazy<Mutex<RateLimit>> =
547 0 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
548 0 : let mut guard = LIMIT.lock().unwrap();
549 0 : guard.call2(|rate_limit_stats| {
550 0 : warn!(
551 : %rate_limit_stats,
552 0 : backtrace=%std::backtrace::Backtrace::force_capture(),
553 0 : "all VirtualFile instances are timeline-scoped",
554 : );
555 0 : });
556 0 :
557 0 : io_size_metrics
558 : }
559 : }
560 42995 : Scope::Timeline { arc_arc } => &arc_arc.storage_io_size,
561 0 : Scope::SecondaryTimeline { io_size_metrics } => io_size_metrics,
562 0 : Scope::SecondaryTenant { io_size_metrics } => io_size_metrics,
563 : #[cfg(test)]
564 1230845 : Scope::UnitTest { io_size_metrics } => io_size_metrics,
565 0 : Scope::DebugTools { io_size_metrics } => io_size_metrics,
566 : }
567 1273840 : }
568 :
569 481577 : pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
570 481577 : if duration == Duration::ZERO {
571 481529 : return;
572 48 : }
573 48 :
574 48 : match &self.scope {
575 48 : Scope::Timeline { arc_arc } => arc_arc
576 48 : .wait_ondemand_download_time
577 48 : .observe(self.task_kind, duration),
578 0 : _ => {
579 0 : use once_cell::sync::Lazy;
580 0 : use std::sync::Mutex;
581 0 : use std::time::Duration;
582 0 : use utils::rate_limit::RateLimit;
583 0 : static LIMIT: Lazy<Mutex<RateLimit>> =
584 0 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
585 0 : let mut guard = LIMIT.lock().unwrap();
586 0 : guard.call2(|rate_limit_stats| {
587 0 : warn!(
588 : %rate_limit_stats,
589 0 : backtrace=%std::backtrace::Backtrace::force_capture(),
590 0 : "ondemand downloads should always happen within timeline scope",
591 : );
592 0 : });
593 0 : }
594 : }
595 481577 : }
596 :
597 36768 : pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
598 36768 : if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
599 0 : span.inner().follows_from(from_span.inner());
600 36768 : }
601 36768 : }
602 :
603 36796 : pub(crate) fn has_perf_span(&self) -> bool {
604 36796 : self.perf_span.is_some()
605 36796 : }
606 : }
607 :
608 : /// [`Future`] extension trait that allow for creating performance
609 : /// spans on sampled requests
610 : pub(crate) trait PerfInstrumentFutureExt<'a>: Future + Send {
611 : /// Instrument this future with a new performance span when the
612 : /// provided request context indicates the originator request
613 : /// was sampled. Otherwise, just box the future and return it as is.
614 10070807 : fn maybe_perf_instrument<Fn>(
615 10070807 : self,
616 10070807 : ctx: &RequestContext,
617 10070807 : make_span: Fn,
618 10070807 : ) -> BoxFuture<'a, Self::Output>
619 10070807 : where
620 10070807 : Self: Sized + 'a,
621 10070807 : Fn: FnOnce(&Span) -> Span,
622 10070807 : {
623 10070807 : match &ctx.perf_span {
624 0 : Some(perf_span) => {
625 0 : assert!(ctx.perf_span_dispatch.is_some());
626 0 : let dispatcher = ctx.perf_span_dispatch.as_ref().unwrap();
627 0 :
628 0 : let new_span =
629 0 : tracing::dispatcher::with_default(dispatcher, || make_span(perf_span.inner()));
630 0 :
631 0 : let new_perf_span = PerfSpan::new(new_span, dispatcher.clone());
632 0 : self.instrument(new_perf_span).boxed()
633 : }
634 10070807 : None => self.boxed(),
635 : }
636 10070807 : }
637 : }
638 :
639 : // Implement the trait for all types that satisfy the trait bounds
640 : impl<'a, T: Future + Send + 'a> PerfInstrumentFutureExt<'a> for T {}
|