Line data Source code
1 : //! Common traits and structs for layers
2 :
3 : pub mod batch_split_writer;
4 : pub mod delta_layer;
5 : pub mod filter_iterator;
6 : pub mod image_layer;
7 : pub mod inmemory_layer;
8 : pub(crate) mod layer;
9 : mod layer_desc;
10 : mod layer_name;
11 : pub mod merge_iterator;
12 :
13 : use std::cmp::Ordering;
14 : use std::collections::hash_map::Entry;
15 : use std::collections::{BinaryHeap, HashMap};
16 : use std::future::Future;
17 : use std::ops::Range;
18 : use std::pin::Pin;
19 : use std::sync::Arc;
20 : use std::sync::atomic::AtomicUsize;
21 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
22 :
23 : pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
24 : use bytes::Bytes;
25 : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
26 : use futures::StreamExt;
27 : use futures::stream::FuturesUnordered;
28 : pub use image_layer::{ImageLayer, ImageLayerWriter};
29 : pub use inmemory_layer::InMemoryLayer;
30 : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
31 : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
32 : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
33 : use pageserver_api::key::Key;
34 : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
35 : use pageserver_api::record::NeonWalRecord;
36 : use pageserver_api::value::Value;
37 : use tracing::{Instrument, trace};
38 : use utils::lsn::Lsn;
39 : use utils::sync::gate::GateGuard;
40 :
41 : use self::inmemory_layer::InMemoryLayerFileId;
42 : use super::PageReconstructError;
43 : use super::timeline::{GetVectoredError, ReadPath};
44 : use crate::config::PageServerConf;
45 : use crate::context::{AccessStatsBehavior, RequestContext};
46 :
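    : /// Returns whether two ranges overlap. The ranges are half-open, so two
    : /// ranges that merely touch do not overlap. Illustrative sketch:
    : ///
    : /// ```ignore
    : /// assert!(range_overlaps(&(0..10), &(5..15))); // 5..10 is shared
    : /// assert!(!range_overlaps(&(0..5), &(5..10))); // the end bound is exclusive
    : /// assert!(range_overlaps(&(5..15), &(0..10))); // argument order does not matter
    : /// ```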
47 0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
48 0 : where
49 0 : T: PartialOrd<T>,
50 0 : {
51 0 : if a.start < b.start {
52 0 : a.end > b.start
53 : } else {
54 0 : b.end > a.start
55 : }
56 0 : }
57 :
58 : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
59 : ///
60 : /// Before the first call, you can fill in 'img' if you have an older cached
61 : /// version of the page available. That can save work in
62 : /// 'get_value_reconstruct_data', as it can stop searching for page versions
63 : /// when all the WAL records going back to the cached image have been collected.
64 : ///
65 : /// When 'get_value_reconstruct_data' returns Complete, either 'img' is set to
66 : /// an image of the page, or the oldest WAL record in 'records' is a will_init-type
67 : /// record that initializes the page without requiring a previous image.
68 : ///
69 : /// If 'get_value_reconstruct_data' returns Continue, some 'records' may have
70 : /// been collected, but there are more records outside the current layer. Pass
71 : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
72 : /// call, to collect more records.
73 : ///
74 : #[derive(Debug, Default)]
75 : pub(crate) struct ValueReconstructState {
76 : pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
77 : pub(crate) img: Option<(Lsn, Bytes)>,
78 : }
79 :
80 : impl ValueReconstructState {
81 : /// Returns the number of page deltas applied to the page image.
82 1335952 : pub fn num_deltas(&self) -> usize {
83 1335952 : match self.img {
84 1335864 : Some(_) => self.records.len(),
85 88 : None => self.records.len() - 1, // omit will_init record
86 : }
87 1335952 : }
88 : }
89 :
90 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
91 : pub(crate) enum ValueReconstructSituation {
92 : Complete,
93 : #[default]
94 : Continue,
95 : }
96 :
97 : /// On disk representation of a value loaded in a buffer
98 : #[derive(Debug)]
99 : pub(crate) enum OnDiskValue {
100 : /// Unencoded [`Value::Image`]
101 : RawImage(Bytes),
102 : /// Encoded [`Value`]. Can deserialize into an image or a WAL record
103 : WalRecordOrImage(Bytes),
104 : }
105 :
106 : /// Reconstruct data accumulated for a single key during a vectored get
107 : #[derive(Debug, Default)]
108 : pub(crate) struct VectoredValueReconstructState {
109 : pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
110 :
111 : pub(crate) situation: ValueReconstructSituation,
112 : }
113 :
114 : #[derive(Debug)]
115 : pub(crate) struct OnDiskValueIoWaiter {
116 : rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
117 : }
118 :
119 : #[derive(Debug)]
120 : #[must_use]
121 : pub(crate) enum OnDiskValueIo {
122 : /// Traversal identified this IO as required to complete the vectored get.
123 : Required {
124 : num_active_ios: Arc<AtomicUsize>,
125 : tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>,
126 : },
127 : /// Sparse keyspace reads always read all the values for a given key,
128 : /// even though only the first value is needed.
129 : ///
130 : /// This variant represents the unnecessary IOs for those values at lower LSNs
131 : /// that aren't needed, but are currently still being done.
132 : ///
133 : /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
134 : /// We added this explicit representation here so that we can drop
135 : /// unnecessary IO results immediately, instead of buffering them in
136 : /// `oneshot` channels inside [`VectoredValueReconstructState`] until
137 : /// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
138 : Unnecessary,
139 : }
140 :
141 : type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
142 :
143 : impl OnDiskValueIo {
144 1481949 : pub(crate) fn complete(self, res: OnDiskValueIoResult) {
145 1481949 : match self {
146 1337406 : OnDiskValueIo::Required { num_active_ios, tx } => {
147 1337406 : num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
148 1337406 : let _ = tx.send(res);
149 1337406 : }
150 144543 : OnDiskValueIo::Unnecessary => {
151 144543 : // Nobody cared, see variant doc comment.
152 144543 : }
153 : }
154 1481949 : }
155 : }
156 :
157 : #[derive(Debug, thiserror::Error)]
158 : pub(crate) enum WaitCompletionError {
159 : #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
160 : IoDropped,
161 : }
162 :
163 : impl OnDiskValueIoWaiter {
164 1337404 : pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
165 1337404 : // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
166 1337404 : self.rx.await.map_err(|_| WaitCompletionError::IoDropped)
167 1337404 : }
168 : }
169 :
170 : impl VectoredValueReconstructState {
171 : /// # Cancel-Safety
172 : ///
173 : /// Technically it is fine to stop polling this future, but the IOs will still
174 : /// be executed to completion by the sidecar task and hold on to / consume resources.
175 : /// Better not to do it, to make reasoning about the system easier.
176 1336092 : pub(crate) async fn collect_pending_ios(
177 1336092 : self,
178 1336092 : ) -> Result<ValueReconstructState, PageReconstructError> {
179 : use utils::bin_ser::BeSer;
180 :
181 1336092 : let mut res = Ok(ValueReconstructState::default());
182 :
183 : // We should try hard not to bail early, so that by the time we return from this
184 : // function, all IO for this value is done. It's not required -- we could
185 : // stop polling the IO futures in the sidecar task (they need to support that),
186 : // but merely ceasing to poll doesn't reduce the IO load on the disk. It's easier
187 : // to reason about the system if we just wait for all IO to complete, even if
188 : // we're no longer interested in the result.
189 : //
190 : // Revisit this when IO futures are replaced with a more sophisticated IO system
191 : // and an IO scheduler, where we know which IOs were submitted and which ones
192 : // just queued. Cf the comment on IoConcurrency::spawn_io.
193 2673496 : for (lsn, waiter) in self.on_disk_values {
194 1337404 : let value_recv_res = waiter
195 1337404 : .wait_completion()
196 1337404 : // we rely on the caller to poll us to completion, so this is not a bail point
197 1337404 : .await;
198 : // Force not bailing early by wrapping the code into a closure.
199 : #[allow(clippy::redundant_closure_call)]
200 1337404 : let _: () = (|| {
201 1337404 : match (&mut res, value_recv_res) {
202 0 : (Err(_), _) => {
203 0 : // We've already failed, no need to process more.
204 0 : }
205 0 : (Ok(_), Err(wait_err)) => {
206 0 : // This shouldn't happen - likely the sidecar task panicked.
207 0 : res = Err(PageReconstructError::Other(wait_err.into()));
208 0 : }
209 0 : (Ok(_), Ok(Err(err))) => {
210 0 : let err: std::io::Error = err;
211 0 : // TODO: returning IO error here will fail a compute query.
212 0 : // Probably not what we want, we're not doing `maybe_fatal_err`
213 0 : // in the IO futures.
214 0 : // But it's been like that for a long time, not changing it
215 0 : // as part of concurrent IO.
216 0 : // => https://github.com/neondatabase/neon/issues/10454
217 0 : res = Err(PageReconstructError::Other(err.into()));
218 0 : }
219 38396 : (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
220 38396 : assert!(ok.img.is_none());
221 38396 : ok.img = Some((lsn, img));
222 : }
223 1299008 : (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
224 1299008 : match Value::des(&buf) {
225 1400 : Ok(Value::WalRecord(rec)) => {
226 1400 : ok.records.push((lsn, rec));
227 1400 : }
228 1297608 : Ok(Value::Image(img)) => {
229 1297608 : assert!(ok.img.is_none());
230 1297608 : ok.img = Some((lsn, img));
231 : }
232 0 : Err(err) => {
233 0 : res = Err(PageReconstructError::Other(err.into()));
234 0 : }
235 : }
236 : }
237 : }
238 1337404 : })();
239 1337404 : }
240 :
241 1336092 : res
242 1336092 : }
243 : }
244 :
245 : /// Bag of data accumulated during a vectored get.
246 : pub(crate) struct ValuesReconstructState {
247 : /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
248 : /// should not expect to get anything from this hashmap.
249 : pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
250 : /// The keys which are already retrieved
251 : keys_done: KeySpaceRandomAccum,
252 :
253 : /// The keys covered by the image layers
254 : keys_with_image_coverage: Option<Range<Key>>,
255 :
256 : // Statistics that are still accessible to a caller of `get_vectored_impl`.
257 : layers_visited: u32,
258 : delta_layers_visited: u32,
259 :
260 : pub(crate) io_concurrency: IoConcurrency,
261 : num_active_ios: Arc<AtomicUsize>,
262 :
263 : pub(crate) read_path: Option<ReadPath>,
264 : }
265 :
266 : /// The level of IO concurrency to be used on the read path
267 : ///
268 : /// The desired end state is that we always do parallel IO.
269 : /// This struct and the dispatching in the impl will be removed once
270 : /// we've built enough confidence.
271 : pub(crate) enum IoConcurrency {
272 : Sequential,
273 : SidecarTask {
274 : task_id: usize,
275 : ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
276 : },
277 : }
278 :
279 : type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
280 :
281 : pub(crate) enum SelectedIoConcurrency {
282 : Sequential,
283 : SidecarTask(GateGuard),
284 : }
285 :
286 : impl std::fmt::Debug for IoConcurrency {
287 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 0 : match self {
289 0 : IoConcurrency::Sequential => write!(f, "Sequential"),
290 0 : IoConcurrency::SidecarTask { .. } => write!(f, "SidecarTask"),
291 : }
292 0 : }
293 : }
294 :
295 : impl std::fmt::Debug for SelectedIoConcurrency {
296 64 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 64 : match self {
298 32 : SelectedIoConcurrency::Sequential => write!(f, "Sequential"),
299 32 : SelectedIoConcurrency::SidecarTask(_) => write!(f, "SidecarTask"),
300 : }
301 64 : }
302 : }
303 :
304 : impl IoConcurrency {
305 : /// Force sequential IO. This is a temporary workaround until we have
306 : /// finished plumbing IoConcurrency through
307 : /// the call stack into `RequestContext`.
308 : ///
309 : /// DO NOT USE for new code.
310 : ///
311 : /// Tracking issue: <https://github.com/neondatabase/neon/issues/10460>.
312 1215204 : pub(crate) fn sequential() -> Self {
313 1215204 : Self::spawn(SelectedIoConcurrency::Sequential)
314 1215204 : }
315 :
316 1020 : pub(crate) fn spawn_from_conf(
317 1020 : conf: &'static PageServerConf,
318 1020 : gate_guard: GateGuard,
319 1020 : ) -> IoConcurrency {
320 : use pageserver_api::config::GetVectoredConcurrentIo;
321 1020 : let selected = match conf.get_vectored_concurrent_io {
322 1020 : GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
323 0 : GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
324 : };
325 1020 : Self::spawn(selected)
326 1020 : }
327 :
328 1216288 : pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
329 1216288 : match io_concurrency {
330 1216256 : SelectedIoConcurrency::Sequential => IoConcurrency::Sequential,
331 32 : SelectedIoConcurrency::SidecarTask(gate_guard) => {
332 32 : let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
333 : static TASK_ID: AtomicUsize = AtomicUsize::new(0);
334 32 : let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
335 : // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
336 32 : let span =
337 32 : tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
338 32 : trace!(task_id, "spawning sidecar task");
339 32 : tokio::spawn(async move {
340 32 : trace!("start");
341 32 : scopeguard::defer!{ trace!("end") };
342 : type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
343 : enum State {
344 : Waiting {
345 : // invariant: is_empty(), but we recycle the allocation
346 : empty_futures: FuturesUnordered<IoFuture>,
347 : ios_rx: IosRx,
348 : },
349 : Executing {
350 : futures: FuturesUnordered<IoFuture>,
351 : ios_rx: IosRx,
352 : },
353 : ShuttingDown {
354 : futures: FuturesUnordered<IoFuture>,
355 : },
356 : }
357 32 : let mut state = State::Waiting {
358 32 : empty_futures: FuturesUnordered::new(),
359 32 : ios_rx,
360 32 : };
361 : loop {
362 39152 : match state {
363 : State::Waiting {
364 18590 : empty_futures,
365 18590 : mut ios_rx,
366 18590 : } => {
367 18590 : assert!(empty_futures.is_empty());
368 18590 : tokio::select! {
369 18590 : fut = ios_rx.recv() => {
370 18558 : if let Some(fut) = fut {
371 18558 : trace!("received new io future");
372 18558 : empty_futures.push(fut);
373 18558 : state = State::Executing { futures: empty_futures, ios_rx };
374 : } else {
375 0 : state = State::ShuttingDown { futures: empty_futures }
376 : }
377 : }
378 : }
379 : }
380 : State::Executing {
381 20562 : mut futures,
382 20562 : mut ios_rx,
383 20562 : } => {
384 20562 : tokio::select! {
385 20562 : res = futures.next() => {
386 19560 : trace!("io future completed");
387 19560 : assert!(res.is_some());
388 19560 : if futures.is_empty() {
389 18558 : state = State::Waiting { empty_futures: futures, ios_rx};
390 18558 : } else {
391 1002 : state = State::Executing { futures, ios_rx };
392 1002 : }
393 : }
394 20562 : fut = ios_rx.recv() => {
395 1002 : if let Some(fut) = fut {
396 1002 : trace!("received new io future");
397 1002 : futures.push(fut);
398 1002 : state = State::Executing { futures, ios_rx};
399 0 : } else {
400 0 : state = State::ShuttingDown { futures };
401 0 : }
402 : }
403 : }
404 : }
405 : State::ShuttingDown {
406 0 : mut futures,
407 0 : } => {
408 0 : trace!("shutting down");
409 0 : while let Some(()) = futures.next().await {
410 0 : trace!("io future completed (shutdown)");
411 : // drain
412 : }
413 0 : trace!("shutdown complete");
414 0 : break;
415 0 : }
416 0 : }
417 0 : }
418 0 : drop(gate_guard); // drop it right before we exit
419 32 : }.instrument(span));
420 32 : IoConcurrency::SidecarTask { task_id, ios_tx }
421 : }
422 : }
423 1216288 : }
424 :
425 76362 : pub(crate) fn clone(&self) -> Self {
426 76362 : match self {
427 39484 : IoConcurrency::Sequential => IoConcurrency::Sequential,
428 36878 : IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
429 36878 : task_id: *task_id,
430 36878 : ios_tx: ios_tx.clone(),
431 36878 : },
432 : }
433 76362 : }
434 :
435 : /// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
436 : ///
437 : /// The IO is represented as an opaque future.
438 : /// IO completion must be handled inside the future, e.g., through a oneshot channel.
439 : ///
440 : /// The API seems simple but there are multiple **pitfalls** involving
441 : /// DEADLOCK RISK.
442 : ///
443 : /// First, there are no guarantees about the execution of the IO.
444 : /// It may be `await`ed in-place before this function returns.
445 : /// It may be polled partially by this task and handed off to another task to be finished.
446 : /// It may be polled and then dropped before returning ready.
447 : ///
448 : /// This means that submitted IOs must not be interdependent.
449 : /// Interdependence may be through shared limited resources, e.g.,
450 : /// - VirtualFile file descriptor cache slot acquisition
451 : /// - tokio-epoll-uring slot
452 : ///
453 : /// # Why current usage is safe from deadlocks
454 : ///
455 : /// Textbook condition for a deadlock is that _all_ of the following be given
456 : /// - Mutual exclusion
457 : /// - Hold and wait
458 : /// - No preemption
459 : /// - Circular wait
460 : ///
461 : /// The current usage is safe because:
462 : /// - Mutual exclusion: IO futures definitely use mutexes, no way around that for now
463 : /// - Hold and wait: IO futures currently hold two kinds of locks/resources while waiting
464 : /// for acquisition of other resources:
465 : /// - VirtualFile file descriptor cache slot tokio mutex
466 : /// - tokio-epoll-uring slot (uses tokio notify => wait queue, much like mutex)
467 : /// - No preemption: there's no taking-away of acquired locks/resources => given
468 : /// - Circular wait: this is the part of the condition that isn't met: all IO futures
469 : /// first acquire VirtualFile mutex, then tokio-epoll-uring slot.
470 : /// There is no IO future that acquires slot before VirtualFile.
471 : /// Hence there can be no circular waiting.
472 : /// Hence there cannot be a deadlock.
473 : ///
474 : /// This is a very fragile situation and must be revisited whenever any code called from
475 : /// inside the IO futures is changed.
476 : ///
477 : /// We will move away from opaque IO futures towards well-defined IOs at some point in
478 : /// the future when we have shipped this first version of concurrent IO to production
479 : /// and are ready to retire the Sequential mode which runs the futures in place.
480 : /// Right now, while brittle, the opaque IO approach allows us to ship the feature
481 : /// with minimal changes to the code and minimal changes to existing behavior in Sequential mode.
482 : ///
483 : /// Also read the comment in `collect_pending_ios`.
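    : ///
    : /// # Example
    : ///
    : /// A minimal sketch of the intended call pattern, mirroring
    : /// `test_io_concurrency_noise` at the bottom of this file
    : /// (`read_value_from_disk` is a placeholder for the actual layer read):
    : ///
    : /// ```ignore
    : /// let io = reconstruct_state.update_key(&key, lsn, /* completes */ true);
    : /// reconstruct_state
    : ///     .spawn_io(async move {
    : ///         // Completion must be signalled from inside the future,
    : ///         // here by completing the `OnDiskValueIo` handle.
    : ///         let res = read_value_from_disk().await;
    : ///         io.complete(res);
    : ///     })
    : ///     .await;
    : /// ```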
484 1525552 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
485 1525552 : where
486 1525552 : F: std::future::Future<Output = ()> + Send + 'static,
487 1525552 : {
488 1525552 : match self {
489 1505992 : IoConcurrency::Sequential => fut.await,
490 19560 : IoConcurrency::SidecarTask { ios_tx, .. } => {
491 19560 : let fut = Box::pin(fut);
492 19560 : // NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
493 19560 : // while insignificant for latency.
494 19560 : // It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
495 19560 : // a submission here, but never poll the future. That way, io_uring can make progress while
496 19560 : // the future sits in the ios_tx queue.
497 19560 : match ios_tx.send(fut) {
498 19560 : Ok(()) => {}
499 : Err(_) => {
500 0 : unreachable!("the io task must have exited, likely it panicked")
501 : }
502 : }
503 : }
504 : }
505 1525552 : }
506 :
507 : #[cfg(test)]
508 64 : pub(crate) fn spawn_for_test() -> impl std::ops::DerefMut<Target = Self> {
509 : use std::ops::{Deref, DerefMut};
510 :
511 : use tracing::info;
512 : use utils::sync::gate::Gate;
513 :
514 : // Spawn needs a Gate, give it one.
515 : struct Wrapper {
516 : inner: IoConcurrency,
517 : #[allow(dead_code)]
518 : gate: Box<Gate>,
519 : }
520 : impl Deref for Wrapper {
521 : type Target = IoConcurrency;
522 :
523 36974 : fn deref(&self) -> &Self::Target {
524 36974 : &self.inner
525 36974 : }
526 : }
527 : impl DerefMut for Wrapper {
528 0 : fn deref_mut(&mut self) -> &mut Self::Target {
529 0 : &mut self.inner
530 0 : }
531 : }
532 64 : let gate = Box::new(Gate::default());
533 :
534 : // The default behavior when running Rust unit tests without any further
535 : // flags is to use the new (sidecar task) behavior.
536 : // The CI uses the following environment variable to unit test both old
537 : // and new behavior.
538 : // NB: the Python regression & perf tests take the `else` branch
539 : // below and have their own defaults management.
540 64 : let selected = {
541 : // The pageserver_api::config type is unsuitable because it's internally tagged.
542 64 : #[derive(serde::Deserialize)]
543 : #[serde(rename_all = "kebab-case")]
544 : enum TestOverride {
545 : Sequential,
546 : SidecarTask,
547 : }
548 : use once_cell::sync::Lazy;
549 64 : static TEST_OVERRIDE: Lazy<TestOverride> = Lazy::new(|| {
550 64 : utils::env::var_serde_json_string(
551 64 : "NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO",
552 64 : )
553 64 : .unwrap_or(TestOverride::SidecarTask)
554 64 : });
555 :
556 64 : match *TEST_OVERRIDE {
557 32 : TestOverride::Sequential => SelectedIoConcurrency::Sequential,
558 : TestOverride::SidecarTask => {
559 32 : SelectedIoConcurrency::SidecarTask(gate.enter().expect("just created it"))
560 : }
561 : }
562 : };
563 :
564 64 : info!(?selected, "get_vectored_concurrent_io test");
565 :
566 64 : Wrapper {
567 64 : inner: Self::spawn(selected),
568 64 : gate,
569 64 : }
570 64 : }
571 : }
572 :
573 : /// Make noise in case the [`ValuesReconstructState`] gets dropped while
574 : /// there are still IOs in flight.
575 : /// Refer to `collect_pending_ios` for why we prefer not to do that.
576 : ///
577 : /// We log from here instead of from the sidecar task because the [`ValuesReconstructState`]
578 : /// gets dropped in a tracing span with more context.
579 : /// We repeat the sidecar task's `task_id` so we can correlate what we emit here with
580 : /// the logs / panic handler logs from the sidecar task, which also logs the `task_id`.
581 : impl Drop for ValuesReconstructState {
582 1255342 : fn drop(&mut self) {
583 1255342 : let num_active_ios = self
584 1255342 : .num_active_ios
585 1255342 : .load(std::sync::atomic::Ordering::Acquire);
586 1255342 : if num_active_ios == 0 {
587 1255340 : return;
588 2 : }
589 2 : let sidecar_task_id = match &self.io_concurrency {
590 0 : IoConcurrency::Sequential => None,
591 2 : IoConcurrency::SidecarTask { task_id, .. } => Some(*task_id),
592 : };
593 2 : tracing::warn!(
594 : num_active_ios,
595 : ?sidecar_task_id,
596 0 : backtrace=%std::backtrace::Backtrace::force_capture(),
597 0 : "dropping ValuesReconstructState while some IOs have not been completed",
598 : );
599 1255342 : }
600 : }
601 :
602 : impl ValuesReconstructState {
603 1255342 : pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
604 1255342 : Self {
605 1255342 : keys: HashMap::new(),
606 1255342 : keys_done: KeySpaceRandomAccum::new(),
607 1255342 : keys_with_image_coverage: None,
608 1255342 : layers_visited: 0,
609 1255342 : delta_layers_visited: 0,
610 1255342 : io_concurrency,
611 1255342 : num_active_ios: Arc::new(AtomicUsize::new(0)),
612 1255342 : read_path: None,
613 1255342 : }
614 1255342 : }
615 :
616 : /// Absolutely read [`IoConcurrency::spawn_io`] to learn about assumptions & pitfalls.
617 1525552 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
618 1525552 : where
619 1525552 : F: std::future::Future<Output = ()> + Send + 'static,
620 1525552 : {
621 1525552 : self.io_concurrency.spawn_io(fut).await;
622 1525552 : }
623 :
624 1693344 : pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
625 1693344 : self.layers_visited += 1;
626 1693344 : if let ReadableLayer::PersistentLayer(layer) = layer {
627 479975 : if layer.layer_desc().is_delta() {
628 434883 : self.delta_layers_visited += 1;
629 434883 : }
630 1213369 : }
631 1693344 : }
632 :
633 460 : pub(crate) fn get_delta_layers_visited(&self) -> u32 {
634 460 : self.delta_layers_visited
635 460 : }
636 :
637 1255284 : pub(crate) fn get_layers_visited(&self) -> u32 {
638 1255284 : self.layers_visited
639 1255284 : }
640 :
641 : /// On hitting an image layer, we can mark all keys in its range as done, because
642 : /// if the image layer does not contain a key, that key was deleted or never added.
643 45116 : pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
644 45116 : let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
645 45116 : assert_eq!(
646 : prev_val, None,
647 0 : "should consume the keyspace before the next iteration"
648 : );
649 45116 : }
650 :
651 : /// Update the state collected for a given key.
652 : /// Returns true if this was the last value needed for the key and false otherwise.
653 : ///
654 : /// If the key is done after the update, mark it as such.
655 : ///
656 : /// If the key is in the sparse keyspace (i.e., aux files), we do not track them in
657 : /// `key_done`.
658 : // TODO: rename this method & update description.
659 1481949 : pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
660 1481949 : let state = self.keys.entry(*key).or_default();
661 1481949 :
662 1481949 : let is_sparse_key = key.is_sparse();
663 :
664 1481949 : let required_io = match state.situation {
665 : ValueReconstructSituation::Complete => {
666 144543 : if is_sparse_key {
667 : // Sparse keyspace might be visited multiple times because
668 : // we don't track unmapped keyspaces.
669 144543 : return OnDiskValueIo::Unnecessary;
670 : } else {
671 0 : unreachable!()
672 : }
673 : }
674 : ValueReconstructSituation::Continue => {
675 1337406 : self.num_active_ios
676 1337406 : .fetch_add(1, std::sync::atomic::Ordering::Release);
677 1337406 : let (tx, rx) = tokio::sync::oneshot::channel();
678 1337406 : state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
679 1337406 : OnDiskValueIo::Required {
680 1337406 : tx,
681 1337406 : num_active_ios: Arc::clone(&self.num_active_ios),
682 1337406 : }
683 1337406 : }
684 1337406 : };
685 1337406 :
686 1337406 : if completes && state.situation == ValueReconstructSituation::Continue {
687 1336094 : state.situation = ValueReconstructSituation::Complete;
688 1336094 : if !is_sparse_key {
689 1208578 : self.keys_done.add_key(*key);
690 1208578 : }
691 1312 : }
692 :
693 1337406 : required_io
694 1481949 : }
695 :
696 : /// Returns the key space describing the keys that have
697 : /// been marked as completed since the last call to this function.
698 : /// Returns individual keys done, and the image layer coverage.
699 3399539 : pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
700 3399539 : (
701 3399539 : self.keys_done.consume_keyspace(),
702 3399539 : self.keys_with_image_coverage.take(),
703 3399539 : )
704 3399539 : }
705 : }
706 :
707 : /// A key that uniquely identifies a layer in a timeline
708 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
709 : pub(crate) enum LayerId {
710 : PersitentLayerId(PersistentLayerKey),
711 : InMemoryLayerId(InMemoryLayerFileId),
712 : }
713 :
714 : /// Uniquely identify a layer visit by the layer
715 : /// and LSN floor (or start LSN) of the reads.
716 : /// The layer itself is not enough since we may
717 : /// have different LSN lower bounds for delta layer reads.
718 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
719 : struct LayerToVisitId {
720 : layer_id: LayerId,
721 : lsn_floor: Lsn,
722 : }
723 :
724 : /// Layer wrapper for the read path. Note that it is valid
725 : /// to use these layers even after external operations have
726 : /// been performed on them (compaction, freeze, etc.).
727 : #[derive(Debug)]
728 : pub(crate) enum ReadableLayer {
729 : PersistentLayer(Layer),
730 : InMemoryLayer(Arc<InMemoryLayer>),
731 : }
732 :
733 : /// A partial description of a read to be done.
734 : #[derive(Debug, Clone)]
735 : struct LayerVisit {
736 : /// An id used to resolve the readable layer within the fringe
737 : layer_to_visit_id: LayerToVisitId,
738 : /// Lsn range for the read, used for selecting the next read
739 : lsn_range: Range<Lsn>,
740 : }
741 :
742 : /// Data structure which maintains a fringe of layers for the
743 : /// read path. The fringe is the set of layers which intersects
744 : /// the current keyspace that the search is descending on.
745 : /// Each layer tracks the keyspace that intersects it.
746 : ///
747 : /// The fringe must appear sorted by Lsn. Hence, it uses a two-level indexing scheme:
748 : /// a heap of planned visits ordered by LSN, plus a map of the reads planned per visit.
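    : ///
    : /// A rough sketch of how the read path drives the fringe (the variable
    : /// names are placeholders):
    : ///
    : /// ```ignore
    : /// let mut fringe = LayerFringe::new();
    : /// fringe.update(layer, keyspace, lsn_floor..read_lsn);
    : /// // Visits come back ordered by descending end LSN; reads of the same
    : /// // layer with the same LSN floor are coalesced into a single visit.
    : /// while let Some((layer, keyspace, lsn_range)) = fringe.next_layer() {
    : ///     // read `keyspace` from `layer`, adding deeper layers to the
    : ///     // fringe for any keys that are not fully reconstructed yet
    : /// }
    : /// ```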
749 : #[derive(Debug)]
750 : pub(crate) struct LayerFringe {
751 : planned_visits_by_lsn: BinaryHeap<LayerVisit>,
752 : visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
753 : }
754 :
755 : #[derive(Debug)]
756 : struct LayerVisitReads {
757 : layer: ReadableLayer,
758 : target_keyspace: KeySpaceRandomAccum,
759 : }
760 :
761 : impl LayerFringe {
762 1706195 : pub(crate) fn new() -> Self {
763 1706195 : LayerFringe {
764 1706195 : planned_visits_by_lsn: BinaryHeap::new(),
765 1706195 : visit_reads: HashMap::new(),
766 1706195 : }
767 1706195 : }
768 :
769 3399539 : pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
770 3399539 : let read_desc = self.planned_visits_by_lsn.pop()?;
771 :
772 1693344 : let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
773 1693344 :
774 1693344 : match removed {
775 : Some((
776 : _,
777 : LayerVisitReads {
778 1693344 : layer,
779 1693344 : mut target_keyspace,
780 1693344 : },
781 1693344 : )) => Some((
782 1693344 : layer,
783 1693344 : target_keyspace.consume_keyspace(),
784 1693344 : read_desc.lsn_range,
785 1693344 : )),
786 0 : None => unreachable!("fringe internals are always consistent"),
787 : }
788 3399539 : }
789 :
790 1693432 : pub(crate) fn update(
791 1693432 : &mut self,
792 1693432 : layer: ReadableLayer,
793 1693432 : keyspace: KeySpace,
794 1693432 : lsn_range: Range<Lsn>,
795 1693432 : ) {
796 1693432 : let layer_to_visit_id = LayerToVisitId {
797 1693432 : layer_id: layer.id(),
798 1693432 : lsn_floor: lsn_range.start,
799 1693432 : };
800 1693432 :
801 1693432 : let entry = self.visit_reads.entry(layer_to_visit_id.clone());
802 1693432 : match entry {
803 88 : Entry::Occupied(mut entry) => {
804 88 : entry.get_mut().target_keyspace.add_keyspace(keyspace);
805 88 : }
806 1693344 : Entry::Vacant(entry) => {
807 1693344 : self.planned_visits_by_lsn.push(LayerVisit {
808 1693344 : lsn_range,
809 1693344 : layer_to_visit_id: layer_to_visit_id.clone(),
810 1693344 : });
811 1693344 : let mut accum = KeySpaceRandomAccum::new();
812 1693344 : accum.add_keyspace(keyspace);
813 1693344 : entry.insert(LayerVisitReads {
814 1693344 : layer,
815 1693344 : target_keyspace: accum,
816 1693344 : });
817 1693344 : }
818 : }
819 1693432 : }
820 : }
821 :
822 : impl Default for LayerFringe {
823 0 : fn default() -> Self {
824 0 : Self::new()
825 0 : }
826 : }
827 :
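    : // Ordering used by the fringe's max-heap: the visit with the highest end LSN is
    : // popped first; ties on the end LSN are broken in favor of the lower start LSN,
    : // i.e. the visit covering the wider LSN range.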
828 : impl Ord for LayerVisit {
829 60 : fn cmp(&self, other: &Self) -> Ordering {
830 60 : let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
831 60 : if ord == std::cmp::Ordering::Equal {
832 44 : self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
833 : } else {
834 16 : ord
835 : }
836 60 : }
837 : }
838 :
839 : impl PartialOrd for LayerVisit {
840 60 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
841 60 : Some(self.cmp(other))
842 60 : }
843 : }
844 :
845 : impl PartialEq for LayerVisit {
846 0 : fn eq(&self, other: &Self) -> bool {
847 0 : self.lsn_range == other.lsn_range
848 0 : }
849 : }
850 :
851 : impl Eq for LayerVisit {}
852 :
853 : impl ReadableLayer {
854 1693432 : pub(crate) fn id(&self) -> LayerId {
855 1693432 : match self {
856 480063 : Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
857 1213369 : Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
858 : }
859 1693432 : }
860 :
861 1693344 : pub(crate) async fn get_values_reconstruct_data(
862 1693344 : &self,
863 1693344 : keyspace: KeySpace,
864 1693344 : lsn_range: Range<Lsn>,
865 1693344 : reconstruct_state: &mut ValuesReconstructState,
866 1693344 : ctx: &RequestContext,
867 1693344 : ) -> Result<(), GetVectoredError> {
868 1693344 : match self {
869 479975 : ReadableLayer::PersistentLayer(layer) => {
870 479975 : layer
871 479975 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
872 479975 : .await
873 : }
874 1213369 : ReadableLayer::InMemoryLayer(layer) => {
875 1213369 : layer
876 1213369 : .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
877 1213369 : .await
878 : }
879 : }
880 1693344 : }
881 : }
882 :
883 : /// Layers contain a hint indicating whether they are likely to be used for reads.
884 : ///
885 : /// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
886 : /// when changing the visibility of layers (for example when creating a branch that makes some previously
887 : /// covered layers visible). It should be used for cache management but not for correctness-critical checks.
888 : #[derive(Debug, Clone, PartialEq, Eq)]
889 : pub enum LayerVisibilityHint {
890 : /// A Visible layer might be read while serving a read, because there is not an image layer between it
891 : /// and a readable LSN (the tip of the branch or a child's branch point)
892 : Visible,
893 : /// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
894 : /// a branch or ephemeral endpoint at an LSN below the layer that covers this.
895 : Covered,
896 : }
897 :
898 : pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
899 :
900 0 : #[derive(Clone, Copy, strum_macros::EnumString)]
901 : pub(crate) enum LayerAccessStatsReset {
902 : NoReset,
903 : AllStats,
904 : }
905 :
906 : impl Default for LayerAccessStats {
907 3848 : fn default() -> Self {
908 3848 : // Default value is to assume resident since creation time, and visible.
909 3848 : let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now());
910 3848 : value |= 0x1 << Self::VISIBILITY_SHIFT;
911 3848 :
912 3848 : Self(std::sync::atomic::AtomicU64::new(value))
913 3848 : }
914 : }
915 :
916 : // Efficient store of two very-low-resolution timestamps and some bits. Used for storing last access time and
917 : // last residence change time.
918 : impl LayerAccessStats {
919 : // How many high bits to drop from a u32 timestamp?
920 : // - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use
921 : // after that, this software has been very successful!)
922 : // - Dropping the top bit is implicitly safe because unix timestamps are meant to be
923 : // stored in an i32, so they never used it.
924 : // - Dropping the next two bits is safe because this code is only running on systems in
925 : // years >= 2024, and these bits have been 1 since 2021
926 : //
927 : // Therefore we may store only 28 bits for a timestamp with one second resolution. We do
928 : // this truncation to make space for some flags in the high bits of our u64.
929 : const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1;
930 : const TS_MASK: u32 = 0x1f_ff_ff_ff;
931 : const TS_ONES: u32 = 0x60_00_00_00;
932 :
933 : const ATIME_SHIFT: u32 = 0;
934 : const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS;
935 : const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS;
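    :
    : // Resulting layout of the inner u64 (TS_DROP_HIGH_BITS == 3, so each timestamp keeps 29 bits):
    : //   bits  0..=28  last access time (ATIME)
    : //   bits 29..=57  last residence change time (RTIME)
    : //   bit  58       visibility hint (1 = Visible, 0 = Covered)
    : //   bits 59..=63  unused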
936 :
937 480275 : fn write_bits(&self, mask: u64, value: u64) -> u64 {
938 480275 : self.0
939 480275 : .fetch_update(
940 480275 : // TODO: decide what orderings are correct
941 480275 : std::sync::atomic::Ordering::Relaxed,
942 480275 : std::sync::atomic::Ordering::Relaxed,
943 480275 : |v| Some((v & !mask) | (value & mask)),
944 480275 : )
945 480275 : .expect("Inner function is infallible")
946 480275 : }
947 :
948 483387 : fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) {
949 483387 : // Drop the high bits of the timestamp (see TS_DROP_HIGH_BITS above); one-second resolution is kept
950 483387 : let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64);
951 483387 :
952 483387 : ((Self::TS_MASK as u64) << shift, timestamp << shift)
953 483387 : }
954 :
955 292 : fn read_low_res_timestamp(&self, shift: u32) -> Option<SystemTime> {
956 292 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
957 292 :
958 292 : let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift;
959 292 : if ts_bits == 0 {
960 132 : None
961 : } else {
962 160 : Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64)))
963 : }
964 292 : }
965 :
966 : /// Record a change in layer residency.
967 : ///
968 : /// Recording the event must happen while holding the layer map lock to
969 : /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
970 : /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
971 : ///
972 : /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
973 : /// the following race could happen:
974 : ///
975 : /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
976 : /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
977 : /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
978 : /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
979 100 : pub(crate) fn record_residence_event_at(&self, now: SystemTime) {
980 100 : let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now);
981 100 : self.write_bits(mask, value);
982 100 : }
983 :
984 96 : pub(crate) fn record_residence_event(&self) {
985 96 : self.record_residence_event_at(SystemTime::now())
986 96 : }
987 :
988 479439 : fn record_access_at(&self, now: SystemTime) -> bool {
989 479439 : let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
990 479439 :
991 479439 : // A layer which is accessed must be visible.
992 479439 : mask |= 0x1 << Self::VISIBILITY_SHIFT;
993 479439 : value |= 0x1 << Self::VISIBILITY_SHIFT;
994 479439 :
995 479439 : let old_bits = self.write_bits(mask, value);
996 4 : !matches!(
997 479439 : self.decode_visibility(old_bits),
998 : LayerVisibilityHint::Visible
999 : )
1000 479439 : }
1001 :
1002 : /// Returns true if we modified the layer's visibility to set it to Visible implicitly
1003 : /// as a result of this access
1004 479999 : pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
1005 479999 : if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
1006 572 : return false;
1007 479427 : }
1008 479427 :
1009 479427 : self.record_access_at(SystemTime::now())
1010 479999 : }
1011 :
1012 0 : fn as_api_model(
1013 0 : &self,
1014 0 : reset: LayerAccessStatsReset,
1015 0 : ) -> pageserver_api::models::LayerAccessStats {
1016 0 : let ret = pageserver_api::models::LayerAccessStats {
1017 0 : access_time: self
1018 0 : .read_low_res_timestamp(Self::ATIME_SHIFT)
1019 0 : .unwrap_or(UNIX_EPOCH),
1020 0 : residence_time: self
1021 0 : .read_low_res_timestamp(Self::RTIME_SHIFT)
1022 0 : .unwrap_or(UNIX_EPOCH),
1023 0 : visible: matches!(self.visibility(), LayerVisibilityHint::Visible),
1024 : };
1025 0 : match reset {
1026 0 : LayerAccessStatsReset::NoReset => {}
1027 0 : LayerAccessStatsReset::AllStats => {
1028 0 : self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0);
1029 0 : self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0);
1030 0 : }
1031 : }
1032 0 : ret
1033 0 : }
1034 :
1035 : /// Get the latest access timestamp, falling back to latest residence event. The latest residence event
1036 : /// will be this Layer's construction time, if its residence hasn't changed since then.
1037 84 : pub(crate) fn latest_activity(&self) -> SystemTime {
1038 84 : if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) {
1039 12 : t
1040 : } else {
1041 72 : self.read_low_res_timestamp(Self::RTIME_SHIFT)
1042 72 : .expect("Residence time is set on construction")
1043 : }
1044 84 : }
1045 :
1046 : /// Whether this layer has been accessed (excluding in [`AccessStatsBehavior::Skip`]).
1047 : ///
1048 : /// This indicates whether the layer has been used for some purpose that would motivate
1049 : /// us to keep it on disk, such as for serving a getpage request.
1050 68 : fn accessed(&self) -> bool {
1051 68 : // Consider it accessed if the most recent access is more recent than
1052 68 : // the most recent change in residence status.
1053 68 : match (
1054 68 : self.read_low_res_timestamp(Self::ATIME_SHIFT),
1055 68 : self.read_low_res_timestamp(Self::RTIME_SHIFT),
1056 : ) {
1057 60 : (None, _) => false,
1058 0 : (Some(_), None) => true,
1059 8 : (Some(a), Some(r)) => a >= r,
1060 : }
1061 68 : }
1062 :
1063 : /// Helper for extracting the visibility hint from the literal value of our inner u64
1064 481766 : fn decode_visibility(&self, bits: u64) -> LayerVisibilityHint {
1065 481766 : match (bits >> Self::VISIBILITY_SHIFT) & 0x1 {
1066 481718 : 1 => LayerVisibilityHint::Visible,
1067 48 : 0 => LayerVisibilityHint::Covered,
1068 0 : _ => unreachable!(),
1069 : }
1070 481766 : }
1071 :
1072 : /// Returns the old value which has been replaced
1073 736 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) -> LayerVisibilityHint {
1074 736 : let value = match visibility {
1075 632 : LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
1076 104 : LayerVisibilityHint::Covered => 0x0,
1077 : };
1078 :
1079 736 : let old_bits = self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
1080 736 : self.decode_visibility(old_bits)
1081 736 : }
1082 :
1083 1591 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
1084 1591 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
1085 1591 : self.decode_visibility(read)
1086 1591 : }
1087 : }
1088 :
1089 : /// Get a layer descriptor from a layer.
1090 : pub(crate) trait AsLayerDesc {
1091 : /// Get the layer descriptor.
1092 : fn layer_desc(&self) -> &PersistentLayerDesc;
1093 : }
1094 :
1095 : pub mod tests {
1096 : use pageserver_api::shard::TenantShardId;
1097 : use utils::id::TimelineId;
1098 :
1099 : use super::*;
1100 :
1101 : impl From<DeltaLayerName> for PersistentLayerDesc {
1102 44 : fn from(value: DeltaLayerName) -> Self {
1103 44 : PersistentLayerDesc::new_delta(
1104 44 : TenantShardId::from([0; 18]),
1105 44 : TimelineId::from_array([0; 16]),
1106 44 : value.key_range,
1107 44 : value.lsn_range,
1108 44 : 233,
1109 44 : )
1110 44 : }
1111 : }
1112 :
1113 : impl From<ImageLayerName> for PersistentLayerDesc {
1114 48 : fn from(value: ImageLayerName) -> Self {
1115 48 : PersistentLayerDesc::new_img(
1116 48 : TenantShardId::from([0; 18]),
1117 48 : TimelineId::from_array([0; 16]),
1118 48 : value.key_range,
1119 48 : value.lsn,
1120 48 : 233,
1121 48 : )
1122 48 : }
1123 : }
1124 :
1125 : impl From<LayerName> for PersistentLayerDesc {
1126 92 : fn from(value: LayerName) -> Self {
1127 92 : match value {
1128 44 : LayerName::Delta(d) => Self::from(d),
1129 48 : LayerName::Image(i) => Self::from(i),
1130 : }
1131 92 : }
1132 : }
1133 : }
1134 :
1135 : /// Range wrapping newtype, which uses display to render Debug.
1136 : ///
1137 : /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
1138 : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
1139 :
1140 : impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
1141 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1142 0 : write!(f, "{}..{}", self.0.start, self.0.end)
1143 0 : }
1144 : }
1145 :
1146 : #[cfg(test)]
1147 : mod tests2 {
1148 : use pageserver_api::key::DBDIR_KEY;
1149 : use tracing::info;
1150 :
1151 : use super::*;
1152 : use crate::tenant::storage_layer::IoConcurrency;
1153 :
1154 : /// TODO: currently this test relies on manual visual inspection of the --no-capture output.
1155 : /// Should look like so:
1156 : /// ```text
1157 : /// RUST_LOG=trace cargo nextest run --features testing --no-capture test_io_concurrency_noise
1158 : /// running 1 test
1159 : /// 2025-01-21T17:42:01.335679Z INFO get_vectored_concurrent_io test selected=SidecarTask
1160 : /// 2025-01-21T17:42:01.335680Z TRACE spawning sidecar task task_id=0
1161 : /// 2025-01-21T17:42:01.335937Z TRACE IoConcurrency_sidecar{task_id=0}: start
1162 : /// 2025-01-21T17:42:01.335972Z TRACE IoConcurrency_sidecar{task_id=0}: received new io future
1163 : /// 2025-01-21T17:42:01.335999Z INFO IoConcurrency_sidecar{task_id=0}: waiting for signal to complete IO
1164 : /// 2025-01-21T17:42:01.336229Z WARN dropping ValuesReconstructState while some IOs have not been completed num_active_ios=1 sidecar_task_id=Some(0) backtrace= 0: <pageserver::tenant::storage_layer::ValuesReconstructState as core::ops::drop::Drop>::drop
1165 : /// at ./src/tenant/storage_layer.rs:553:24
1166 : /// 1: core::ptr::drop_in_place<pageserver::tenant::storage_layer::ValuesReconstructState>
1167 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:521:1
1168 : /// 2: core::mem::drop
1169 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:942:24
1170 : /// 3: pageserver::tenant::storage_layer::tests2::test_io_concurrency_noise::{{closure}}
1171 : /// at ./src/tenant/storage_layer.rs:1159:9
1172 : /// ...
1173 : /// 49: <unknown>
1174 : /// 2025-01-21T17:42:01.452293Z INFO IoConcurrency_sidecar{task_id=0}: completing IO
1175 : /// 2025-01-21T17:42:01.452357Z TRACE IoConcurrency_sidecar{task_id=0}: io future completed
1176 : /// 2025-01-21T17:42:01.452473Z TRACE IoConcurrency_sidecar{task_id=0}: end
1177 : /// test tenant::storage_layer::tests2::test_io_concurrency_noise ... ok
1178 : ///
1179 : /// ```
1180 : #[tokio::test]
1181 4 : async fn test_io_concurrency_noise() {
1182 4 : crate::tenant::harness::setup_logging();
1183 4 :
1184 4 : let io_concurrency = IoConcurrency::spawn_for_test();
1185 4 : match *io_concurrency {
1186 4 : IoConcurrency::Sequential => {
1187 4 : // This test asserts behavior in sidecar mode, doesn't make sense in sequential mode.
1188 4 : return;
1189 4 : }
1190 4 : IoConcurrency::SidecarTask { .. } => {}
1191 2 : }
1192 2 : let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
1193 2 :
1194 2 : let (io_fut_is_waiting_tx, io_fut_is_waiting) = tokio::sync::oneshot::channel();
1195 2 : let (do_complete_io, should_complete_io) = tokio::sync::oneshot::channel();
1196 2 : let (io_fut_exiting_tx, io_fut_exiting) = tokio::sync::oneshot::channel();
1197 2 :
1198 2 : let io = reconstruct_state.update_key(&DBDIR_KEY, Lsn(8), true);
1199 2 : reconstruct_state
1200 2 : .spawn_io(async move {
1201 2 : info!("waiting for signal to complete IO");
1202 4 : io_fut_is_waiting_tx.send(()).unwrap();
1203 2 : should_complete_io.await.unwrap();
1204 2 : info!("completing IO");
1205 4 : io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
1206 2 : io_fut_exiting_tx.send(()).unwrap();
1207 2 : })
1208 2 : .await;
1209 4 :
1210 4 : io_fut_is_waiting.await.unwrap();
1211 2 :
1212 2 : // this is what makes the noise
1213 2 : drop(reconstruct_state);
1214 2 :
1215 2 : do_complete_io.send(()).unwrap();
1216 2 :
1217 2 : io_fut_exiting.await.unwrap();
1218 4 : }
1219 : }