            Source code
       1              : //! Common traits and structs for layers
       2              : 
       3              : pub mod batch_split_writer;
       4              : pub mod delta_layer;
       5              : pub mod filter_iterator;
       6              : pub mod image_layer;
       7              : pub mod inmemory_layer;
       8              : pub(crate) mod layer;
       9              : mod layer_desc;
      10              : mod layer_name;
      11              : pub mod merge_iterator;
      12              : 
      13              : use std::cmp::Ordering;
      14              : use std::collections::hash_map::Entry;
      15              : use std::collections::{BinaryHeap, HashMap};
      16              : use std::future::Future;
      17              : use std::ops::Range;
      18              : use std::pin::Pin;
      19              : use std::sync::Arc;
      20              : use std::sync::atomic::AtomicUsize;
      21              : use std::time::{Duration, SystemTime, UNIX_EPOCH};
      22              : 
      23              : pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
      24              : use bytes::Bytes;
      25              : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
      26              : use futures::StreamExt;
      27              : use futures::stream::FuturesUnordered;
      28              : pub use image_layer::{ImageLayer, ImageLayerWriter};
      29              : pub use inmemory_layer::InMemoryLayer;
      30              : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
      31              : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
      32              : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
      33              : use pageserver_api::key::Key;
      34              : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
      35              : use pageserver_api::record::NeonWalRecord;
      36              : use pageserver_api::value::Value;
      37              : use tracing::{Instrument, trace};
      38              : use utils::lsn::Lsn;
      39              : use utils::sync::gate::GateGuard;
      40              : 
      41              : use self::inmemory_layer::InMemoryLayerFileId;
      42              : use super::PageReconstructError;
      43              : use super::layer_map::InMemoryLayerDesc;
      44              : use super::timeline::{GetVectoredError, ReadPath};
      45              : use crate::config::PageServerConf;
      46              : use crate::context::{AccessStatsBehavior, RequestContext};
      47              : /// Returns `true` when the range that starts first ends after the other range starts.
      48            0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
      49            0 : where
      50            0 :     T: PartialOrd<T>,
      51            0 : {
      52            0 :     if a.start < b.start {
      53            0 :         a.end > b.start // `a` starts first, so it overlaps iff it extends past `b.start`
      54              :     } else {
      55            0 :         b.end > a.start // `b` starts first, or both start at the same point
      56              :     }
      57            0 : }
      58              : 
      59              : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
      60              : ///
      61              : /// Before first call, you can fill in 'img' if you have an older cached
      62              : /// version of the page available. That can save work in
      63              : /// 'get_value_reconstruct_data', as it can stop searching for page versions
      64              : /// when all the WAL records going back to the cached image have been collected.
      65              : ///
      66              : /// When get_value_reconstruct_data returns Complete, 'img' is set to an image
      67              : /// of the page, or the oldest WAL record in 'records' is a will_init-type
      68              : /// record that initializes the page without requiring a previous image.
      69              : ///
      70              : /// If 'get_value_reconstruct_data' returns Continue, some 'records' may have
      71              : /// been collected, but there are more records outside the current layer. Pass
      72              : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
      73              : /// call, to collect more records.
      74              : /// `records` are `(Lsn, NeonWalRecord)` pairs; `img` is an optional `(Lsn, Bytes)` page image.
      75              : #[derive(Debug, Default)]
      76              : pub(crate) struct ValueReconstructState {
      77              :     pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
      78              :     pub(crate) img: Option<(Lsn, Bytes)>,
      79              : }
      80              : 
      81              : impl ValueReconstructState {
      82              :     /// Returns the number of page deltas applied to the page image: all records if `img` is set, else all but one.
      83      1335978 :     pub fn num_deltas(&self) -> usize {
      84      1335978 :         match self.img {
      85      1335882 :             Some(_) => self.records.len(),
      86           96 :             None => self.records.len() - 1, // omit will_init record; NOTE(review): underflows if `records` is empty - confirm callers exclude that
      87              :         }
      88      1335978 :     }
      89              : }
      90              : /// Outcome marker for reconstruct-data collection; `Continue` is the `Default`.
      91              : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
      92              : pub(crate) enum ValueReconstructSituation {
      93              :     Complete, // presumably: nothing more needs to be collected for the key (TODO confirm)
      94              :     #[default]
      95              :     Continue, // presumably: more layers still have to be visited (TODO confirm)
      96              : }
      97              : 
      98              : /// On-disk representation of a value, loaded into a buffer.
      99              : #[derive(Debug)]
     100              : pub(crate) enum OnDiskValue {
     101              :     /// Unencoded [`Value::Image`]; the bytes are used as the page image as-is.
     102              :     RawImage(Bytes),
     103              :     /// Encoded [`Value`]. Can deserialize (via `Value::des`) into an image or a WAL record
     104              :     WalRecordOrImage(Bytes),
     105              : }
     106              : 
     107              : /// Reconstruct data accumulated for a single key during a vectored get.
     108              : #[derive(Debug, Default)]
     109              : pub(crate) struct VectoredValueReconstructState {
     110              :     pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>, // pending IO results, each tagged with an LSN
     111              :     /// Starts out as `Continue`, which is the enum's `Default`.
     112              :     pub(crate) situation: ValueReconstructSituation,
     113              : }
     114              : /// Receiving end for the result of one on-disk value IO (presumably paired with `OnDiskValueIo::Required::tx`).
     115              : #[derive(Debug)]
     116              : pub(crate) struct OnDiskValueIoWaiter {
     117              :     rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
     118              : }
     119              : /// Completion side of one on-disk value IO; finish it by calling [`OnDiskValueIo::complete`].
     120              : #[derive(Debug)]
     121              : #[must_use]
     122              : pub(crate) enum OnDiskValueIo {
     123              :     /// Traversal identified this IO as required to complete the vectored get.
     124              :     Required {
     125              :         num_active_ios: Arc<AtomicUsize>, // decremented by one in `complete`
     126              :         tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>, // carries the IO result to the waiter
     127              :     },
     128              :     /// Sparse keyspace reads always read all the values for a given key,
     129              :     /// even though only the first value is needed.
     130              :     ///
     131              :     /// This variant represents the unnecessary IOs for those values at lower LSNs
     132              :     /// that aren't needed, but are currently still being done.
     133              :     ///
     134              :     /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
     135              :     /// We added this explicit representation here so that we can drop
     136              :     /// unnecessary IO results immediately, instead of buffering them in
     137              :     /// `oneshot` channels inside [`VectoredValueReconstructState`] until
     138              :     /// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
     139              :     Unnecessary,
     140              : }
     141              : /// Outcome of one on-disk value read: the loaded [`OnDiskValue`] or the `std::io::Error`.
     142              : type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
     143              : 
     144              : impl OnDiskValueIo {
     145      1482487 :     pub(crate) fn complete(self, res: OnDiskValueIoResult) { // consumes the handle and hands `res` over if it was `Required`
     146      1482487 :         match self {
     147      1337504 :             OnDiskValueIo::Required { num_active_ios, tx } => {
     148      1337504 :                 num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
     149      1337504 :                 let _ = tx.send(res); // a send error (receiver already gone) is deliberately ignored
     150      1337504 :             }
     151       144983 :             OnDiskValueIo::Unnecessary => {
     152       144983 :                 // Nobody cared, see variant doc comment. `res` is simply dropped here.
     153       144983 :             }
     154              :         }
     155      1482487 :     }
     156              : }
     157              : /// Error of [`OnDiskValueIoWaiter::wait_completion`]: the result channel closed without a value.
     158              : #[derive(Debug, thiserror::Error)]
     159              : pub(crate) enum WaitCompletionError {
     160              :     #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
     161              :     IoDropped,
     162              : }
     163              : 
     164              : impl OnDiskValueIoWaiter {
     165      1337502 :     pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
     166      1337502 :         // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
     167      1337502 :         self.rx.await.map_err(|_| WaitCompletionError::IoDropped) // receive error => sender was dropped without sending
     168      1337502 :     }
     169              : }
     170              : 
     171              : impl VectoredValueReconstructState {
     172              :     /// Awaits every pending IO of this key and folds the results into a [`ValueReconstructState`].
     173              :     /// # Cancel-Safety
     174              :     /// Technically fine to stop polling this future, but, the IOs will still
     175              :     /// be executed to completion by the sidecar task and hold on to / consume resources.
     176              :     /// Better not do it to make reasoning about the system easier.
     177      1336118 :     pub(crate) async fn collect_pending_ios(
     178      1336118 :         self,
     179      1336118 :     ) -> Result<ValueReconstructState, PageReconstructError> {
     180              :         use utils::bin_ser::BeSer;
     181              : 
     182      1336118 :         let mut res = Ok(ValueReconstructState::default());
     183              : 
     184              :         // We should try hard not to bail early, so that by the time we return from this
     185              :         // function, all IO for this value is done. It's not required -- we could totally
     186              :         // stop polling the IO futures in the sidecar task, they need to support that,
     187              :         // but just stopping to poll doesn't reduce the IO load on the disk. It's easier
     188              :         // to reason about the system if we just wait for all IO to complete, even if
     189              :         // we're no longer interested in the result.
     190              :         //
     191              :         // Revisit this when IO futures are replaced with a more sophisticated IO system
     192              :         // and an IO scheduler, where we know which IOs were submitted and which ones
     193              :         // just queued. Cf the comment on IoConcurrency::spawn_io.
     194      2673620 :         for (lsn, waiter) in self.on_disk_values {
     195      1337502 :             let value_recv_res = waiter
     196      1337502 :                 .wait_completion()
     197      1337502 :                 // we rely on the caller to poll us to completion, so this is not a bail point
     198      1337502 :                 .await;
     199              :             // Force not bailing early by wrapping the code into a closure: `return`/`?` cannot leave the loop from in there.
     200              :             #[allow(clippy::redundant_closure_call)]
     201      1337502 :             let _: () = (|| {
     202      1337502 :                 match (&mut res, value_recv_res) {
     203            0 :                     (Err(_), _) => {
     204            0 :                         // We've already failed, no need to process more. The first error is the one returned.
     205            0 :                     }
     206            0 :                     (Ok(_), Err(wait_err)) => {
     207            0 :                         // This shouldn't happen - likely the sidecar task panicked.
     208            0 :                         res = Err(PageReconstructError::Other(wait_err.into()));
     209            0 :                     }
     210            0 :                     (Ok(_), Ok(Err(err))) => {
     211            0 :                         let err: std::io::Error = err;
     212            0 :                         // TODO: returning IO error here will fail a compute query.
     213            0 :                         // Probably not what we want, we're not doing `maybe_fatal_err`
     214            0 :                         // in the IO futures.
     215            0 :                         // But it's been like that for a long time, not changing it
     216            0 :                         // as part of concurrent IO.
     217            0 :                         // =>
     218            0 :                         res = Err(PageReconstructError::Other(err.into()));
     219            0 :                     }
     220        38217 :                     (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
     221        38217 :                         assert!(ok.img.is_none()); // panics if a second image shows up for the same key
     222        38217 :                         ok.img = Some((lsn, img));
     223              :                     }
     224      1299285 :                     (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
     225      1299285 :                         match Value::des(&buf) {
     226         1480 :                             Ok(Value::WalRecord(rec)) => {
     227         1480 :                                 ok.records.push((lsn, rec)); // records keep the iteration order of `on_disk_values`
     228         1480 :                             }
     229      1297805 :                             Ok(Value::Image(img)) => {
     230      1297805 :                                 assert!(ok.img.is_none()); // same single-image expectation as for `RawImage`
     231      1297805 :                                 ok.img = Some((lsn, img));
     232              :                             }
     233            0 :                             Err(err) => {
     234            0 :                                 res = Err(PageReconstructError::Other(err.into())); // value failed to deserialize
     235            0 :                             }
     236              :                         }
     237              :                     }
     238              :                 }
     239      1337502 :             })();
     240      1337502 :         }
     241              : 
     242      1336118 :         res
     243      1336118 :     }
     244              : }
     245              : 
     246              : /// Bag of data accumulated during a vectored get.
     247              : pub(crate) struct ValuesReconstructState {
     248              :     /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
     249              :     /// should not expect to get anything from this hashmap.
     250              :     pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
     251              :     /// The keys which are already retrieved
     252              :     keys_done: KeySpaceRandomAccum,
     253              : 
     254              :     /// The keys covered by the image layers
     255              :     keys_with_image_coverage: Option<Range<Key>>,
     256              : 
     257              :     // Statistics that are still accessible as a caller of `get_vectored_impl`.
     258              :     layers_visited: u32,
     259              :     delta_layers_visited: u32,
     260              :     /// How IOs are executed; see [`IoConcurrency`].
     261              :     pub(crate) io_concurrency: IoConcurrency,
     262              :     num_active_ios: Arc<AtomicUsize>, // presumably the counter handed to `OnDiskValueIo::Required` (TODO confirm)
     263              : 
     264              :     pub(crate) read_path: Option<ReadPath>,
     265              : }
     266              : 
     267              : /// The level of IO concurrency to be used on the read path
     268              : ///
     269              : /// The desired end state is that we always do parallel IO.
     270              : /// This struct and the dispatching in the impl will be removed once
     271              : /// we've built enough confidence.
     272              : pub(crate) enum IoConcurrency {
     273              :     Sequential, // `spawn_io` awaits the submitted future in place
     274              :     SidecarTask {
     275              :         task_id: usize, // id taken from a process-wide counter in `spawn`
     276              :         ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>, // queue of boxed IO futures read by the spawned task
     277              :     },
     278              : }
     279              : /// Boxed, type-erased IO future as queued on `IoConcurrency::SidecarTask::ios_tx`.
     280              : type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
     281              : /// Input to [`IoConcurrency::spawn`]: the chosen mode, plus the `GateGuard` the sidecar task holds until it exits.
     282              : pub(crate) enum SelectedIoConcurrency {
     283              :     Sequential,
     284              :     SidecarTask(GateGuard),
     285              : }
     286              : /// Prints only the variant name; the `SidecarTask` fields are omitted.
     287              : impl std::fmt::Debug for IoConcurrency {
     288            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     289            0 :         match self {
     290            0 :             IoConcurrency::Sequential => write!(f, "Sequential"),
     291            0 :             IoConcurrency::SidecarTask { .. } => write!(f, "SidecarTask"),
     292              :         }
     293            0 :     }
     294              : }
     295              : /// Prints only the variant name; the wrapped `GateGuard` is not formatted.
     296              : impl std::fmt::Debug for SelectedIoConcurrency {
     297           64 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     298           64 :         match self {
     299           32 :             SelectedIoConcurrency::Sequential => write!(f, "Sequential"),
     300           32 :             SelectedIoConcurrency::SidecarTask(_) => write!(f, "SidecarTask"),
     301              :         }
     302           64 :     }
     303              : }
     304              : 
     305              : impl IoConcurrency {
     306              :     /// Force sequential IO. This is a temporary workaround until we have
     307              :     /// moved plumbing-through-the-call-stack
     308              :     /// of IoConcurrency into `RequestContext`.
     309              :     ///
     310              :     /// DO NOT USE for new code.
     311              :     ///
     312              :     /// Tracking issue: (URL missing in this listing - TODO restore).
     313      1215138 :     pub(crate) fn sequential() -> Self {
     314      1215138 :         Self::spawn(SelectedIoConcurrency::Sequential)
     315      1215138 :     }
     316              :     /// Picks the mode from `conf.get_vectored_concurrent_io`; `gate_guard` is only passed on for `SidecarTask`.
     317         1036 :     pub(crate) fn spawn_from_conf(
     318         1036 :         conf: &'static PageServerConf,
     319         1036 :         gate_guard: GateGuard,
     320         1036 :     ) -> IoConcurrency {
     321              :         use pageserver_api::config::GetVectoredConcurrentIo;
     322         1036 :         let selected = match conf.get_vectored_concurrent_io {
     323         1036 :             GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
     324            0 :             GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
     325              :         };
     326         1036 :         Self::spawn(selected)
     327         1036 :     }
     328              :     /// Creates the requested mode; `SidecarTask` spawns a tokio task that runs submitted IO futures until all senders are dropped.
     329      1216238 :     pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
     330      1216238 :         match io_concurrency {
     331      1216206 :             SelectedIoConcurrency::Sequential => IoConcurrency::Sequential,
     332           32 :             SelectedIoConcurrency::SidecarTask(gate_guard) => {
     333           32 :                 let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
     334              :                 static TASK_ID: AtomicUsize = AtomicUsize::new(0);
     335           32 :                 let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     336              :                 // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
     337           32 :                 let span =
     338           32 :                     tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
     339           32 :                 trace!(task_id, "spawning sidecar task");
     340           32 :                 tokio::spawn(async move {
     341           32 :                     trace!("start");
     342           32 :                     scopeguard::defer!{ trace!("end") };
     343              :                     type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
     344              :                     enum State {
     345              :                         Waiting {
     346              :                             // invariant: is_empty(), but we recycle the allocation
     347              :                             empty_futures: FuturesUnordered<IoFuture>,
     348              :                             ios_rx: IosRx,
     349              :                         },
     350              :                         Executing {
     351              :                             futures: FuturesUnordered<IoFuture>,
     352              :                             ios_rx: IosRx,
     353              :                         },
     354              :                         ShuttingDown {
     355              :                             futures: FuturesUnordered<IoFuture>,
     356              :                         },
     357              :                     }
     358           32 :                     let mut state = State::Waiting {
     359           32 :                         empty_futures: FuturesUnordered::new(),
     360           32 :                         ios_rx,
     361           32 :                     };
     362              :                     loop {
     363        39110 :                         match state {
     364              :                             State::Waiting {
     365        18563 :                                 empty_futures,
     366        18563 :                                 mut ios_rx,
     367        18563 :                             } => {
     368        18563 :                                 assert!(empty_futures.is_empty());
     369        18563 :                                 tokio::select! {
     370        18563 :                                     fut = ios_rx.recv() => {
     371        18531 :                                         if let Some(fut) = fut {
     372        18531 :                                             trace!("received new io future");
     373        18531 :                                             empty_futures.push(fut);
     374        18531 :                                             state = State::Executing { futures: empty_futures, ios_rx };
     375              :                                         } else {
     376            0 :                                             state = State::ShuttingDown { futures: empty_futures }
     377              :                                         }
     378              :                                     }
     379              :                                 }
     380              :                             }
     381              :                             State::Executing {
     382        20547 :                                 mut futures,
     383        20547 :                                 mut ios_rx,
     384        20547 :                             } => {
     385        20547 :                                 tokio::select! {
     386        20547 :                                     res = futures.next() => {
     387        19539 :                                         trace!("io future completed");
     388        19539 :                                         assert!(res.is_some());
     389        19539 :                                         if futures.is_empty() {
     390        18531 :                                             state = State::Waiting { empty_futures: futures, ios_rx};
     391        18531 :                                         } else {
     392         1008 :                                             state = State::Executing { futures, ios_rx };
     393         1008 :                                         }
     394              :                                     }
     395        20547 :                                     fut = ios_rx.recv() => {
     396         1008 :                                         if let Some(fut) = fut {
     397         1008 :                                             trace!("received new io future");
     398         1008 :                                             futures.push(fut);
     399         1008 :                                             state =  State::Executing { futures, ios_rx};
     400            0 :                                         } else {
     401            0 :                                             state = State::ShuttingDown { futures };
     402            0 :                                         }
     403              :                                     }
     404              :                                 }
     405              :                             }
     406              :                             State::ShuttingDown {
     407            0 :                                 mut futures,
     408            0 :                             } => {
     409            0 :                                 trace!("shutting down");
     410            0 :                                 while let Some(()) = futures.next().await {
     411            0 :                                     trace!("io future completed (shutdown)");
     412              :                                     // drain
     413              :                                 }
     414            0 :                                 trace!("shutdown complete");
     415            0 :                                 break;
     416            0 :                             }
     417            0 :                         }
     418            0 :                     }
     419            0 :                     drop(gate_guard); // drop it right before we exit
     420           32 :                 }.instrument(span));
     421           32 :                 IoConcurrency::SidecarTask { task_id, ios_tx }
     422              :             }
     423              :         }
     424      1216238 :     }
     425              :     /// Duplicates the handle; a `SidecarTask` clone keeps the same `task_id` and a cloned `ios_tx` sender.
     426        76410 :     pub(crate) fn clone(&self) -> Self {
     427        76410 :         match self {
     428        39532 :             IoConcurrency::Sequential => IoConcurrency::Sequential,
     429        36878 :             IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
     430        36878 :                 task_id: *task_id,
     431        36878 :                 ios_tx: ios_tx.clone(),
     432        36878 :             },
     433              :         }
     434        76410 :     }
     435              : 
     436              :     /// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
     437              :     ///
     438              :     /// The IO is represented as an opaque future.
     439              :     /// IO completion must be handled inside the future, e.g., through a oneshot channel.
     440              :     ///
     441              :     /// The API seems simple but there are multiple **pitfalls** involving
     442              :     /// DEADLOCK RISK.
     443              :     ///
     444              :     /// First, there are no guarantees about the execution of the IO.
     445              :     /// It may be `await`ed in-place before this function returns.
     446              :     /// It may be polled partially by this task and handed off to another task to be finished.
     447              :     /// It may be polled and then dropped before returning ready.
     448              :     ///
     449              :     /// This means that submitted IOs must not be interdependent.
     450              :     /// Interdependence may be through shared limited resources, e.g.,
     451              :     /// - VirtualFile file descriptor cache slot acquisition
     452              :     /// - tokio-epoll-uring slot
     453              :     ///
     454              :     /// # Why current usage is safe from deadlocks
     455              :     ///
     456              :     /// Textbook condition for a deadlock is that _all_ of the following be given
     457              :     /// - Mutual exclusion
     458              :     /// - Hold and wait
     459              :     /// - No preemption
     460              :     /// - Circular wait
     461              :     ///
     462              :     /// The current usage is safe because:
     463              :     /// - Mutual exclusion: IO futures definitely use mutexes, no way around that for now
     464              :     /// - Hold and wait: IO futures currently hold two kinds of locks/resources while waiting
     465              :     ///   for acquisition of other resources:
     466              :     ///    - VirtualFile file descriptor cache slot tokio mutex
     467              :     ///    - tokio-epoll-uring slot (uses tokio notify => wait queue, much like mutex)
     468              :     /// - No preemption: there's no taking-away of acquired locks/resources => given
     469              :     /// - Circular wait: this is the part of the condition that isn't met: all IO futures
     470              :     ///   first acquire VirtualFile mutex, then tokio-epoll-uring slot.
     471              :     ///   There is no IO future that acquires slot before VirtualFile.
     472              :     ///   Hence there can be no circular waiting.
     473              :     ///   Hence there cannot be a deadlock.
     474              :     ///
     475              :     /// This is a very fragile situation and must be revisited whenever any code called from
     476              :     /// inside the IO futures is changed.
     477              :     ///
     478              :     /// We will move away from opaque IO futures towards well-defined IOs at some point in
     479              :     /// the future when we have shipped this first version of concurrent IO to production
     480              :     /// and are ready to retire the Sequential mode which runs the futures in place.
     481              :     /// Right now, while brittle, the opaque IO approach allows us to ship the feature
     482              :     /// with minimal changes to the code and minimal changes to existing behavior in Sequential mode.
     483              :     ///
     484              :     /// Also read the comment in `collect_pending_ios`.
     485      1525669 :     pub(crate) async fn spawn_io<F>(&mut self, fut: F)
     486      1525669 :     where
     487      1525669 :         F: std::future::Future<Output = ()> + Send + 'static,
     488      1525669 :     {
     489      1525669 :         match self {
     490      1506130 :             IoConcurrency::Sequential => fut.await,
     491        19539 :             IoConcurrency::SidecarTask { ios_tx, .. } => {
     492        19539 :                 let fut = Box::pin(fut);
     493        19539 :                 // NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
     494        19539 :                 // while insignificant for latency.
     495        19539 :                 // It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
     496        19539 :                 // a submission here, but never poll the future. That way, io_uring can make progress while
     497        19539 :                 // the future sits in the ios_tx queue.
     498        19539 :                 match ios_tx.send(fut) {
     499        19539 :                     Ok(()) => {}
     500              :                     Err(_) => {
     501            0 :                         unreachable!("the io task must have exited, likely it panicked") // panics: send fails only once the receiving side is gone
     502              :                     }
     503              :                 }
     504              :             }
     505              :         }
     506      1525669 :     }
     507              : 
     508              :     #[cfg(test)]
     509           64 :     pub(crate) fn spawn_for_test() -> impl std::ops::DerefMut<Target = Self> {
     510              :         use std::ops::{Deref, DerefMut};
     511              : 
     512              :         use tracing::info;
     513              :         use utils::sync::gate::Gate;
     514              : 
     515              :         // Spawn needs a Gate, give it one.
     516              :         struct Wrapper {
     517              :             inner: IoConcurrency,
     518              :             #[allow(dead_code)]
     519              :             gate: Box<Gate>,
     520              :         }
     521              :         impl Deref for Wrapper {
     522              :             type Target = IoConcurrency;
     523              : 
     524        36974 :             fn deref(&self) -> &Self::Target {
     525        36974 :                 &self.inner
     526        36974 :             }
     527              :         }
     528              :         impl DerefMut for Wrapper {
     529            0 :             fn deref_mut(&mut self) -> &mut Self::Target {
     530            0 :                 &mut self.inner
     531            0 :             }
     532              :         }
     533           64 :         let gate = Box::new(Gate::default());
     534              : 
     535              :         // The default behavior when running Rust unit tests without any further
     536              :         // flags is to use the new behavior.
     537              :         // The CI uses the following environment variable to unit test both old
     538              :         // and new behavior.
     539              :         // NB: the Python regression & perf tests take the `else` branch
     540              :         // below and have their own defaults management.
     541           64 :         let selected = {
     542              :             // The pageserver_api::config type is unsuitable because it's internally tagged.
     543           64 :             #[derive(serde::Deserialize)]
     544              :             #[serde(rename_all = "kebab-case")]
     545              :             enum TestOverride {
     546              :                 Sequential,
     547              :                 SidecarTask,
     548              :             }
     549              :             use once_cell::sync::Lazy;
     550           64 :             static TEST_OVERRIDE: Lazy<TestOverride> = Lazy::new(|| {
     551           64 :                 utils::env::var_serde_json_string(
     552           64 :                     "NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO",
     553           64 :                 )
     554           64 :                 .unwrap_or(TestOverride::SidecarTask)
     555           64 :             });
     556              : 
     557           64 :             match *TEST_OVERRIDE {
     558           32 :                 TestOverride::Sequential => SelectedIoConcurrency::Sequential,
     559              :                 TestOverride::SidecarTask => {
     560           32 :                     SelectedIoConcurrency::SidecarTask(gate.enter().expect("just created it"))
     561              :                 }
     562              :             }
     563              :         };
     564              : 
     565           64 :         info!(?selected, "get_vectored_concurrent_io test");
     566              : 
     567           64 :         Wrapper {
     568           64 :             inner: Self::spawn(selected),
     569           64 :             gate,
     570           64 :         }
     571           64 :     }
     572              : }
     573              : 
     574              : /// Make noise in case the [`ValuesReconstructState`] gets dropped while
     575              : /// there are still IOs in flight.
     576              : /// Refer to `collect_pending_ios` for why we prefer not to do that.
     577              : //
     578              : /// We log from here instead of from the sidecar task because the [`ValuesReconstructState`]
     579              : /// gets dropped in a tracing span with more context.
     580              : /// We repeat the sidecar tasks's `task_id` so we can correlate what we emit here with
     581              : /// the logs / panic handler logs from the sidecar task, which also logs the `task_id`.
     582              : impl Drop for ValuesReconstructState {
     583      1255332 :     fn drop(&mut self) {
     584      1255332 :         let num_active_ios = self
     585      1255332 :             .num_active_ios
     586      1255332 :             .load(std::sync::atomic::Ordering::Acquire);
     587      1255332 :         if num_active_ios == 0 {
     588      1255330 :             return;
     589            2 :         }
     590            2 :         let sidecar_task_id = match &self.io_concurrency {
     591            0 :             IoConcurrency::Sequential => None,
     592            2 :             IoConcurrency::SidecarTask { task_id, .. } => Some(*task_id),
     593              :         };
     594            2 :         tracing::warn!(
     595              :             num_active_ios,
     596              :             ?sidecar_task_id,
     597            0 :             backtrace=%std::backtrace::Backtrace::force_capture(),
     598            0 :             "dropping ValuesReconstructState while some IOs have not been completed",
     599              :         );
     600      1255332 :     }
     601              : }
     602              : 
     603              : impl ValuesReconstructState {
     604      1255332 :     pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
     605      1255332 :         Self {
     606      1255332 :             keys: HashMap::new(),
     607      1255332 :             keys_done: KeySpaceRandomAccum::new(),
     608      1255332 :             keys_with_image_coverage: None,
     609      1255332 :             layers_visited: 0,
     610      1255332 :             delta_layers_visited: 0,
     611      1255332 :             io_concurrency,
     612      1255332 :             num_active_ios: Arc::new(AtomicUsize::new(0)),
     613      1255332 :             read_path: None,
     614      1255332 :         }
     615      1255332 :     }
     616              : 
     617              :     /// Absolutely read [`IoConcurrency::spawn_io`] to learn about assumptions & pitfalls.
     618      1525669 :     pub(crate) async fn spawn_io<F>(&mut self, fut: F)
     619      1525669 :     where
     620      1525669 :         F: std::future::Future<Output = ()> + Send + 'static,
     621      1525669 :     {
     622      1525669 :         self.io_concurrency.spawn_io(fut).await;
     623      1525669 :     }
     624              : 
     625      1693718 :     pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
     626      1693718 :         self.layers_visited += 1;
     627      1693718 :         if let ReadableLayer::PersistentLayer(layer) = layer {
     628       480346 :             if layer.layer_desc().is_delta() {
     629       435246 :                 self.delta_layers_visited += 1;
     630       435246 :             }
     631      1213372 :         }
     632      1693718 :     }
     633              : 
     634          468 :     pub(crate) fn get_delta_layers_visited(&self) -> u32 {
     635          468 :         self.delta_layers_visited
     636          468 :     }
     637              : 
     638      1255274 :     pub(crate) fn get_layers_visited(&self) -> u32 {
     639      1255274 :         self.layers_visited
     640      1255274 :     }
     641              : 
     642              :     /// On hitting image layer, we can mark all keys in this range as done, because
     643              :     /// if the image layer does not contain a key, it is deleted/never added.
     644        45124 :     pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
     645        45124 :         let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
     646        45124 :         assert_eq!(
     647              :             prev_val, None,
     648            0 :             "should consume the keyspace before the next iteration"
     649              :         );
     650        45124 :     }
     651              : 
     652              :     /// Update the state collected for a given key.
     653              :     /// Returns true if this was the last value needed for the key and false otherwise.
     654              :     ///
     655              :     /// If the key is done after the update, mark it as such.
     656              :     ///
     657              :     /// If the key is in the sparse keyspace (i.e., aux files), we do not track them in
     658              :     /// `key_done`.
     659              :     // TODO: rename this method & update description.
     660      1482487 :     pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
     661      1482487 :         let state = self.keys.entry(*key).or_default();
     662      1482487 : 
     663      1482487 :         let is_sparse_key = key.is_sparse();
     664              : 
     665      1482487 :         let required_io = match state.situation {
     666              :             ValueReconstructSituation::Complete => {
     667       144983 :                 if is_sparse_key {
     668              :                     // Sparse keyspace might be visited multiple times because
     669              :                     // we don't track unmapped keyspaces.
     670       144983 :                     return OnDiskValueIo::Unnecessary;
     671              :                 } else {
     672            0 :                     unreachable!()
     673              :                 }
     674              :             }
     675              :             ValueReconstructSituation::Continue => {
     676      1337504 :                 self.num_active_ios
     677      1337504 :                     .fetch_add(1, std::sync::atomic::Ordering::Release);
     678      1337504 :                 let (tx, rx) = tokio::sync::oneshot::channel();
     679      1337504 :                 state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
     680      1337504 :                 OnDiskValueIo::Required {
     681      1337504 :                     tx,
     682      1337504 :                     num_active_ios: Arc::clone(&self.num_active_ios),
     683      1337504 :                 }
     684      1337504 :             }
     685      1337504 :         };
     686      1337504 : 
     687      1337504 :         if completes && state.situation == ValueReconstructSituation::Continue {
     688      1336120 :             state.situation = ValueReconstructSituation::Complete;
     689      1336120 :             if !is_sparse_key {
     690      1208604 :                 self.keys_done.add_key(*key);
     691      1208604 :             }
     692         1384 :         }
     693              : 
     694      1337504 :         required_io
     695      1482487 :     }
     696              : 
     697              :     /// Returns the key space describing the keys that have
     698              :     /// been marked as completed since the last call to this function.
     699              :     /// Returns individual keys done, and the image layer coverage.
     700      3397353 :     pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
     701      3397353 :         (
     702      3397353 :             self.keys_done.consume_keyspace(),
     703      3397353 :             self.keys_with_image_coverage.take(),
     704      3397353 :         )
     705      3397353 :     }
     706              : }
     707              : 
