Line data Source code
1 : //! Common traits and structs for layers
2 :
3 : pub mod batch_split_writer;
4 : pub mod delta_layer;
5 : pub mod filter_iterator;
6 : pub mod image_layer;
7 : pub mod inmemory_layer;
8 : pub(crate) mod layer;
9 : mod layer_desc;
10 : mod layer_name;
11 : pub mod merge_iterator;
12 :
13 : use crate::config::PageServerConf;
14 : use crate::context::{AccessStatsBehavior, RequestContext};
15 : use bytes::Bytes;
16 : use futures::stream::FuturesUnordered;
17 : use futures::StreamExt;
18 : use pageserver_api::key::Key;
19 : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
20 : use pageserver_api::record::NeonWalRecord;
21 : use pageserver_api::value::Value;
22 : use std::cmp::Ordering;
23 : use std::collections::hash_map::Entry;
24 : use std::collections::{BinaryHeap, HashMap};
25 : use std::future::Future;
26 : use std::ops::Range;
27 : use std::pin::Pin;
28 : use std::sync::atomic::AtomicUsize;
29 : use std::sync::Arc;
30 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
31 : use tracing::{trace, Instrument};
32 : use utils::sync::gate::GateGuard;
33 :
34 : use utils::lsn::Lsn;
35 :
36 : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
37 : pub use image_layer::{ImageLayer, ImageLayerWriter};
38 : pub use inmemory_layer::InMemoryLayer;
39 : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
40 : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
41 :
42 : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
43 :
44 : use self::inmemory_layer::InMemoryLayerFileId;
45 :
46 : use super::timeline::GetVectoredError;
47 : use super::PageReconstructError;
48 :
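     : /// Returns `true` if the two half-open ranges overlap.
     : ///
     : /// Illustrative sketch (the doctest is `ignore`d; it assumes `range_overlaps` is in scope):
     : ///
     : /// ```ignore
     : /// assert!(range_overlaps(&(1..5), &(4..10)));   // 4 lies in both ranges
     : /// assert!(!range_overlaps(&(1..4), &(4..10)));  // the ranges only touch at 4
     : /// assert!(range_overlaps(&(0..10), &(3..4)));   // containment counts as overlap
     : /// ```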
49 0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
50 0 : where
51 0 : T: PartialOrd<T>,
52 0 : {
53 0 : if a.start < b.start {
54 0 : a.end > b.start
55 : } else {
56 0 : b.end > a.start
57 : }
58 0 : }
59 :
60 : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
61 : ///
62 : /// Before the first call, you can fill in 'img' if you have an older cached
63 : /// version of the page available. That can save work in
64 : /// 'get_value_reconstruct_data', as it can stop searching for page versions
65 : /// once all the WAL records going back to the cached image have been collected.
66 : ///
67 : /// When 'get_value_reconstruct_data' returns Complete, either 'img' is set to an
68 : /// image of the page, or the oldest WAL record in 'records' is a will_init-type
69 : /// record that initializes the page without requiring a previous image.
70 : ///
71 : /// If 'get_value_reconstruct_data' returns Continue, some 'records' may have
72 : /// been collected, but there are more records outside the current layer. Pass
73 : /// the same ValueReconstructState struct to the next 'get_value_reconstruct_data'
74 : /// call to collect more records.
75 : ///
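     : /// Hedged sketch of the intended call pattern (the iterator name, variables, and the
     : /// exact method signature below are placeholders, not the real traversal code):
     : ///
     : /// ```text
     : /// let mut state = ValueReconstructState::default();
     : /// state.img = cached_page_image;       // optional: seed with an older cached image
     : /// for layer in layers_newest_to_oldest {
     : ///     match layer.get_value_reconstruct_data(key, lsn_range, &mut state)? {
     : ///         Complete => break,           // image or will_init record reached
     : ///         Continue => continue,        // need older layers for more records
     : ///     }
     : /// }
     : /// // replay `state.records` on top of `state.img` to materialize the page
     : /// ```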
76 : #[derive(Debug, Default)]
77 : pub(crate) struct ValueReconstructState {
78 : pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
79 : pub(crate) img: Option<(Lsn, Bytes)>,
80 : }
81 :
82 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
83 : pub(crate) enum ValueReconstructSituation {
84 : Complete,
85 : #[default]
86 : Continue,
87 : }
88 :
89 : /// On disk representation of a value loaded in a buffer
90 : #[derive(Debug)]
91 : pub(crate) enum OnDiskValue {
92 : /// Unencoded [`Value::Image`]
93 : RawImage(Bytes),
94 : /// Encoded [`Value`]. Can deserialize into an image or a WAL record
95 : WalRecordOrImage(Bytes),
96 : }
97 :
98 : /// Reconstruct data accumulated for a single key during a vectored get
99 : #[derive(Debug, Default)]
100 : pub(crate) struct VectoredValueReconstructState {
101 : pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
102 :
103 : pub(crate) situation: ValueReconstructSituation,
104 : }
105 :
106 : #[derive(Debug)]
107 : pub(crate) struct OnDiskValueIoWaiter {
108 : rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
109 : }
110 :
111 : #[derive(Debug)]
112 : #[must_use]
113 : pub(crate) enum OnDiskValueIo {
114 : /// Traversal identified this IO as required to complete the vectored get.
115 : Required {
116 : num_active_ios: Arc<AtomicUsize>,
117 : tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>,
118 : },
119 : /// Sparse keyspace reads always read all the values for a given key,
120 : /// even though only the first value is needed.
121 : ///
122 : /// This variant represents the unnecessary IOs for those values at lower LSNs
123 : /// that aren't needed, but are currently still being done.
124 : ///
125 : /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
126 : /// We added this explicit representation here so that we can drop
127 : /// unnecessary IO results immediately, instead of buffering them in
128 : /// `oneshot` channels inside [`VectoredValueReconstructState`] until
129 : /// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
130 : Unnecessary,
131 : }
132 :
133 : type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
134 :
135 : impl OnDiskValueIo {
136 1482101 : pub(crate) fn complete(self, res: OnDiskValueIoResult) {
137 1482101 : match self {
138 1337359 : OnDiskValueIo::Required { num_active_ios, tx } => {
139 1337359 : num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
140 1337359 : let _ = tx.send(res);
141 1337359 : }
142 144742 : OnDiskValueIo::Unnecessary => {
143 144742 : // Nobody cared, see variant doc comment.
144 144742 : }
145 : }
146 1482101 : }
147 : }
148 :
149 : #[derive(Debug, thiserror::Error)]
150 : pub(crate) enum WaitCompletionError {
151 : #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
152 : IoDropped,
153 : }
154 :
155 : impl OnDiskValueIoWaiter {
156 1337357 : pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
157 1337357 : // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
158 1337357 : self.rx.await.map_err(|_| WaitCompletionError::IoDropped)
159 1337357 : }
160 : }
161 :
162 : impl VectoredValueReconstructState {
163 : /// # Cancel-Safety
164 : ///
165 : /// Technically it is fine to stop polling this future, but the IOs will still
166 : /// be executed to completion by the sidecar task and hold on to / consume resources.
167 : /// Better not to do it, so that reasoning about the system stays easier.
168 1336045 : pub(crate) async fn collect_pending_ios(
169 1336045 : self,
170 1336045 : ) -> Result<ValueReconstructState, PageReconstructError> {
171 : use utils::bin_ser::BeSer;
172 :
173 1336045 : let mut res = Ok(ValueReconstructState::default());
174 :
175 : // We should try hard not to bail early, so that by the time we return from this
176 : // function, all IO for this value is done. It's not strictly required -- we could
177 : // stop polling the IO futures in the sidecar task (they would need to support that),
178 : // but merely not polling them doesn't reduce the IO load on the disk. It's easier
179 : // to reason about the system if we just wait for all IO to complete, even if
180 : // we're no longer interested in the result.
181 : //
182 : // Revisit this when IO futures are replaced with a more sophisticated IO system
183 : // and an IO scheduler, where we know which IOs were submitted and which ones
184 : // are merely queued. Cf the comment on IoConcurrency::spawn_io.
184 : // just queued. Cf the comment on IoConcurrency::spawn_io.
185 2673402 : for (lsn, waiter) in self.on_disk_values {
186 1337357 : let value_recv_res = waiter
187 1337357 : .wait_completion()
188 1337357 : // we rely on the caller to poll us to completion, so this is not a bail point
189 1337357 : .await;
190 : // Force not bailing early by wrapping the code into a closure.
191 : #[allow(clippy::redundant_closure_call)]
192 1337357 : let _: () = (|| {
193 1337357 : match (&mut res, value_recv_res) {
194 0 : (Err(_), _) => {
195 0 : // We've already failed, no need to process more.
196 0 : }
197 0 : (Ok(_), Err(wait_err)) => {
198 0 : // This shouldn't happen - likely the sidecar task panicked.
199 0 : res = Err(PageReconstructError::Other(wait_err.into()));
200 0 : }
201 0 : (Ok(_), Ok(Err(err))) => {
202 0 : let err: std::io::Error = err;
203 0 : // TODO: returning IO error here will fail a compute query.
204 0 : // Probably not what we want, we're not doing `maybe_fatal_err`
205 0 : // in the IO futures.
206 0 : // But it's been like that for a long time, not changing it
207 0 : // as part of concurrent IO.
208 0 : // => https://github.com/neondatabase/neon/issues/10454
209 0 : res = Err(PageReconstructError::Other(err.into()));
210 0 : }
211 38411 : (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
212 38411 : assert!(ok.img.is_none());
213 38411 : ok.img = Some((lsn, img));
214 : }
215 1298946 : (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
216 1298946 : match Value::des(&buf) {
217 1400 : Ok(Value::WalRecord(rec)) => {
218 1400 : ok.records.push((lsn, rec));
219 1400 : }
220 1297546 : Ok(Value::Image(img)) => {
221 1297546 : assert!(ok.img.is_none());
222 1297546 : ok.img = Some((lsn, img));
223 : }
224 0 : Err(err) => {
225 0 : res = Err(PageReconstructError::Other(err.into()));
226 0 : }
227 : }
228 : }
229 : }
230 1337357 : })();
231 1337357 : }
232 :
233 1336045 : res
234 1336045 : }
235 : }
236 :
237 : /// Bag of data accumulated during a vectored get.
238 : pub(crate) struct ValuesReconstructState {
239 : /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
240 : /// should not expect to get anything from this hashmap.
241 : pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
242 : /// The keys which are already retrieved
243 : keys_done: KeySpaceRandomAccum,
244 :
245 : /// The keys covered by the image layers
246 : keys_with_image_coverage: Option<Range<Key>>,
247 :
248 : // Statistics that remain accessible to the caller of `get_vectored_impl`.
249 : layers_visited: u32,
250 : delta_layers_visited: u32,
251 :
252 : pub(crate) io_concurrency: IoConcurrency,
253 : num_active_ios: Arc<AtomicUsize>,
254 : }
255 :
256 : /// The level of IO concurrency to be used on the read path
257 : ///
258 : /// The desired end state is that we always do parallel IO.
259 : /// This struct and the dispatching in the impl will be removed once
260 : /// we've built enough confidence.
261 : pub(crate) enum IoConcurrency {
262 : Sequential,
263 : SidecarTask {
264 : task_id: usize,
265 : ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
266 : },
267 : }
268 :
269 : type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
270 :
271 : pub(crate) enum SelectedIoConcurrency {
272 : Sequential,
273 : SidecarTask(GateGuard),
274 : }
275 :
276 : impl std::fmt::Debug for IoConcurrency {
277 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 0 : match self {
279 0 : IoConcurrency::Sequential => write!(f, "Sequential"),
280 0 : IoConcurrency::SidecarTask { .. } => write!(f, "SidecarTask"),
281 : }
282 0 : }
283 : }
284 :
285 : impl std::fmt::Debug for SelectedIoConcurrency {
286 64 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287 64 : match self {
288 32 : SelectedIoConcurrency::Sequential => write!(f, "Sequential"),
289 32 : SelectedIoConcurrency::SidecarTask(_) => write!(f, "SidecarTask"),
290 : }
291 64 : }
292 : }
293 :
294 : impl IoConcurrency {
295 : /// Force sequential IO. This is a temporary workaround until we have
296 : /// finished plumbing IoConcurrency through the call stack
297 : /// into `RequestContext`.
298 : ///
299 : /// DO NOT USE for new code.
300 : ///
301 : /// Tracking issue: <https://github.com/neondatabase/neon/issues/10460>.
302 1215185 : pub(crate) fn sequential() -> Self {
303 1215185 : Self::spawn(SelectedIoConcurrency::Sequential)
304 1215185 : }
305 :
306 1012 : pub(crate) fn spawn_from_conf(
307 1012 : conf: &'static PageServerConf,
308 1012 : gate_guard: GateGuard,
309 1012 : ) -> IoConcurrency {
310 : use pageserver_api::config::GetVectoredConcurrentIo;
311 1012 : let selected = match conf.get_vectored_concurrent_io {
312 1012 : GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
313 0 : GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
314 : };
315 1012 : Self::spawn(selected)
316 1012 : }
317 :
318 1216261 : pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
319 1216261 : match io_concurrency {
320 1216229 : SelectedIoConcurrency::Sequential => IoConcurrency::Sequential,
321 32 : SelectedIoConcurrency::SidecarTask(gate_guard) => {
322 32 : let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
323 : static TASK_ID: AtomicUsize = AtomicUsize::new(0);
324 32 : let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
325 : // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
326 32 : let span =
327 32 : tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
328 32 : trace!(task_id, "spawning sidecar task");
329 32 : tokio::spawn(async move {
330 32 : trace!("start");
331 32 : scopeguard::defer!{ trace!("end") };
332 : type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
333 : enum State {
334 : Waiting {
335 : // invariant: is_empty(), but we recycle the allocation
336 : empty_futures: FuturesUnordered<IoFuture>,
337 : ios_rx: IosRx,
338 : },
339 : Executing {
340 : futures: FuturesUnordered<IoFuture>,
341 : ios_rx: IosRx,
342 : },
343 : ShuttingDown {
344 : futures: FuturesUnordered<IoFuture>,
345 : },
346 : }
347 32 : let mut state = State::Waiting {
348 32 : empty_futures: FuturesUnordered::new(),
349 32 : ios_rx,
350 32 : };
351 : loop {
352 39122 : match state {
353 : State::Waiting {
354 18551 : empty_futures,
355 18551 : mut ios_rx,
356 18551 : } => {
357 18551 : assert!(empty_futures.is_empty());
358 18551 : tokio::select! {
359 18551 : fut = ios_rx.recv() => {
360 18519 : if let Some(fut) = fut {
361 18519 : trace!("received new io future");
362 18519 : empty_futures.push(fut);
363 18519 : state = State::Executing { futures: empty_futures, ios_rx };
364 : } else {
365 0 : state = State::ShuttingDown { futures: empty_futures }
366 : }
367 : }
368 : }
369 : }
370 : State::Executing {
371 20571 : mut futures,
372 20571 : mut ios_rx,
373 20571 : } => {
374 20571 : tokio::select! {
375 20571 : res = futures.next() => {
376 19545 : trace!("io future completed");
377 19545 : assert!(res.is_some());
378 19545 : if futures.is_empty() {
379 18519 : state = State::Waiting { empty_futures: futures, ios_rx};
380 18519 : } else {
381 1026 : state = State::Executing { futures, ios_rx };
382 1026 : }
383 : }
384 20571 : fut = ios_rx.recv() => {
385 1026 : if let Some(fut) = fut {
386 1026 : trace!("received new io future");
387 1026 : futures.push(fut);
388 1026 : state = State::Executing { futures, ios_rx};
389 0 : } else {
390 0 : state = State::ShuttingDown { futures };
391 0 : }
392 : }
393 : }
394 : }
395 : State::ShuttingDown {
396 0 : mut futures,
397 0 : } => {
398 0 : trace!("shutting down");
399 0 : while let Some(()) = futures.next().await {
400 0 : trace!("io future completed (shutdown)");
401 : // drain
402 : }
403 0 : trace!("shutdown complete");
404 0 : break;
405 0 : }
406 0 : }
407 0 : }
408 0 : drop(gate_guard); // drop it right before we exit
409 32 : }.instrument(span));
410 32 : IoConcurrency::SidecarTask { task_id, ios_tx }
411 : }
412 : }
413 1216261 : }
414 :
415 76338 : pub(crate) fn clone(&self) -> Self {
416 76338 : match self {
417 39460 : IoConcurrency::Sequential => IoConcurrency::Sequential,
418 36878 : IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
419 36878 : task_id: *task_id,
420 36878 : ios_tx: ios_tx.clone(),
421 36878 : },
422 : }
423 76338 : }
424 :
425 : /// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
426 : ///
427 : /// The IO is represented as an opaque future.
428 : /// IO completion must be handled inside the future, e.g., through a oneshot channel.
429 : ///
430 : /// The API seems simple but there are multiple **pitfalls** involving
431 : /// DEADLOCK RISK.
432 : ///
433 : /// First, there are no guarantees about the execution of the IO.
434 : /// It may be `await`ed in-place before this function returns.
435 : /// It may be polled partially by this task and handed off to another task to be finished.
436 : /// It may be polled and then dropped before returning ready.
437 : ///
438 : /// This means that submitted IOs must not be interdependent.
439 : /// Interdependence may be through shared limited resources, e.g.,
440 : /// - VirtualFile file descriptor cache slot acquisition
441 : /// - tokio-epoll-uring slot
442 : ///
443 : /// # Why current usage is safe from deadlocks
444 : ///
445 : /// The textbook condition for a deadlock is that _all_ of the following hold:
446 : /// - Mutual exclusion
447 : /// - Hold and wait
448 : /// - No preemption
449 : /// - Circular wait
450 : ///
451 : /// The current usage is safe because:
452 : /// - Mutual exclusion: IO futures definitely use mutexes, no way around that for now
453 : /// - Hold and wait: IO futures currently hold two kinds of locks/resources while waiting
454 : /// for acquisition of other resources:
455 : /// - VirtualFile file descriptor cache slot tokio mutex
456 : /// - tokio-epoll-uring slot (uses tokio notify => wait queue, much like mutex)
457 : /// - No preemption: there's no taking-away of acquired locks/resources => given
458 : /// - Circular wait: this is the part of the condition that isn't met: all IO futures
459 : /// first acquire VirtualFile mutex, then tokio-epoll-uring slot.
460 : /// There is no IO future that acquires slot before VirtualFile.
461 : /// Hence there can be no circular waiting.
462 : /// Hence there cannot be a deadlock.
463 : ///
464 : /// This is a very fragile situation and must be revisited whenever any code called from
465 : /// inside the IO futures is changed.
466 : ///
467 : /// We will move away from opaque IO futures towards well-defined IOs at some point in
468 : /// the future when we have shipped this first version of concurrent IO to production
469 : /// and are ready to retire the Sequential mode which runs the futures in place.
470 : /// Right now, while brittle, the opaque IO approach allows us to ship the feature
471 : /// with minimal changes to the code and minimal changes to existing behavior in Sequential mode.
472 : ///
473 : /// Also read the comment in `collect_pending_ios`.
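     : ///
     : /// # Hedged illustration of the interdependence pitfall
     : ///
     : /// A minimal sketch (not real pageserver code; `sem`, `other_done`, and `io_concurrency`
     : /// are made-up names) of two submitted IOs that wait on each other. In `Sequential` mode
     : /// the first future is awaited in place and never completes; in sidecar mode the two
     : /// futures end up in a circular wait:
     : ///
     : /// ```text
     : /// let sem = Arc::new(tokio::sync::Semaphore::new(1));
     : /// let other_done = Arc::new(tokio::sync::Notify::new());
     : /// io_concurrency.spawn_io({
     : ///     let (sem, other_done) = (sem.clone(), other_done.clone());
     : ///     async move {
     : ///         let _permit = sem.acquire().await;   // holds the only permit
     : ///         other_done.notified().await;         // waits for the second IO
     : ///     }
     : /// }).await;
     : /// io_concurrency.spawn_io(async move {
     : ///     let _permit = sem.acquire().await;       // never acquired: deadlock
     : ///     other_done.notify_one();
     : /// }).await;
     : /// ```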
474 1525270 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
475 1525270 : where
476 1525270 : F: std::future::Future<Output = ()> + Send + 'static,
477 1525270 : {
478 1525270 : match self {
479 1505725 : IoConcurrency::Sequential => fut.await,
480 19545 : IoConcurrency::SidecarTask { ios_tx, .. } => {
481 19545 : let fut = Box::pin(fut);
482 19545 : // NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
483 19545 : // while insignificant for latency.
484 19545 : // It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
485 19545 : // a submission here, but never poll the future. That way, io_uring can make progress while
486 19545 : // the future sits in the ios_tx queue.
487 19545 : match ios_tx.send(fut) {
488 19545 : Ok(()) => {}
489 : Err(_) => {
490 0 : unreachable!("the io task must have exited, likely it panicked")
491 : }
492 : }
493 : }
494 : }
495 1525270 : }
496 :
497 : #[cfg(test)]
498 64 : pub(crate) fn spawn_for_test() -> impl std::ops::DerefMut<Target = Self> {
499 : use std::ops::{Deref, DerefMut};
500 : use tracing::info;
501 : use utils::sync::gate::Gate;
502 :
503 : // Spawn needs a Gate, give it one.
504 : struct Wrapper {
505 : inner: IoConcurrency,
506 : #[allow(dead_code)]
507 : gate: Box<Gate>,
508 : }
509 : impl Deref for Wrapper {
510 : type Target = IoConcurrency;
511 :
512 36974 : fn deref(&self) -> &Self::Target {
513 36974 : &self.inner
514 36974 : }
515 : }
516 : impl DerefMut for Wrapper {
517 0 : fn deref_mut(&mut self) -> &mut Self::Target {
518 0 : &mut self.inner
519 0 : }
520 : }
521 64 : let gate = Box::new(Gate::default());
522 :
523 : // The default behavior when running Rust unit tests without any further
524 : // flags is to use the new behavior.
525 : // The CI uses the following environment variable to unit test both old
526 : // and new behavior.
527 : // NB: the Python regression & perf tests take the `else` branch
528 : // below and have their own defaults management.
529 64 : let selected = {
530 : // The pageserver_api::config type is unsuitable because it's internally tagged.
531 64 : #[derive(serde::Deserialize)]
532 : #[serde(rename_all = "kebab-case")]
533 : enum TestOverride {
534 : Sequential,
535 : SidecarTask,
536 : }
537 : use once_cell::sync::Lazy;
538 64 : static TEST_OVERRIDE: Lazy<TestOverride> = Lazy::new(|| {
539 64 : utils::env::var_serde_json_string(
540 64 : "NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO",
541 64 : )
542 64 : .unwrap_or(TestOverride::SidecarTask)
543 64 : });
544 :
545 64 : match *TEST_OVERRIDE {
546 32 : TestOverride::Sequential => SelectedIoConcurrency::Sequential,
547 : TestOverride::SidecarTask => {
548 32 : SelectedIoConcurrency::SidecarTask(gate.enter().expect("just created it"))
549 : }
550 : }
551 : };
552 :
553 64 : info!(?selected, "get_vectored_concurrent_io test");
554 :
555 64 : Wrapper {
556 64 : inner: Self::spawn(selected),
557 64 : gate,
558 64 : }
559 64 : }
560 : }
561 :
562 : /// Make noise in case the [`ValuesReconstructState`] gets dropped while
563 : /// there are still IOs in flight.
564 : /// Refer to `collect_pending_ios` for why we prefer not to do that.
565 : ///
566 : /// We log from here instead of from the sidecar task because the [`ValuesReconstructState`]
567 : /// gets dropped in a tracing span with more context.
568 : /// We repeat the sidecar task's `task_id` so we can correlate what we emit here with
569 : /// the logs / panic handler logs from the sidecar task, which also logs the `task_id`.
570 : impl Drop for ValuesReconstructState {
571 1255295 : fn drop(&mut self) {
572 1255295 : let num_active_ios = self
573 1255295 : .num_active_ios
574 1255295 : .load(std::sync::atomic::Ordering::Acquire);
575 1255295 : if num_active_ios == 0 {
576 1255293 : return;
577 2 : }
578 2 : let sidecar_task_id = match &self.io_concurrency {
579 0 : IoConcurrency::Sequential => None,
580 2 : IoConcurrency::SidecarTask { task_id, .. } => Some(*task_id),
581 : };
582 2 : tracing::warn!(
583 : num_active_ios,
584 : ?sidecar_task_id,
585 0 : backtrace=%std::backtrace::Backtrace::force_capture(),
586 0 : "dropping ValuesReconstructState while some IOs have not been completed",
587 : );
588 1255295 : }
589 : }
590 :
591 : impl ValuesReconstructState {
592 1255295 : pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
593 1255295 : Self {
594 1255295 : keys: HashMap::new(),
595 1255295 : keys_done: KeySpaceRandomAccum::new(),
596 1255295 : keys_with_image_coverage: None,
597 1255295 : layers_visited: 0,
598 1255295 : delta_layers_visited: 0,
599 1255295 : io_concurrency,
600 1255295 : num_active_ios: Arc::new(AtomicUsize::new(0)),
601 1255295 : }
602 1255295 : }
603 :
604 : /// Absolutely read [`IoConcurrency::spawn_io`] to learn about assumptions & pitfalls.
605 1525270 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
606 1525270 : where
607 1525270 : F: std::future::Future<Output = ()> + Send + 'static,
608 1525270 : {
609 1525270 : self.io_concurrency.spawn_io(fut).await;
610 1525270 : }
611 :
612 1693398 : pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
613 1693398 : self.layers_visited += 1;
614 1693398 : if let ReadableLayer::PersistentLayer(layer) = layer {
615 480166 : if layer.layer_desc().is_delta() {
616 435074 : self.delta_layers_visited += 1;
617 435074 : }
618 1213232 : }
619 1693398 : }
620 :
621 456 : pub(crate) fn get_delta_layers_visited(&self) -> u32 {
622 456 : self.delta_layers_visited
623 456 : }
624 :
625 1255237 : pub(crate) fn get_layers_visited(&self) -> u32 {
626 1255237 : self.layers_visited
627 1255237 : }
628 :
629 : /// On hitting an image layer, we can mark all keys in its range as done, because
630 : /// if the image layer does not contain a key, that key was deleted or never added.
631 45116 : pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
632 45116 : let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
633 45116 : assert_eq!(
634 : prev_val, None,
635 0 : "should consume the keyspace before the next iteration"
636 : );
637 45116 : }
638 :
639 : /// Update the state collected for a given key.
640 : /// Returns true if this was the last value needed for the key and false otherwise.
641 : ///
642 : /// If the key is done after the update, mark it as such.
643 : ///
644 : /// If the key is in the sparse keyspace (i.e., aux files), we do not track it in
645 : /// `keys_done`.
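     : ///
     : /// Hedged sketch of the read-path call pattern (`read_value_from_layer` is a placeholder,
     : /// not a real function; see `tests2::test_io_concurrency_noise` at the bottom of this file
     : /// for a runnable variant):
     : ///
     : /// ```text
     : /// let io = reconstruct_state.update_key(&key, lsn, completes);
     : /// reconstruct_state.spawn_io(async move {
     : ///     let res = read_value_from_layer(/* ... */).await;
     : ///     io.complete(res);
     : /// }).await;
     : /// ```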
646 : // TODO: rename this method & update description.
647 1482101 : pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
648 1482101 : let state = self.keys.entry(*key).or_default();
649 1482101 :
650 1482101 : let is_sparse_key = key.is_sparse();
651 :
652 1482101 : let required_io = match state.situation {
653 : ValueReconstructSituation::Complete => {
654 144742 : if is_sparse_key {
655 : // Sparse keyspace might be visited multiple times because
656 : // we don't track unmapped keyspaces.
657 144742 : return OnDiskValueIo::Unnecessary;
658 : } else {
659 0 : unreachable!()
660 : }
661 : }
662 : ValueReconstructSituation::Continue => {
663 1337359 : self.num_active_ios
664 1337359 : .fetch_add(1, std::sync::atomic::Ordering::Release);
665 1337359 : let (tx, rx) = tokio::sync::oneshot::channel();
666 1337359 : state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
667 1337359 : OnDiskValueIo::Required {
668 1337359 : tx,
669 1337359 : num_active_ios: Arc::clone(&self.num_active_ios),
670 1337359 : }
671 1337359 : }
672 1337359 : };
673 1337359 :
674 1337359 : if completes && state.situation == ValueReconstructSituation::Continue {
675 1336047 : state.situation = ValueReconstructSituation::Complete;
676 1336047 : if !is_sparse_key {
677 1208531 : self.keys_done.add_key(*key);
678 1208531 : }
679 1312 : }
680 :
681 1337359 : required_io
682 1482101 : }
683 :
684 : /// Returns the key space describing the keys that have
685 : /// been marked as completed since the last call to this function.
686 : /// Returns individual keys done, and the image layer coverage.
687 3399834 : pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
688 3399834 : (
689 3399834 : self.keys_done.consume_keyspace(),
690 3399834 : self.keys_with_image_coverage.take(),
691 3399834 : )
692 3399834 : }
693 : }
694 :
695 : /// A key that uniquely identifies a layer in a timeline
696 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
697 : pub(crate) enum LayerId {
698 : PersitentLayerId(PersistentLayerKey),
699 : InMemoryLayerId(InMemoryLayerFileId),
700 : }
701 :
702 : /// Uniquely identify a layer visit by the layer
703 : /// and LSN floor (or start LSN) of the reads.
704 : /// The layer itself is not enough since we may
705 : /// have different LSN lower bounds for delta layer reads.
706 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
707 : struct LayerToVisitId {
708 : layer_id: LayerId,
709 : lsn_floor: Lsn,
710 : }
711 :
712 : /// Layer wrapper for the read path. Note that it is valid
713 : /// to use these layers even after external operations have
714 : /// been performed on them (compaction, freeze, etc.).
715 : #[derive(Debug)]
716 : pub(crate) enum ReadableLayer {
717 : PersistentLayer(Layer),
718 : InMemoryLayer(Arc<InMemoryLayer>),
719 : }
720 :
721 : /// A partial description of a read to be done.
722 : #[derive(Debug, Clone)]
723 : struct LayerVisit {
724 : /// An id used to resolve the readable layer within the fringe
725 : layer_to_visit_id: LayerToVisitId,
726 : /// Lsn range for the read, used for selecting the next read
727 : lsn_range: Range<Lsn>,
728 : }
729 :
730 : /// Data structure which maintains a fringe of layers for the
731 : /// read path. The fringe is the set of layers which intersects
732 : /// the current keyspace that the search is descending on.
733 : /// Each layer tracks the keyspace that intersects it.
734 : ///
735 : /// The fringe must be processed in Lsn order. Hence, it uses
736 : /// a two-layer indexing scheme.
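     : ///
     : /// Hedged sketch of how a read-path loop might drive the fringe (simplified; the
     : /// variables are placeholders and the real traversal lives in the timeline code):
     : ///
     : /// ```text
     : /// let mut fringe = LayerFringe::new();
     : /// fringe.update(layer, keyspace, lsn_floor..request_lsn);
     : /// while let Some((layer, keyspace, lsn_range)) = fringe.next_layer() {
     : ///     // visit `layer` for `keyspace`; the visit may push deeper layers
     : ///     // (with lower LSN ranges) back into the fringe via `update`
     : /// }
     : /// ```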
737 : #[derive(Debug)]
738 : pub(crate) struct LayerFringe {
739 : planned_visits_by_lsn: BinaryHeap<LayerVisit>,
740 : visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
741 : }
742 :
743 : #[derive(Debug)]
744 : struct LayerVisitReads {
745 : layer: ReadableLayer,
746 : target_keyspace: KeySpaceRandomAccum,
747 : }
748 :
749 : impl LayerFringe {
750 1706436 : pub(crate) fn new() -> Self {
751 1706436 : LayerFringe {
752 1706436 : planned_visits_by_lsn: BinaryHeap::new(),
753 1706436 : visit_reads: HashMap::new(),
754 1706436 : }
755 1706436 : }
756 :
757 3399834 : pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
758 3399834 : let read_desc = self.planned_visits_by_lsn.pop()?;
759 :
760 1693398 : let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
761 1693398 :
762 1693398 : match removed {
763 : Some((
764 : _,
765 : LayerVisitReads {
766 1693398 : layer,
767 1693398 : mut target_keyspace,
768 1693398 : },
769 1693398 : )) => Some((
770 1693398 : layer,
771 1693398 : target_keyspace.consume_keyspace(),
772 1693398 : read_desc.lsn_range,
773 1693398 : )),
774 0 : None => unreachable!("fringe internals are always consistent"),
775 : }
776 3399834 : }
777 :
778 1693426 : pub(crate) fn update(
779 1693426 : &mut self,
780 1693426 : layer: ReadableLayer,
781 1693426 : keyspace: KeySpace,
782 1693426 : lsn_range: Range<Lsn>,
783 1693426 : ) {
784 1693426 : let layer_to_visit_id = LayerToVisitId {
785 1693426 : layer_id: layer.id(),
786 1693426 : lsn_floor: lsn_range.start,
787 1693426 : };
788 1693426 :
789 1693426 : let entry = self.visit_reads.entry(layer_to_visit_id.clone());
790 1693426 : match entry {
791 28 : Entry::Occupied(mut entry) => {
792 28 : entry.get_mut().target_keyspace.add_keyspace(keyspace);
793 28 : }
794 1693398 : Entry::Vacant(entry) => {
795 1693398 : self.planned_visits_by_lsn.push(LayerVisit {
796 1693398 : lsn_range,
797 1693398 : layer_to_visit_id: layer_to_visit_id.clone(),
798 1693398 : });
799 1693398 : let mut accum = KeySpaceRandomAccum::new();
800 1693398 : accum.add_keyspace(keyspace);
801 1693398 : entry.insert(LayerVisitReads {
802 1693398 : layer,
803 1693398 : target_keyspace: accum,
804 1693398 : });
805 1693398 : }
806 : }
807 1693426 : }
808 : }
809 :
810 : impl Default for LayerFringe {
811 0 : fn default() -> Self {
812 0 : Self::new()
813 0 : }
814 : }
815 :
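     : // This ordering drives the max-heap in `LayerFringe::planned_visits_by_lsn`: visits with
     : // the highest end LSN pop first; when end LSNs are equal, the `.reverse()` makes the visit
     : // with the lower start LSN compare greater, so it pops first.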
816 : impl Ord for LayerVisit {
817 60 : fn cmp(&self, other: &Self) -> Ordering {
818 60 : let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
819 60 : if ord == std::cmp::Ordering::Equal {
820 44 : self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
821 : } else {
822 16 : ord
823 : }
824 60 : }
825 : }
826 :
827 : impl PartialOrd for LayerVisit {
828 60 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
829 60 : Some(self.cmp(other))
830 60 : }
831 : }
832 :
833 : impl PartialEq for LayerVisit {
834 0 : fn eq(&self, other: &Self) -> bool {
835 0 : self.lsn_range == other.lsn_range
836 0 : }
837 : }
838 :
839 : impl Eq for LayerVisit {}
840 :
841 : impl ReadableLayer {
842 1693426 : pub(crate) fn id(&self) -> LayerId {
843 1693426 : match self {
844 480194 : Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
845 1213232 : Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
846 : }
847 1693426 : }
848 :
849 1693398 : pub(crate) async fn get_values_reconstruct_data(
850 1693398 : &self,
851 1693398 : keyspace: KeySpace,
852 1693398 : lsn_range: Range<Lsn>,
853 1693398 : reconstruct_state: &mut ValuesReconstructState,
854 1693398 : ctx: &RequestContext,
855 1693398 : ) -> Result<(), GetVectoredError> {
856 1693398 : match self {
857 480166 : ReadableLayer::PersistentLayer(layer) => {
858 480166 : layer
859 480166 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
860 480166 : .await
861 : }
862 1213232 : ReadableLayer::InMemoryLayer(layer) => {
863 1213232 : layer
864 1213232 : .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
865 1213232 : .await
866 : }
867 : }
868 1693398 : }
869 : }
870 :
871 : /// Layers contain a hint indicating whether they are likely to be used for reads.
872 : ///
873 : /// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
874 : /// when changing the visibility of layers (for example when creating a branch that makes some previously
875 : /// covered layers visible). It should be used for cache management but not for correctness-critical checks.
876 : #[derive(Debug, Clone, PartialEq, Eq)]
877 : pub enum LayerVisibilityHint {
878 : /// A Visible layer might be read while serving a read, because there is not an image layer between it
879 : /// and a readable LSN (the tip of the branch or a child's branch point)
880 : Visible,
881 : /// A Covered layer probably won't be read right now, but _can_ be read in the future if someone creates
882 : /// a branch or an ephemeral endpoint at an LSN below the layer that covers this one.
883 : Covered,
884 : }
885 :
886 : pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
887 :
888 0 : #[derive(Clone, Copy, strum_macros::EnumString)]
889 : pub(crate) enum LayerAccessStatsReset {
890 : NoReset,
891 : AllStats,
892 : }
893 :
894 : impl Default for LayerAccessStats {
895 3836 : fn default() -> Self {
896 3836 : // Default value is to assume resident since creation time, and visible.
897 3836 : let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now());
898 3836 : value |= 0x1 << Self::VISIBILITY_SHIFT;
899 3836 :
900 3836 : Self(std::sync::atomic::AtomicU64::new(value))
901 3836 : }
902 : }
903 :
904 : // Efficient store of two very-low-resolution timestamps and some bits. Used for storing last access time and
905 : // last residence change time.
906 : impl LayerAccessStats {
907 : // How many high bits to drop from a u32 timestamp?
908 : // - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use
909 : // after that, this software has been very successful!)
910 : // - Dropping the top bit is implicitly safe because unix timestamps are meant to be
911 : // stored in an i32, so they never used it.
912 : // - Dropping the next two bits is safe because this code is only running on systems in
913 : // years >= 2024, and these bits have been 1 since 2021
914 : //
915 : // Therefore we may store only 29 bits for a timestamp with one second resolution. We do
916 : // this truncation to make space for some flags in the high bits of our u64.
917 : const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1;
918 : const TS_MASK: u32 = 0x1f_ff_ff_ff;
919 : const TS_ONES: u32 = 0x60_00_00_00;
920 :
921 : const ATIME_SHIFT: u32 = 0;
922 : const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS;
923 : const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS;
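     :
     : // Layout implied by the constants above (TS_DROP_HIGH_BITS = 3, so each truncated
     : // timestamp occupies 29 bits of the u64):
     : //
     : //   bits  0..=28 : access time      (ATIME_SHIFT = 0)
     : //   bits 29..=57 : residence time   (RTIME_SHIFT = 29)
     : //   bit  58      : visibility flag  (VISIBILITY_SHIFT = 58)
     : //   bits 59..=63 : unused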
924 :
925 480406 : fn write_bits(&self, mask: u64, value: u64) -> u64 {
926 480406 : self.0
927 480406 : .fetch_update(
928 480406 : // TODO: decide what orderings are correct
929 480406 : std::sync::atomic::Ordering::Relaxed,
930 480406 : std::sync::atomic::Ordering::Relaxed,
931 480406 : |v| Some((v & !mask) | (value & mask)),
932 480406 : )
933 480406 : .expect("Inner function is infallible")
934 480406 : }
935 :
936 483522 : fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) {
937 483522 : // Drop the low three bits of the timestamp, for an ~8s accuracy
938 483522 : let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64);
939 483522 :
940 483522 : ((Self::TS_MASK as u64) << shift, timestamp << shift)
941 483522 : }
942 :
943 124 : fn read_low_res_timestamp(&self, shift: u32) -> Option<SystemTime> {
944 124 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
945 124 :
946 124 : let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift;
947 124 : if ts_bits == 0 {
948 48 : None
949 : } else {
950 76 : Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64)))
951 : }
952 124 : }
953 :
954 : /// Record a change in layer residency.
955 : ///
956 : /// Recording the event must happen while holding the layer map lock to
957 : /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
958 : /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
959 : ///
960 : /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
961 : /// the following race could happen:
962 : ///
963 : /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
964 : /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
965 : /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
966 : /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
967 52 : pub(crate) fn record_residence_event_at(&self, now: SystemTime) {
968 52 : let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now);
969 52 : self.write_bits(mask, value);
970 52 : }
971 :
972 48 : pub(crate) fn record_residence_event(&self) {
973 48 : self.record_residence_event_at(SystemTime::now())
974 48 : }
975 :
976 479634 : fn record_access_at(&self, now: SystemTime) -> bool {
977 479634 : let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
978 479634 :
979 479634 : // A layer which is accessed must be visible.
980 479634 : mask |= 0x1 << Self::VISIBILITY_SHIFT;
981 479634 : value |= 0x1 << Self::VISIBILITY_SHIFT;
982 479634 :
983 479634 : let old_bits = self.write_bits(mask, value);
984 4 : !matches!(
985 479634 : self.decode_visibility(old_bits),
986 : LayerVisibilityHint::Visible
987 : )
988 479634 : }
989 :
990 : /// Returns true if we modified the layer's visibility to set it to Visible implicitly
991 : /// as a result of this access
992 480190 : pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
993 480190 : if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
994 568 : return false;
995 479622 : }
996 479622 :
997 479622 : self.record_access_at(SystemTime::now())
998 480190 : }
999 :
1000 0 : fn as_api_model(
1001 0 : &self,
1002 0 : reset: LayerAccessStatsReset,
1003 0 : ) -> pageserver_api::models::LayerAccessStats {
1004 0 : let ret = pageserver_api::models::LayerAccessStats {
1005 0 : access_time: self
1006 0 : .read_low_res_timestamp(Self::ATIME_SHIFT)
1007 0 : .unwrap_or(UNIX_EPOCH),
1008 0 : residence_time: self
1009 0 : .read_low_res_timestamp(Self::RTIME_SHIFT)
1010 0 : .unwrap_or(UNIX_EPOCH),
1011 0 : visible: matches!(self.visibility(), LayerVisibilityHint::Visible),
1012 : };
1013 0 : match reset {
1014 0 : LayerAccessStatsReset::NoReset => {}
1015 0 : LayerAccessStatsReset::AllStats => {
1016 0 : self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0);
1017 0 : self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0);
1018 0 : }
1019 : }
1020 0 : ret
1021 0 : }
1022 :
1023 : /// Get the latest access timestamp, falling back to latest residence event. The latest residence event
1024 : /// will be this Layer's construction time, if its residence hasn't changed since then.
1025 32 : pub(crate) fn latest_activity(&self) -> SystemTime {
1026 32 : if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) {
1027 12 : t
1028 : } else {
1029 20 : self.read_low_res_timestamp(Self::RTIME_SHIFT)
1030 20 : .expect("Residence time is set on construction")
1031 : }
1032 32 : }
1033 :
1034 : /// Whether this layer has been accessed (excluding in [`AccessStatsBehavior::Skip`]).
1035 : ///
1036 : /// This indicates whether the layer has been used for some purpose that would motivate
1037 : /// us to keep it on disk, such as for serving a getpage request.
1038 36 : fn accessed(&self) -> bool {
1039 36 : // Consider it accessed if the most recent access is more recent than
1040 36 : // the most recent change in residence status.
1041 36 : match (
1042 36 : self.read_low_res_timestamp(Self::ATIME_SHIFT),
1043 36 : self.read_low_res_timestamp(Self::RTIME_SHIFT),
1044 : ) {
1045 28 : (None, _) => false,
1046 0 : (Some(_), None) => true,
1047 8 : (Some(a), Some(r)) => a >= r,
1048 : }
1049 36 : }
1050 :
1051 : /// Helper for extracting the visibility hint from the literal value of our inner u64
1052 481782 : fn decode_visibility(&self, bits: u64) -> LayerVisibilityHint {
1053 481782 : match (bits >> Self::VISIBILITY_SHIFT) & 0x1 {
1054 481738 : 1 => LayerVisibilityHint::Visible,
1055 44 : 0 => LayerVisibilityHint::Covered,
1056 0 : _ => unreachable!(),
1057 : }
1058 481782 : }
1059 :
1060 : /// Returns the old value which has been replaced
1061 720 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) -> LayerVisibilityHint {
1062 720 : let value = match visibility {
1063 616 : LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
1064 104 : LayerVisibilityHint::Covered => 0x0,
1065 : };
1066 :
1067 720 : let old_bits = self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
1068 720 : self.decode_visibility(old_bits)
1069 720 : }
1070 :
1071 1428 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
1072 1428 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
1073 1428 : self.decode_visibility(read)
1074 1428 : }
1075 : }
1076 :
1077 : /// Get a layer descriptor from a layer.
1078 : pub(crate) trait AsLayerDesc {
1079 : /// Get the layer descriptor.
1080 : fn layer_desc(&self) -> &PersistentLayerDesc;
1081 : }
1082 :
1083 : pub mod tests {
1084 : use pageserver_api::shard::TenantShardId;
1085 : use utils::id::TimelineId;
1086 :
1087 : use super::*;
1088 :
1089 : impl From<DeltaLayerName> for PersistentLayerDesc {
1090 0 : fn from(value: DeltaLayerName) -> Self {
1091 0 : PersistentLayerDesc::new_delta(
1092 0 : TenantShardId::from([0; 18]),
1093 0 : TimelineId::from_array([0; 16]),
1094 0 : value.key_range,
1095 0 : value.lsn_range,
1096 0 : 233,
1097 0 : )
1098 0 : }
1099 : }
1100 :
1101 : impl From<ImageLayerName> for PersistentLayerDesc {
1102 0 : fn from(value: ImageLayerName) -> Self {
1103 0 : PersistentLayerDesc::new_img(
1104 0 : TenantShardId::from([0; 18]),
1105 0 : TimelineId::from_array([0; 16]),
1106 0 : value.key_range,
1107 0 : value.lsn,
1108 0 : 233,
1109 0 : )
1110 0 : }
1111 : }
1112 :
1113 : impl From<LayerName> for PersistentLayerDesc {
1114 0 : fn from(value: LayerName) -> Self {
1115 0 : match value {
1116 0 : LayerName::Delta(d) => Self::from(d),
1117 0 : LayerName::Image(i) => Self::from(i),
1118 : }
1119 0 : }
1120 : }
1121 : }
1122 :
1123 : /// Range wrapping newtype, which uses display to render Debug.
1124 : ///
1125 : /// Useful with `Key`, whose `{:?}` output is too verbose for printing multiple layers.
1126 : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
1127 :
1128 : impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
1129 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1130 0 : write!(f, "{}..{}", self.0.start, self.0.end)
1131 0 : }
1132 : }
1133 :
1134 : #[cfg(test)]
1135 : mod tests2 {
1136 : use pageserver_api::key::DBDIR_KEY;
1137 : use tracing::info;
1138 :
1139 : use super::*;
1140 : use crate::tenant::storage_layer::IoConcurrency;
1141 :
1142 : /// TODO: currently this test relies on manual visual inspection of the --no-capture output.
1143 : /// Should look like so:
1144 : /// ```text
1145 : /// RUST_LOG=trace cargo nextest run --features testing --no-capture test_io_concurrency_noise
1146 : /// running 1 test
1147 : /// 2025-01-21T17:42:01.335679Z INFO get_vectored_concurrent_io test selected=SidecarTask
1148 : /// 2025-01-21T17:42:01.335680Z TRACE spawning sidecar task task_id=0
1149 : /// 2025-01-21T17:42:01.335937Z TRACE IoConcurrency_sidecar{task_id=0}: start
1150 : /// 2025-01-21T17:42:01.335972Z TRACE IoConcurrency_sidecar{task_id=0}: received new io future
1151 : /// 2025-01-21T17:42:01.335999Z INFO IoConcurrency_sidecar{task_id=0}: waiting for signal to complete IO
1152 : /// 2025-01-21T17:42:01.336229Z WARN dropping ValuesReconstructState while some IOs have not been completed num_active_ios=1 sidecar_task_id=Some(0) backtrace= 0: <pageserver::tenant::storage_layer::ValuesReconstructState as core::ops::drop::Drop>::drop
1153 : /// at ./src/tenant/storage_layer.rs:553:24
1154 : /// 1: core::ptr::drop_in_place<pageserver::tenant::storage_layer::ValuesReconstructState>
1155 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:521:1
1156 : /// 2: core::mem::drop
1157 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:942:24
1158 : /// 3: pageserver::tenant::storage_layer::tests2::test_io_concurrency_noise::{{closure}}
1159 : /// at ./src/tenant/storage_layer.rs:1159:9
1160 : /// ...
1161 : /// 49: <unknown>
1162 : /// 2025-01-21T17:42:01.452293Z INFO IoConcurrency_sidecar{task_id=0}: completing IO
1163 : /// 2025-01-21T17:42:01.452357Z TRACE IoConcurrency_sidecar{task_id=0}: io future completed
1164 : /// 2025-01-21T17:42:01.452473Z TRACE IoConcurrency_sidecar{task_id=0}: end
1165 : /// test tenant::storage_layer::tests2::test_io_concurrency_noise ... ok
1166 : ///
1167 : /// ```
1168 : #[tokio::test]
1169 4 : async fn test_io_concurrency_noise() {
1170 4 : crate::tenant::harness::setup_logging();
1171 4 :
1172 4 : let io_concurrency = IoConcurrency::spawn_for_test();
1173 4 : match *io_concurrency {
1174 4 : IoConcurrency::Sequential => {
1175 4 : // This test asserts behavior in sidecar mode, doesn't make sense in sequential mode.
1176 4 : return;
1177 4 : }
1178 4 : IoConcurrency::SidecarTask { .. } => {}
1179 2 : }
1180 2 : let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
1181 2 :
1182 2 : let (io_fut_is_waiting_tx, io_fut_is_waiting) = tokio::sync::oneshot::channel();
1183 2 : let (do_complete_io, should_complete_io) = tokio::sync::oneshot::channel();
1184 2 : let (io_fut_exiting_tx, io_fut_exiting) = tokio::sync::oneshot::channel();
1185 2 :
1186 2 : let io = reconstruct_state.update_key(&DBDIR_KEY, Lsn(8), true);
1187 2 : reconstruct_state
1188 2 : .spawn_io(async move {
1189 2 : info!("waiting for signal to complete IO");
1190 4 : io_fut_is_waiting_tx.send(()).unwrap();
1191 2 : should_complete_io.await.unwrap();
1192 2 : info!("completing IO");
1193 4 : io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
1194 2 : io_fut_exiting_tx.send(()).unwrap();
1195 2 : })
1196 2 : .await;
1197 4 :
1198 4 : io_fut_is_waiting.await.unwrap();
1199 2 :
1200 2 : // this is what makes the noise
1201 2 : drop(reconstruct_state);
1202 2 :
1203 2 : do_complete_io.send(()).unwrap();
1204 2 :
1205 2 : io_fut_exiting.await.unwrap();
1206 4 : }
1207 : }