/// A key that uniquely identifies a layer in a timeline
///
/// There is one variant per kind of layer: a persistent layer is identified by its
/// [`PersistentLayerKey`], an in-memory layer by its `InMemoryLayerFileId`
/// (see `ReadableLayer::id`, which constructs these).
// NB: the variant name `PersitentLayerId` is misspelled ("Persitent"). Renaming it
// means updating every user of the variant, so it is left as is here.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub(crate) enum LayerId {
    PersitentLayerId(PersistentLayerKey),
    InMemoryLayerId(InMemoryLayerFileId),
}
     714              : 
/// Uniquely identify a layer visit by the layer
/// and LSN floor (or start LSN) of the reads.
/// The layer itself is not enough since we may
/// have different LSN lower bounds for delta layer reads.
///
/// Used as the key of `LayerFringe::visit_reads`.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct LayerToVisitId {
    /// The layer to visit.
    layer_id: LayerId,
    /// Start of the LSN range of the read (`LayerFringe::update` sets it to `lsn_range.start`).
    lsn_floor: Lsn,
}
     724              : 
/// Refers to a layer through its descriptor only, in contrast to [`ReadableLayer`],
/// which holds the layer itself.
///
/// NOTE(review): presumably this gets resolved into a [`ReadableLayer`] before any
/// read is issued; the resolution code is not part of this section, so confirm against users.
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum ReadableLayerWeak {
    /// A persistent layer, referred to by its shared descriptor.
    PersistentLayer(Arc<PersistentLayerDesc>),
    /// An in-memory layer, referred to by its descriptor.
    InMemoryLayer(InMemoryLayerDesc),
}
     730              : 
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
///
/// Both variants offer `get_values_reconstruct_data`; the `impl` of this enum
/// dispatches to whichever layer kind is wrapped.
#[derive(Debug)]
pub(crate) enum ReadableLayer {
    /// A persistent layer.
    PersistentLayer(Layer),
    /// A shared handle to an in-memory layer.
    InMemoryLayer(Arc<InMemoryLayer>),
}
     739              : 
/// A partial description of a read to be done.
///
/// Instances live in the `LayerFringe::planned_visits_by_lsn` heap. Ordering and
/// equality (see the `Ord` / `PartialEq` impls) look at `lsn_range` only.
#[derive(Debug, Clone)]
struct LayerVisit {
    /// An id used to resolve the readable layer within the fringe
    layer_to_visit_id: LayerToVisitId,
    /// Lsn range for the read, used for selecting the next read
    lsn_range: Range<Lsn>,
}
     748              : 
/// Data structure which maintains a fringe of layers for the
/// read path. The fringe is the set of layers which intersects
/// the current keyspace that the search is descending on.
/// Each layer tracks the keyspace that intersects it.
///
/// The fringe must appear sorted by Lsn. Hence, it uses
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
    /// Max-heap of planned visits, ordered by the `Ord` impl of `LayerVisit`
    /// (greatest `lsn_range.end` first; on ties the smaller `lsn_range.start` wins).
    planned_visits_by_lsn: BinaryHeap<LayerVisit>,
    /// For each planned visit: the layer to read and the keyspace accumulated for it.
    /// `update` inserts into both containers together and `next_layer` removes from both.
    visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
}
     761              : 
/// What to read during one planned visit: the layer plus the keyspace
/// that has been accumulated for it via `LayerFringe::update`.
#[derive(Debug)]
struct LayerVisitReads {
    /// The layer to read from.
    layer: ReadableLayer,
    /// Accumulator of all keyspaces requested for this visit so far.
    target_keyspace: KeySpaceRandomAccum,
}
     767              : 
     768              : impl LayerFringe {
     769      1703635 :     pub(crate) fn new() -> Self {
     770      1703635 :         LayerFringe {
     771      1703635 :             planned_visits_by_lsn: BinaryHeap::new(),
     772      1703635 :             visit_reads: HashMap::new(),
     773      1703635 :         }
     774      1703635 :     }
     775              : 
     776      3397353 :     pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
     777      3397353 :         let read_desc = self.planned_visits_by_lsn.pop()?;
     778              : 
     779      1693718 :         let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
     780      1693718 : 
     781      1693718 :         match removed {
     782              :             Some((
     783              :                 _,
     784              :                 LayerVisitReads {
     785      1693718 :                     layer,
     786      1693718 :                     mut target_keyspace,
     787      1693718 :                 },
     788      1693718 :             )) => Some((
     789      1693718 :                 layer,
     790      1693718 :                 target_keyspace.consume_keyspace(),
     791      1693718 :                 read_desc.lsn_range,
     792      1693718 :             )),
     793            0 :             None => unreachable!("fringe internals are always consistent"),
     794              :         }
     795      3397353 :     }
     796              : 
     797      1694654 :     pub(crate) fn update(
     798      1694654 :         &mut self,
     799      1694654 :         layer: ReadableLayer,
     800      1694654 :         keyspace: KeySpace,
     801      1694654 :         lsn_range: Range<Lsn>,
     802      1694654 :     ) {
     803      1694654 :         let layer_to_visit_id = LayerToVisitId {
     804      1694654 :             layer_id:,
     805      1694654 :             lsn_floor: lsn_range.start,
     806      1694654 :         };
     807      1694654 : 
     808      1694654 :         let entry = self.visit_reads.entry(layer_to_visit_id.clone());
     809      1694654 :         match entry {
     810          936 :             Entry::Occupied(mut entry) => {
     811          936 :                 entry.get_mut().target_keyspace.add_keyspace(keyspace);
     812          936 :             }
     813      1693718 :             Entry::Vacant(entry) => {
     814      1693718 :                 self.planned_visits_by_lsn.push(LayerVisit {
     815      1693718 :                     lsn_range,
     816      1693718 :                     layer_to_visit_id: layer_to_visit_id.clone(),
     817      1693718 :                 });
     818      1693718 :                 let mut accum = KeySpaceRandomAccum::new();
     819      1693718 :                 accum.add_keyspace(keyspace);
     820      1693718 :                 entry.insert(LayerVisitReads {
     821      1693718 :                     layer,
     822      1693718 :                     target_keyspace: accum,
     823      1693718 :                 });
     824      1693718 :             }
     825              :         }
     826      1694654 :     }
     827              : }
     828              : 
     829              : impl Default for LayerFringe {
     830            0 :     fn default() -> Self {
     831            0 :         Self::new()
     832            0 :     }
     833              : }
     834              : 
     835              : impl Ord for LayerVisit {
     836           72 :     fn cmp(&self, other: &Self) -> Ordering {
     837           72 :         let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
     838           72 :         if ord == std::cmp::Ordering::Equal {
     839           48 :             self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
     840              :         } else {
     841           24 :             ord
     842              :         }
     843           72 :     }
     844              : }
     845              : 
     846              : impl PartialOrd for LayerVisit {
     847           72 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     848           72 :         Some(self.cmp(other))
     849           72 :     }
     850              : }
     851              : 
     852              : impl PartialEq for LayerVisit {
     853            0 :     fn eq(&self, other: &Self) -> bool {
     854            0 :         self.lsn_range == other.lsn_range
     855            0 :     }
     856              : }
     857              : 
     858              : impl Eq for LayerVisit {}
     859              : 
     860              : impl ReadableLayer {
     861      1694654 :     pub(crate) fn id(&self) -> LayerId {
     862      1694654 :         match self {
     863       480442 :             Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
     864      1214212 :             Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
     865              :         }
     866      1694654 :     }
     867              : 
     868      1693718 :     pub(crate) async fn get_values_reconstruct_data(
     869      1693718 :         &self,
     870      1693718 :         keyspace: KeySpace,
     871      1693718 :         lsn_range: Range<Lsn>,
     872      1693718 :         reconstruct_state: &mut ValuesReconstructState,
     873      1693718 :         ctx: &RequestContext,
     874      1693718 :     ) -> Result<(), GetVectoredError> {
     875      1693718 :         match self {
     876       480346 :             ReadableLayer::PersistentLayer(layer) => {
     877       480346 :                 layer
     878       480346 :                     .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
     879       480346 :                     .await
     880              :             }
     881      1213372 :             ReadableLayer::InMemoryLayer(layer) => {
     882      1213372 :                 layer
     883      1213372 :                     .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
     884      1213372 :                     .await
     885              :             }
     886              :         }
     887      1693718 :     }
     888              : }
     889              : 
/// Layers contain a hint indicating whether they are likely to be used for reads.
///
/// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
/// when changing the visibility of layers (for example when creating a branch that makes some previously
/// covered layers visible).  It should be used for cache management but not for correctness-critical checks.
///
/// Recording an access via `LayerAccessStats::record_access_at` implicitly sets the
/// visibility bit, i.e. an accessed layer is treated as visible.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LayerVisibilityHint {
    /// A Visible layer might be read while serving a read, because there is not an image layer between it
    /// and a readable LSN (the tip of the branch or a child's branch point)
    Visible,
    /// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
    /// a branch or ephemeral endpoint at an LSN below the layer that covers this.
    Covered,
}
     904              : 
/// Access statistics of a layer, packed into a single atomic `u64`: a low-resolution
/// last-access timestamp, a low-resolution last-residence-change timestamp and a
/// visibility bit. The bit positions are defined by the `*_SHIFT` constants in the `impl`.
pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
     906              : 
/// Tells `LayerAccessStats::as_api_model` whether to clear the stored timestamps
/// after they have been read.
#[derive(Clone, Copy, strum_macros::EnumString)]
pub(crate) enum LayerAccessStatsReset {
    /// Leave the stats untouched.
    NoReset,
    /// Zero both the access and the residence timestamp bits.
    AllStats,
}
     912              : 
     913              : impl Default for LayerAccessStats {
     914         3864 :     fn default() -> Self {
     915         3864 :         // Default value is to assume resident since creation time, and visible.
     916         3864 :         let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now());
     917         3864 :         value |= 0x1 << Self::VISIBILITY_SHIFT;
     918         3864 : 
     919         3864 :         Self(std::sync::atomic::AtomicU64::new(value))
     920         3864 :     }
     921              : }
     922              : 
     923              : // Efficient store of two very-low-resolution timestamps and some bits.  Used for storing last access time and
     924              : // last residence change time.
     925              : impl LayerAccessStats {
    // How many high bits to drop from a u32 timestamp?
    // - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use
    //   after that, this software has been very successful!)
    // - Dropping the top bit is implicitly safe because unix timestamps are meant to be
    // stored in an i32, so they never used it.
    // - Dropping the next two bits is safe because this code is only running on systems in
    // years >= 2024, and these bits have been 1 since 2021
    //
    // Therefore we may store only 29 bits (32 minus the 3 dropped high bits, see `TS_MASK`)
    // for a timestamp with one second resolution.  We do this truncation to make space for
    // some flags in the high bits of our u64.
    //
    // Number of dropped high bits: the two bits of `TS_ONES` plus the top bit, i.e. 3.
    const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1;
    // Mask selecting the 29 timestamp bits that are actually stored.
    const TS_MASK: u32 = 0x1f_ff_ff_ff;
    // The two dropped bits that are known to be 1; OR-ed back in when decoding a timestamp.
    const TS_ONES: u32 = 0x60_00_00_00;

    // Bit layout of the u64, lowest bits first:
    // - access timestamp at `ATIME_SHIFT` (bit 0)
    // - residence timestamp at `RTIME_SHIFT` (bit 29)
    // - visibility flag at `VISIBILITY_SHIFT` (bit 58)
    const ATIME_SHIFT: u32 = 0;
    const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS;
    const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS;
     943              : 
     944       480654 :     fn write_bits(&self, mask: u64, value: u64) -> u64 {
     945       480654 :         self.0
     946       480654 :             .fetch_update(
     947       480654 :                 // TODO: decide what orderings are correct
     948       480654 :                 std::sync::atomic::Ordering::Relaxed,
     949       480654 :                 std::sync::atomic::Ordering::Relaxed,
     950       480654 :                 |v| Some((v & !mask) | (value & mask)),
     951       480654 :             )
     952       480654 :             .expect("Inner function is infallible")
     953       480654 :     }
     954              : 
     955       483774 :     fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) {
     956       483774 :         // Drop the low three bits of the timestamp, for an ~8s accuracy
     957       483774 :         let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64);
     958       483774 : 
     959       483774 :         ((Self::TS_MASK as u64) << shift, timestamp << shift)
     960       483774 :     }
     961              : 
     962          292 :     fn read_low_res_timestamp(&self, shift: u32) -> Option<SystemTime> {
     963          292 :         let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
     964          292 : 
     965          292 :         let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift;
     966          292 :         if ts_bits == 0 {
     967          132 :             None
     968              :         } else {
     969          160 :             Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64)))
     970              :         }
     971          292 :     }
     972              : 
     973              :     /// Record a change in layer residency.
     974              :     ///
     975              :     /// Recording the event must happen while holding the layer map lock to
     976              :     /// ensure that latest-activity-threshold-based layer eviction (
     977              :     /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
     978              :     ///
     979              :     /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
     980              :     /// the following race could happen:
     981              :     ///
     982              :     /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
     983              :     /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
     984              :     /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
     985              :     /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
     986          100 :     pub(crate) fn record_residence_event_at(&self, now: SystemTime) {
     987          100 :         let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now);
     988          100 :         self.write_bits(mask, value);
     989          100 :     }
     990              : 
     991           96 :     pub(crate) fn record_residence_event(&self) {
     992           96 :         self.record_residence_event_at(SystemTime::now())
     993           96 :     }
     994              : 
     995       479810 :     fn record_access_at(&self, now: SystemTime) -> bool {
     996       479810 :         let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
     997       479810 : 
     998       479810 :         // A layer which is accessed must be visible.
     999       479810 :         mask |= 0x1 << Self::VISIBILITY_SHIFT;
    1000       479810 :         value |= 0x1 << Self::VISIBILITY_SHIFT;
    1001       479810 : 
    1002       479810 :         let old_bits = self.write_bits(mask, value);
    1003            4 :         !matches!(
    1004       479810 :             self.decode_visibility(old_bits),
    1005              :             LayerVisibilityHint::Visible
    1006              :         )
    1007       479810 :     }
    1008              : 
    1009              :     /// Returns true if we modified the layer's visibility to set it to Visible implicitly
    1010              :     /// as a result of this access
    1011       480370 :     pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
    1012       480370 :         if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
    1013          572 :             return false;
    1014       479798 :         }
    1015       479798 : 
    1016       479798 :         self.record_access_at(SystemTime::now())
    1017       480370 :     }
    1018              : 
    1019            0 :     fn as_api_model(
    1020            0 :         &self,
    1021            0 :         reset: LayerAccessStatsReset,
    1022            0 :     ) -> pageserver_api::models::LayerAccessStats {
    1023            0 :         let ret = pageserver_api::models::LayerAccessStats {
    1024            0 :             access_time: self
    1025            0 :                 .read_low_res_timestamp(Self::ATIME_SHIFT)
    1026            0 :                 .unwrap_or(UNIX_EPOCH),
    1027            0 :             residence_time: self
    1028            0 :                 .read_low_res_timestamp(Self::RTIME_SHIFT)
    1029            0 :                 .unwrap_or(UNIX_EPOCH),
    1030            0 :             visible: matches!(self.visibility(), LayerVisibilityHint::Visible),
    1031              :         };
    1032            0 :         match reset {
    1033            0 :             LayerAccessStatsReset::NoReset => {}
    1034            0 :             LayerAccessStatsReset::AllStats => {
    1035            0 :                 self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0);
    1036            0 :                 self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0);
    1037            0 :             }
    1038              :         }
    1039            0 :         ret
    1040            0 :     }
    1041              : 
    1042              :     /// Get the latest access timestamp, falling back to latest residence event.  The latest residence event
    1043              :     /// will be this Layer's construction time, if its residence hasn't changed since then.
    1044           84 :     pub(crate) fn latest_activity(&self) -> SystemTime {
    1045           84 :         if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) {
    1046           12 :             t
    1047              :         } else {
    1048           72 :             self.read_low_res_timestamp(Self::RTIME_SHIFT)
    1049           72 :                 .expect("Residence time is set on construction")
    1050              :         }
    1051           84 :     }
    1052              : 
    1053              :     /// Whether this layer has been accessed (excluding in [`AccessStatsBehavior::Skip`]).
    1054              :     ///
    1055              :     /// This indicates whether the layer has been used for some purpose that would motivate
    1056              :     /// us to keep it on disk, such as for serving a getpage request.
    1057           68 :     fn accessed(&self) -> bool {
    1058           68 :         // Consider it accessed if the most recent access is more recent than
    1059           68 :         // the most recent change in residence status.
    1060           68 :         match (
    1061           68 :             self.read_low_res_timestamp(Self::ATIME_SHIFT),
    1062           68 :             self.read_low_res_timestamp(Self::RTIME_SHIFT),
    1063              :         ) {
    1064           60 :             (None, _) => false,
    1065            0 :             (Some(_), None) => true,
    1066            8 :             (Some(a), Some(r)) => a >= r,
    1067              :         }
    1068           68 :     }
    1069              : 
    1070              :     /// Helper for extracting the visibility hint from the literal value of our inner u64
    1071       482148 :     fn decode_visibility(&self, bits: u64) -> LayerVisibilityHint {
    1072       482148 :         match (bits >> Self::VISIBILITY_SHIFT) & 0x1 {
    1073       482100 :             1 => LayerVisibilityHint::Visible,
    1074           48 :             0 => LayerVisibilityHint::Covered,
    1075            0 :             _ => unreachable!(),
    1076              :         }
    1077       482148 :     }
    1078              : 
    1079              :     /// Returns the old value which has been replaced
    1080          744 :     pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) -> LayerVisibilityHint {
    1081          744 :         let value = match visibility {
    1082          640 :             LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
    1083          104 :             LayerVisibilityHint::Covered => 0x0,
    1084              :         };
    1085              : 
    1086          744 :         let old_bits = self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
    1087          744 :         self.decode_visibility(old_bits)
    1088          744 :     }
    1089              : 
    1090         1594 :     pub(crate) fn visibility(&self) -> LayerVisibilityHint {
    1091         1594 :         let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
    1092         1594 :         self.decode_visibility(read)
    1093         1594 :     }
    1094              : }
    1095              : 
/// Get a layer descriptor from a layer.
///
/// Implemented by types that can hand out a borrowed [`PersistentLayerDesc`].
pub(crate) trait AsLayerDesc {
    /// Get the layer descriptor.
    ///
    /// The returned reference borrows from `self`.
    fn layer_desc(&self) -> &PersistentLayerDesc;
}
    1101              : 
    1102              : pub mod tests {
    1103              :     use pageserver_api::shard::TenantShardId;
    1104              :     use utils::id::TimelineId;
    1105              : 
    1106              :     use super::*;
    1107              : 
    1108              :     impl From<DeltaLayerName> for PersistentLayerDesc {
    1109           44 :         fn from(value: DeltaLayerName) -> Self {
    1110           44 :             PersistentLayerDesc::new_delta(
    1111           44 :                 TenantShardId::from([0; 18]),
    1112           44 :                 TimelineId::from_array([0; 16]),
    1113           44 :                 value.key_range,
    1114           44 :                 value.lsn_range,
    1115           44 :                 233,
    1116           44 :             )
    1117           44 :         }
    1118              :     }
    1119              : 
    1120              :     impl From<ImageLayerName> for PersistentLayerDesc {
    1121           48 :         fn from(value: ImageLayerName) -> Self {
    1122           48 :             PersistentLayerDesc::new_img(
    1123           48 :                 TenantShardId::from([0; 18]),
    1124           48 :                 TimelineId::from_array([0; 16]),
    1125           48 :                 value.key_range,
    1126           48 :                 value.lsn,
    1127           48 :                 233,
    1128           48 :             )
    1129           48 :         }
    1130              :     }
    1131              : 
    1132              :     impl From<LayerName> for PersistentLayerDesc {
    1133           92 :         fn from(value: LayerName) -> Self {
    1134           92 :             match value {
    1135           44 :                 LayerName::Delta(d) => Self::from(d),
    1136           48 :                 LayerName::Image(i) => Self::from(i),
    1137              :             }
    1138           92 :         }
    1139              :     }
    1140              : }
    1141              : 
    1142              : /// Range wrapping newtype, which uses display to render Debug.
    1143              : ///
    1144              : /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
    1145              : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
    1146              : 
    1147              : impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
    1148            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    1149            0 :         write!(f, "{}..{}", self.0.start, self.0.end)
    1150            0 :     }
    1151              : }
    1152              : 
// Tests for the IO-concurrency behavior of `ValuesReconstructState`.
#[cfg(test)]
mod tests2 {
    use pageserver_api::key::DBDIR_KEY;
    use tracing::info;

    use super::*;
    use crate::tenant::storage_layer::IoConcurrency;

    /// Drops a `ValuesReconstructState` while an IO future spawned through it is still
    /// pending, so that the warning logged on that path can be inspected.
    ///
    /// TODO: currently this test relies on manual visual inspection of the --no-capture output.
    /// Should look like so:
    /// ```text
    /// RUST_LOG=trace cargo nextest run  --features testing  --no-capture test_io_concurrency_noise
    /// running 1 test
    /// 2025-01-21T17:42:01.335679Z  INFO get_vectored_concurrent_io test selected=SidecarTask
    /// 2025-01-21T17:42:01.335680Z TRACE spawning sidecar task task_id=0
    /// 2025-01-21T17:42:01.335937Z TRACE IoConcurrency_sidecar{task_id=0}: start
    /// 2025-01-21T17:42:01.335972Z TRACE IoConcurrency_sidecar{task_id=0}: received new io future
    /// 2025-01-21T17:42:01.335999Z  INFO IoConcurrency_sidecar{task_id=0}: waiting for signal to complete IO
    /// 2025-01-21T17:42:01.336229Z  WARN dropping ValuesReconstructState while some IOs have not been completed num_active_ios=1 sidecar_task_id=Some(0) backtrace=   0: <pageserver::tenant::storage_layer::ValuesReconstructState as core::ops::drop::Drop>::drop
    ///              at ./src/tenant/
    ///    1: core::ptr::drop_in_place<pageserver::tenant::storage_layer::ValuesReconstructState>
    ///              at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/
    ///    2: core::mem::drop
    ///              at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/
    ///    3: pageserver::tenant::storage_layer::tests2::test_io_concurrency_noise::{{closure}}
    ///              at ./src/tenant/
    ///   ...
    ///   49: <unknown>
    /// 2025-01-21T17:42:01.452293Z  INFO IoConcurrency_sidecar{task_id=0}: completing IO
    /// 2025-01-21T17:42:01.452357Z TRACE IoConcurrency_sidecar{task_id=0}: io future completed
    /// 2025-01-21T17:42:01.452473Z TRACE IoConcurrency_sidecar{task_id=0}: end
    /// test tenant::storage_layer::tests2::test_io_concurrency_noise ... ok
    ///
    /// ```
    #[tokio::test]
    async fn test_io_concurrency_noise() {
        crate::tenant::harness::setup_logging();

        // Bail out early unless the test-selected mode is the sidecar task.
        let io_concurrency = IoConcurrency::spawn_for_test();
        match *io_concurrency {
            IoConcurrency::Sequential => {
                // This test asserts behavior in sidecar mode, doesn't make sense in sequential mode.
                return;
            }
            IoConcurrency::SidecarTask { .. } => {}
        }
        let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());

        // Three one-shot handshakes between the test body and the spawned IO future:
        //   1. future -> test: "I have started and am now waiting"
        //   2. test -> future: "go ahead and complete the IO"
        //   3. future -> test: "I am about to exit"
        let (io_fut_is_waiting_tx, io_fut_is_waiting) = tokio::sync::oneshot::channel();
        let (do_complete_io, should_complete_io) = tokio::sync::oneshot::channel();
        let (io_fut_exiting_tx, io_fut_exiting) = tokio::sync::oneshot::channel();

        // Register a pending IO for the key and hand a future to `spawn_io` that blocks
        // until the test allows it to complete that IO.
        let io = reconstruct_state.update_key(&DBDIR_KEY, Lsn(8), true);
        reconstruct_state
            .spawn_io(async move {
                info!("waiting for signal to complete IO");
                io_fut_is_waiting_tx.send(()).unwrap();
                should_complete_io.await.unwrap();
                info!("completing IO");
                io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
                io_fut_exiting_tx.send(()).unwrap();
            })
            .await;

        // Make sure the IO future is running and parked before dropping the state.
        io_fut_is_waiting.await.unwrap();

        // this is what makes the noise
        drop(reconstruct_state);

        // Only now let the IO complete, after the state has already been dropped.
        do_complete_io.send(()).unwrap();

        // Wait for the IO future to finish so it does not outlive the test body.
        io_fut_exiting.await.unwrap();
    }
}

