LCOV - code coverage report
Current view: top level - pageserver/src/tenant - storage_layer.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 80.4 % 531 427
Test Date: 2025-07-16 12:29:03 Functions: 81.9 % 83 68

            Line data    Source code
       1              : //! Common traits and structs for layers
       2              : 
       3              : pub mod batch_split_writer;
       4              : pub mod delta_layer;
       5              : pub mod errors;
       6              : pub mod filter_iterator;
       7              : pub mod image_layer;
       8              : pub mod inmemory_layer;
       9              : pub(crate) mod layer;
      10              : mod layer_desc;
      11              : mod layer_name;
      12              : pub mod merge_iterator;
      13              : 
      14              : use std::cmp::Ordering;
      15              : use std::collections::hash_map::Entry;
      16              : use std::collections::{BinaryHeap, HashMap};
      17              : use std::ops::Range;
      18              : use std::pin::Pin;
      19              : use std::sync::Arc;
      20              : use std::sync::atomic::AtomicUsize;
      21              : use std::time::{Duration, SystemTime, UNIX_EPOCH};
      22              : 
      23              : use crate::PERF_TRACE_TARGET;
      24              : pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
      25              : use bytes::Bytes;
      26              : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
      27              : use futures::StreamExt;
      28              : use futures::stream::FuturesUnordered;
      29              : pub use image_layer::{ImageLayer, ImageLayerWriter};
      30              : pub use inmemory_layer::InMemoryLayer;
      31              : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
      32              : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
      33              : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
      34              : use pageserver_api::config::GetVectoredConcurrentIo;
      35              : use pageserver_api::key::Key;
      36              : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
      37              : use tracing::{Instrument, info_span, trace};
      38              : use utils::lsn::Lsn;
      39              : use utils::sync::gate::GateGuard;
      40              : use wal_decoder::models::record::NeonWalRecord;
      41              : use wal_decoder::models::value::Value;
      42              : 
      43              : use self::inmemory_layer::InMemoryLayerFileId;
      44              : use super::PageReconstructError;
      45              : use super::layer_map::InMemoryLayerDesc;
      46              : use super::timeline::{GetVectoredError, ReadPath};
      47              : use crate::context::{
      48              :     AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
      49              : };
      50              : 
      51            0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
      52            0 : where
      53            0 :     T: PartialOrd<T>,
      54              : {
      55            0 :     if a.start < b.start {
      56            0 :         a.end > b.start
      57              :     } else {
      58            0 :         b.end > a.start
      59              :     }
      60            0 : }
      61              : 
      62              : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
      63              : ///
      64              : /// Before first call, you can fill in 'page_img' if you have an older cached
      65              : /// version of the page available. That can save work in
      66              : /// 'get_value_reconstruct_data', as it can stop searching for page versions
      67              : /// when all the WAL records going back to the cached image have been collected.
      68              : ///
      69              : /// When get_value_reconstruct_data returns Complete, 'img' is set to an image
      70              : /// of the page, or the oldest WAL record in 'records' is a will_init-type
      71              : /// record that initializes the page without requiring a previous image.
      72              : ///
      73              : /// If 'get_page_reconstruct_data' returns Continue, some 'records' may have
      74              : /// been collected, but there are more records outside the current layer. Pass
      75              : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
      76              : /// call, to collect more records.
      77              : ///
      78              : #[derive(Debug, Default, Clone)]
      79              : pub(crate) struct ValueReconstructState {
      80              :     pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
      81              :     pub(crate) img: Option<(Lsn, Bytes)>,
      82              : }
      83              : 
      84              : impl ValueReconstructState {
      85              :     /// Returns the number of page deltas applied to the page image.
      86       727070 :     pub fn num_deltas(&self) -> usize {
      87       727070 :         match self.img {
      88       699602 :             Some(_) => self.records.len(),
      89        27468 :             None => self.records.len() - 1, // omit will_init record
      90              :         }
      91       727070 :     }
      92              : }
      93              : 
      94              : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
      95              : pub(crate) enum ValueReconstructSituation {
      96              :     Complete,
      97              :     #[default]
      98              :     Continue,
      99              : }
     100              : 
     101              : /// On disk representation of a value loaded in a buffer
     102              : #[derive(Debug)]
     103              : pub(crate) enum OnDiskValue {
     104              :     /// Unencoded [`Value::Image`]
     105              :     RawImage(Bytes),
     106              :     /// Encoded [`Value`]. Can deserialize into an image or a WAL record
     107              :     WalRecordOrImage(Bytes),
     108              : }
     109              : 
     110              : /// Reconstruct data accumulated for a single key during a vectored get
     111              : #[derive(Debug, Default)]
     112              : pub struct VectoredValueReconstructState {
     113              :     pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
     114              : 
     115              :     pub(crate) situation: ValueReconstructSituation,
     116              : }
     117              : 
     118              : #[derive(Debug)]
     119              : pub(crate) struct OnDiskValueIoWaiter {
     120              :     rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
     121              : }
     122              : 
     123              : #[derive(Debug)]
     124              : #[must_use]
     125              : pub(crate) enum OnDiskValueIo {
     126              :     /// Traversal identified this IO as required to complete the vectored get.
     127              :     Required {
     128              :         num_active_ios: Arc<AtomicUsize>,
     129              :         tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>,
     130              :     },
     131              :     /// Sparse keyspace reads always read all the values for a given key,
     132              :     /// even though only the first value is needed.
     133              :     ///
     134              :     /// This variant represents the unnecessary IOs for those values at lower LSNs
     135              :     /// that aren't needed, but are currently still being done.
     136              :     ///
     137              :     /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
     138              :     /// We added this explicit representation here so that we can drop
     139              :     /// unnecessary IO results immediately, instead of buffering them in
     140              :     /// `oneshot` channels inside [`VectoredValueReconstructState`] until
     141              :     /// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
     142              :     Unnecessary,
     143              : }
     144              : 
     145              : type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
     146              : 
     147              : impl OnDiskValueIo {
     148      1797835 :     pub(crate) fn complete(self, res: OnDiskValueIoResult) {
     149      1797835 :         match self {
     150      1753091 :             OnDiskValueIo::Required { num_active_ios, tx } => {
     151      1753091 :                 num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
     152      1753091 :                 let _ = tx.send(res);
     153      1753091 :             }
     154        44744 :             OnDiskValueIo::Unnecessary => {
     155        44744 :                 // Nobody cared, see variant doc comment.
     156        44744 :             }
     157              :         }
     158      1797835 :     }
     159              : }
     160              : 
     161              : #[derive(Debug, thiserror::Error)]
     162              : pub(crate) enum WaitCompletionError {
     163              :     #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
     164              :     IoDropped,
     165              : }
     166              : 
     167              : impl OnDiskValueIoWaiter {
     168      1753090 :     pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
     169              :         // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
     170      1753090 :         self.rx.await.map_err(|_| WaitCompletionError::IoDropped)
     171      1753090 :     }
     172              : }
     173              : 
     174              : impl VectoredValueReconstructState {
     175              :     /// # Cancel-Safety
     176              :     ///
     177              :     /// Technically fine to stop polling this future, but, the IOs will still
     178              :     /// be executed to completion by the sidecar task and hold on to / consume resources.
     179              :     /// Better not do it to make reasonsing about the system easier.
     180       363570 :     pub(crate) async fn collect_pending_ios(
     181       363570 :         self,
     182       363570 :     ) -> Result<ValueReconstructState, PageReconstructError> {
     183              :         use utils::bin_ser::BeSer;
     184              : 
     185       363570 :         let mut res = Ok(ValueReconstructState::default());
     186              : 
     187              :         // We should try hard not to bail early, so that by the time we return from this
     188              :         // function, all IO for this value is done. It's not required -- we could totally
     189              :         // stop polling the IO futures in the sidecar task, they need to support that,
     190              :         // but just stopping to poll doesn't reduce the IO load on the disk. It's easier
     191              :         // to reason about the system if we just wait for all IO to complete, even if
     192              :         // we're no longer interested in the result.
     193              :         //
     194              :         // Revisit this when IO futures are replaced with a more sophisticated IO system
     195              :         // and an IO scheduler, where we know which IOs were submitted and which ones
     196              :         // just queued. Cf the comment on IoConcurrency::spawn_io.
     197      2116660 :         for (lsn, waiter) in self.on_disk_values {
     198      1753090 :             let value_recv_res = waiter
     199      1753090 :                 .wait_completion()
     200      1753090 :                 // we rely on the caller to poll us to completion, so this is not a bail point
     201      1753090 :                 .await;
     202              :             // Force not bailing early by wrapping the code into a closure.
     203              :             #[allow(clippy::redundant_closure_call)]
     204      1753090 :             let _: () = (|| {
     205      1753090 :                 match (&mut res, value_recv_res) {
     206            0 :                     (Err(_), _) => {
     207            0 :                         // We've already failed, no need to process more.
     208            0 :                     }
     209            0 :                     (Ok(_), Err(wait_err)) => {
     210            0 :                         // This shouldn't happen - likely the sidecar task panicked.
     211            0 :                         res = Err(PageReconstructError::Other(wait_err.into()));
     212            0 :                     }
     213            0 :                     (Ok(_), Ok(Err(err))) => {
     214            0 :                         let err: std::io::Error = err;
     215            0 :                         // TODO: returning IO error here will fail a compute query.
     216            0 :                         // Probably not what we want, we're not doing `maybe_fatal_err`
     217            0 :                         // in the IO futures.
     218            0 :                         // But it's been like that for a long time, not changing it
     219            0 :                         // as part of concurrent IO.
     220            0 :                         // => https://github.com/neondatabase/neon/issues/10454
     221            0 :                         res = Err(PageReconstructError::Other(err.into()));
     222            0 :                     }
     223        23857 :                     (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
     224        23857 :                         assert!(ok.img.is_none());
     225        23857 :                         ok.img = Some((lsn, img));
     226              :                     }
     227      1729233 :                     (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
     228      1729233 :                         match Value::des(&buf) {
     229      1403254 :                             Ok(Value::WalRecord(rec)) => {
     230      1403254 :                                 ok.records.push((lsn, rec));
     231      1403254 :                             }
     232       325979 :                             Ok(Value::Image(img)) => {
     233       325979 :                                 assert!(ok.img.is_none());
     234       325979 :                                 ok.img = Some((lsn, img));
     235              :                             }
     236            0 :                             Err(err) => {
     237            0 :                                 res = Err(PageReconstructError::Other(err.into()));
     238            0 :                             }
     239              :                         }
     240              :                     }
     241              :                 }
     242              :             })();
     243              :         }
     244              : 
     245       363570 :         res
     246       363570 :     }
     247              : 
     248              :     /// Benchmarking utility to await for the completion of all pending ios
     249              :     ///
     250              :     /// # Cancel-Safety
     251              :     ///
     252              :     /// Technically fine to stop polling this future, but, the IOs will still
     253              :     /// be executed to completion by the sidecar task and hold on to / consume resources.
     254              :     /// Better not do it to make reasonsing about the system easier.
     255              :     #[cfg(feature = "benchmarking")]
     256              :     pub async fn sink_pending_ios(self) -> Result<(), std::io::Error> {
     257              :         let mut res = Ok(());
     258              : 
     259              :         // We should try hard not to bail early, so that by the time we return from this
     260              :         // function, all IO for this value is done. It's not required -- we could totally
     261              :         // stop polling the IO futures in the sidecar task, they need to support that,
     262              :         // but just stopping to poll doesn't reduce the IO load on the disk. It's easier
     263              :         // to reason about the system if we just wait for all IO to complete, even if
     264              :         // we're no longer interested in the result.
     265              :         //
     266              :         // Revisit this when IO futures are replaced with a more sophisticated IO system
     267              :         // and an IO scheduler, where we know which IOs were submitted and which ones
     268              :         // just queued. Cf the comment on IoConcurrency::spawn_io.
     269              :         for (_lsn, waiter) in self.on_disk_values {
     270              :             let value_recv_res = waiter
     271              :                 .wait_completion()
     272              :                 // we rely on the caller to poll us to completion, so this is not a bail point
     273              :                 .await;
     274              : 
     275              :             match (&mut res, value_recv_res) {
     276              :                 (Err(_), _) => {
     277              :                     // We've already failed, no need to process more.
     278              :                 }
     279              :                 (Ok(_), Err(_wait_err)) => {
     280              :                     // This shouldn't happen - likely the sidecar task panicked.
     281              :                     unreachable!();
     282              :                 }
     283              :                 (Ok(_), Ok(Err(err))) => {
     284              :                     let err: std::io::Error = err;
     285              :                     res = Err(err);
     286              :                 }
     287              :                 (Ok(_ok), Ok(Ok(OnDiskValue::RawImage(_img)))) => {}
     288              :                 (Ok(_ok), Ok(Ok(OnDiskValue::WalRecordOrImage(_buf)))) => {}
     289              :             }
     290              :         }
     291              : 
     292              :         res
     293              :     }
     294              : }
     295              : 
     296              : /// Bag of data accumulated during a vectored get..
     297              : pub struct ValuesReconstructState {
     298              :     /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
     299              :     /// should not expect to get anything from this hashmap.
     300              :     pub keys: HashMap<Key, VectoredValueReconstructState>,
     301              :     /// The keys which are already retrieved
     302              :     keys_done: KeySpaceRandomAccum,
     303              : 
     304              :     /// The keys covered by the image layers
     305              :     keys_with_image_coverage: Option<Range<Key>>,
     306              : 
     307              :     // Statistics that are still accessible as a caller of `get_vectored_impl`.
     308              :     layers_visited: u32,
     309              :     delta_layers_visited: u32,
     310              : 
     311              :     pub(crate) enable_debug: bool,
     312              :     pub(crate) debug_state: ValueReconstructState,
     313              : 
     314              :     pub(crate) io_concurrency: IoConcurrency,
     315              :     num_active_ios: Arc<AtomicUsize>,
     316              : 
     317              :     pub(crate) read_path: Option<ReadPath>,
     318              : }
     319              : 
     320              : /// The level of IO concurrency to be used on the read path
     321              : ///
     322              : /// The desired end state is that we always do parallel IO.
     323              : /// This struct and the dispatching in the impl will be removed once
     324              : /// we've built enough confidence.
     325              : pub enum IoConcurrency {
     326              :     Sequential,
     327              :     SidecarTask {
     328              :         task_id: usize,
     329              :         ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
     330              :     },
     331              : }
     332              : 
     333              : type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
     334              : 
     335              : pub(crate) enum SelectedIoConcurrency {
     336              :     Sequential,
     337              :     SidecarTask(GateGuard),
     338              : }
     339              : 
     340              : impl std::fmt::Debug for IoConcurrency {
     341            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     342            0 :         match self {
     343            0 :             IoConcurrency::Sequential => write!(f, "Sequential"),
     344            0 :             IoConcurrency::SidecarTask { .. } => write!(f, "SidecarTask"),
     345              :         }
     346            0 :     }
     347              : }
     348              : 
     349              : impl std::fmt::Debug for SelectedIoConcurrency {
     350           17 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     351           17 :         match self {
     352            0 :             SelectedIoConcurrency::Sequential => write!(f, "Sequential"),
     353           17 :             SelectedIoConcurrency::SidecarTask(_) => write!(f, "SidecarTask"),
     354              :         }
     355           17 :     }
     356              : }
     357              : 
     358              : impl IoConcurrency {
     359              :     /// Force sequential IO. This is a temporary workaround until we have
     360              :     /// moved plumbing-through-the-call-stack
     361              :     /// of IoConcurrency into `RequestContextq.
     362              :     ///
     363              :     /// DO NOT USE for new code.
     364              :     ///
     365              :     /// Tracking issue: <https://github.com/neondatabase/neon/issues/10460>.
     366       301278 :     pub(crate) fn sequential() -> Self {
     367       301278 :         Self::spawn(SelectedIoConcurrency::Sequential)
     368       301278 :     }
     369              : 
     370          321 :     pub fn spawn_from_conf(conf: GetVectoredConcurrentIo, gate_guard: GateGuard) -> IoConcurrency {
     371          321 :         let selected = match conf {
     372            0 :             GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
     373          321 :             GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
     374              :         };
     375          321 :         Self::spawn(selected)
     376          321 :     }
     377              : 
     378       301616 :     pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
     379       301616 :         match io_concurrency {
     380       301278 :             SelectedIoConcurrency::Sequential => IoConcurrency::Sequential,
     381          338 :             SelectedIoConcurrency::SidecarTask(gate_guard) => {
     382          338 :                 let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
     383              :                 static TASK_ID: AtomicUsize = AtomicUsize::new(0);
     384          338 :                 let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     385              :                 // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
     386          338 :                 let span =
     387          338 :                     tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
     388          338 :                 trace!(task_id, "spawning sidecar task");
     389          338 :                 tokio::spawn(async move {
     390          337 :                     trace!("start");
     391          337 :                     scopeguard::defer!{ trace!("end") };
     392              :                     type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
     393              :                     enum State {
     394              :                         Waiting {
     395              :                             // invariant: is_empty(), but we recycle the allocation
     396              :                             empty_futures: FuturesUnordered<IoFuture>,
     397              :                             ios_rx: IosRx,
     398              :                         },
     399              :                         Executing {
     400              :                             futures: FuturesUnordered<IoFuture>,
     401              :                             ios_rx: IosRx,
     402              :                         },
     403              :                         ShuttingDown {
     404              :                             futures: FuturesUnordered<IoFuture>,
     405              :                         },
     406              :                     }
     407          337 :                     let mut state = State::Waiting {
     408          337 :                         empty_futures: FuturesUnordered::new(),
     409          337 :                         ios_rx,
     410          337 :                     };
     411              :                     loop {
     412        22989 :                         match state {
     413              :                             State::Waiting {
     414        10405 :                                 empty_futures,
     415        10405 :                                 mut ios_rx,
     416              :                             } => {
     417        10405 :                                 assert!(empty_futures.is_empty());
     418        10405 :                                 tokio::select! {
     419        10405 :                                     fut = ios_rx.recv() => {
     420        10386 :                                         if let Some(fut) = fut {
     421        10146 :                                             trace!("received new io future");
     422        10146 :                                             empty_futures.push(fut);
     423        10146 :                                             state = State::Executing { futures: empty_futures, ios_rx };
     424              :                                         } else {
     425          240 :                                             state = State::ShuttingDown { futures: empty_futures }
     426              :                                         }
     427              :                                     }
     428              :                                 }
     429              :                             }
     430              :                             State::Executing {
     431        12266 :                                 mut futures,
     432        12266 :                                 mut ios_rx,
     433              :                             } => {
     434        12266 :                                 tokio::select! {
     435        12266 :                                     res = futures.next() => {
     436        11128 :                                         trace!("io future completed");
     437        11128 :                                         assert!(res.is_some());
     438        11128 :                                         if futures.is_empty() {
     439        10068 :                                             state = State::Waiting { empty_futures: futures, ios_rx};
     440        10068 :                                         } else {
     441         1060 :                                             state = State::Executing { futures, ios_rx };
     442         1060 :                                         }
     443              :                                     }
     444        12266 :                                     fut = ios_rx.recv() => {
     445         1138 :                                         if let Some(fut) = fut {
     446         1060 :                                             trace!("received new io future");
     447         1060 :                                             futures.push(fut);
     448         1060 :                                             state =  State::Executing { futures, ios_rx};
     449           78 :                                         } else {
     450           78 :                                             state = State::ShuttingDown { futures };
     451           78 :                                         }
     452              :                                     }
     453              :                                 }
     454              :                             }
     455              :                             State::ShuttingDown {
     456          318 :                                 mut futures,
     457              :                             } => {
     458          318 :                                 trace!("shutting down");
     459          396 :                                 while let Some(()) = futures.next().await {
     460           78 :                                     trace!("io future completed (shutdown)");
     461              :                                     // drain
     462              :                                 }
     463          318 :                                 trace!("shutdown complete");
     464          318 :                                 break;
     465              :                             }
     466              :                         }
     467              :                     }
     468          318 :                     drop(gate_guard); // drop it right before we exit
     469          338 :                 }.instrument(span));
     470          338 :                 IoConcurrency::SidecarTask { task_id, ios_tx }
     471              :             }
     472              :         }
     473       301616 :     }
     474              : 
     475              :     /// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
     476              :     ///
     477              :     /// The IO is represented as an opaque future.
     478              :     /// IO completion must be handled inside the future, e.g., through a oneshot channel.
     479              :     ///
     480              :     /// The API seems simple but there are multiple **pitfalls** involving
     481              :     /// DEADLOCK RISK.
     482              :     ///
     483              :     /// First, there are no guarantees about the exexecution of the IO.
     484              :     /// It may be `await`ed in-place before this function returns.
     485              :     /// It may be polled partially by this task and handed off to another task to be finished.
     486              :     /// It may be polled and then dropped before returning ready.
     487              :     ///
     488              :     /// This means that submitted IOs must not be interedependent.
     489              :     /// Interdependence may be through shared limited resources, e.g.,
     490              :     /// - VirtualFile file descriptor cache slot acquisition
     491              :     /// - tokio-epoll-uring slot
     492              :     ///
     493              :     /// # Why current usage is safe from deadlocks
     494              :     ///
     495              :     /// Textbook condition for a deadlock is that _all_ of the following be given
     496              :     /// - Mutual exclusion
     497              :     /// - Hold and wait
     498              :     /// - No preemption
     499              :     /// - Circular wait
     500              :     ///
     501              :     /// The current usage is safe because:
     502              :     /// - Mutual exclusion: IO futures definitely use mutexes, no way around that for now
     503              :     /// - Hold and wait: IO futures currently hold two kinds of locks/resources while waiting
     504              :     ///   for acquisition of other resources:
     505              :     ///    - VirtualFile file descriptor cache slot tokio mutex
     506              :     ///    - tokio-epoll-uring slot (uses tokio notify => wait queue, much like mutex)
     507              :     /// - No preemption: there's no taking-away of acquired locks/resources => given
     508              :     /// - Circular wait: this is the part of the condition that isn't met: all IO futures
     509              :     ///   first acquire VirtualFile mutex, then tokio-epoll-uring slot.
     510              :     ///   There is no IO future that acquires slot before VirtualFile.
     511              :     ///   Hence there can be no circular waiting.
     512              :     ///   Hence there cannot be a deadlock.
     513              :     ///
     514              :     /// This is a very fragile situation and must be revisited whenver any code called from
     515              :     /// inside the IO futures is changed.
     516              :     ///
     517              :     /// We will move away from opaque IO futures towards well-defined IOs at some point in
     518              :     /// the future when we have shipped this first version of concurrent IO to production
     519              :     /// and are ready to retire the Sequential mode which runs the futures in place.
     520              :     /// Right now, while brittle, the opaque IO approach allows us to ship the feature
     521              :     /// with minimal changes to the code and minimal changes to existing behavior in Sequential mode.
     522              :     ///
     523              :     /// Also read the comment in `collect_pending_ios`.
     524       412408 :     pub(crate) async fn spawn_io<F>(&mut self, fut: F)
     525       412408 :     where
     526       412408 :         F: std::future::Future<Output = ()> + Send + 'static,
     527       412408 :     {
     528       412408 :         match self {
     529       401202 :             IoConcurrency::Sequential => fut.await,
     530        11206 :             IoConcurrency::SidecarTask { ios_tx, .. } => {
     531        11206 :                 let fut = Box::pin(fut);
     532              :                 // NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
     533              :                 // while insignificant for latency.
     534              :                 // It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
     535              :                 // a submission here, but never poll the future. That way, io_uring can make proccess while
     536              :                 // the future sits in the ios_tx queue.
     537        11206 :                 match ios_tx.send(fut) {
     538        11206 :                     Ok(()) => {}
     539              :                     Err(_) => {
     540            0 :                         unreachable!("the io task must have exited, likely it panicked")
     541              :                     }
     542              :                 }
     543              :             }
     544              :         }
     545       412408 :     }
     546              : 
     547              :     #[cfg(test)]
     548           17 :     pub(crate) fn spawn_for_test() -> impl std::ops::DerefMut<Target = Self> {
     549              :         use std::ops::{Deref, DerefMut};
     550              : 
     551              :         use tracing::info;
     552              :         use utils::sync::gate::Gate;
     553              : 
     554              :         // Spawn needs a Gate, give it one.
     555              :         struct Wrapper {
     556              :             inner: IoConcurrency,
     557              :             #[allow(dead_code)]
     558              :             gate: Box<Gate>,
     559              :         }
     560              :         impl Deref for Wrapper {
     561              :             type Target = IoConcurrency;
     562              : 
     563         9246 :             fn deref(&self) -> &Self::Target {
     564         9246 :                 &self.inner
     565         9246 :             }
     566              :         }
     567              :         impl DerefMut for Wrapper {
     568            0 :             fn deref_mut(&mut self) -> &mut Self::Target {
     569            0 :                 &mut self.inner
     570            0 :             }
     571              :         }
     572           17 :         let gate = Box::new(Gate::default());
     573              : 
     574              :         // The default behavior when running Rust unit tests without any further
     575              :         // flags is to use the new behavior.
     576              :         // The CI uses the following environment variable to unit test both old
     577              :         // and new behavior.
     578              :         // NB: the Python regression & perf tests take the `else` branch
     579              :         // below and have their own defaults management.
     580           17 :         let selected = {
     581              :             // The pageserver_api::config type is unsuitable because it's internally tagged.
     582            0 :             #[derive(serde::Deserialize)]
     583              :             #[serde(rename_all = "kebab-case")]
     584              :             enum TestOverride {
     585              :                 Sequential,
     586              :                 SidecarTask,
     587              :             }
     588              :             use once_cell::sync::Lazy;
     589           17 :             static TEST_OVERRIDE: Lazy<TestOverride> = Lazy::new(|| {
     590           17 :                 utils::env::var_serde_json_string(
     591           17 :                     "NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO",
     592              :                 )
     593           17 :                 .unwrap_or(TestOverride::SidecarTask)
     594           17 :             });
     595              : 
     596           17 :             match *TEST_OVERRIDE {
     597            0 :                 TestOverride::Sequential => SelectedIoConcurrency::Sequential,
     598              :                 TestOverride::SidecarTask => {
     599           17 :                     SelectedIoConcurrency::SidecarTask(gate.enter().expect("just created it"))
     600              :                 }
     601              :             }
     602              :         };
     603              : 
     604           17 :         info!(?selected, "get_vectored_concurrent_io test");
     605              : 
     606           17 :         Wrapper {
     607           17 :             inner: Self::spawn(selected),
     608           17 :             gate,
     609           17 :         }
     610           17 :     }
     611              : }
     612              : 
     613              : impl Clone for IoConcurrency {
     614        19141 :     fn clone(&self) -> Self {
     615        19141 :         match self {
     616            0 :             IoConcurrency::Sequential => IoConcurrency::Sequential,
     617        19141 :             IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
     618        19141 :                 task_id: *task_id,
     619        19141 :                 ios_tx: ios_tx.clone(),
     620        19141 :             },
     621              :         }
     622        19141 :     }
     623              : }
     624              : 
     625              : /// Make noise in case the [`ValuesReconstructState`] gets dropped while
     626              : /// there are still IOs in flight.
     627              : /// Refer to `collect_pending_ios` for why we prefer not to do that.
     628              : //
     629              : /// We log from here instead of from the sidecar task because the [`ValuesReconstructState`]
     630              : /// gets dropped in a tracing span with more context.
     631              : /// We repeat the sidecar tasks's `task_id` so we can correlate what we emit here with
     632              : /// the logs / panic handler logs from the sidecar task, which also logs the `task_id`.
     633              : impl Drop for ValuesReconstructState {
     634       312421 :     fn drop(&mut self) {
     635       312421 :         let num_active_ios = self
     636       312421 :             .num_active_ios
     637       312421 :             .load(std::sync::atomic::Ordering::Acquire);
     638       312421 :         if num_active_ios == 0 {
     639       312420 :             return;
     640            1 :         }
     641            1 :         let sidecar_task_id = match &self.io_concurrency {
     642            0 :             IoConcurrency::Sequential => None,
     643            1 :             IoConcurrency::SidecarTask { task_id, .. } => Some(*task_id),
     644              :         };
     645            1 :         tracing::warn!(
     646              :             num_active_ios,
     647              :             ?sidecar_task_id,
     648            0 :             backtrace=%std::backtrace::Backtrace::force_capture(),
     649            0 :             "dropping ValuesReconstructState while some IOs have not been completed",
     650              :         );
     651       312421 :     }
     652              : }
     653              : 
     654              : impl ValuesReconstructState {
     655       312421 :     pub fn new(io_concurrency: IoConcurrency) -> Self {
     656       312421 :         Self {
     657       312421 :             keys: HashMap::new(),
     658       312421 :             keys_done: KeySpaceRandomAccum::new(),
     659       312421 :             keys_with_image_coverage: None,
     660       312421 :             layers_visited: 0,
     661       312421 :             delta_layers_visited: 0,
     662       312421 :             io_concurrency,
     663       312421 :             enable_debug: false,
     664       312421 :             debug_state: ValueReconstructState::default(),
     665       312421 :             num_active_ios: Arc::new(AtomicUsize::new(0)),
     666       312421 :             read_path: None,
     667       312421 :         }
     668       312421 :     }
     669              : 
     670            0 :     pub(crate) fn new_with_debug(io_concurrency: IoConcurrency) -> Self {
     671            0 :         Self {
     672            0 :             keys: HashMap::new(),
     673            0 :             keys_done: KeySpaceRandomAccum::new(),
     674            0 :             keys_with_image_coverage: None,
     675            0 :             layers_visited: 0,
     676            0 :             delta_layers_visited: 0,
     677            0 :             io_concurrency,
     678            0 :             enable_debug: true,
     679            0 :             debug_state: ValueReconstructState::default(),
     680            0 :             num_active_ios: Arc::new(AtomicUsize::new(0)),
     681            0 :             read_path: None,
     682            0 :         }
     683            0 :     }
     684              : 
     685              :     /// Absolutely read [`IoConcurrency::spawn_io`] to learn about assumptions & pitfalls.
     686       412408 :     pub(crate) async fn spawn_io<F>(&mut self, fut: F)
     687       412408 :     where
     688       412408 :         F: std::future::Future<Output = ()> + Send + 'static,
     689       412408 :     {
     690       412408 :         self.io_concurrency.spawn_io(fut).await;
     691       412408 :     }
     692              : 
     693            0 :     pub(crate) fn set_debug_state(&mut self, debug_state: &ValueReconstructState) {
     694            0 :         if self.enable_debug {
     695            0 :             self.debug_state = debug_state.clone();
     696            0 :         }
     697            0 :     }
     698              : 
     699       446162 :     pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
     700       446162 :         self.layers_visited += 1;
     701       446162 :         if let ReadableLayer::PersistentLayer(layer) = layer {
     702       138845 :             if layer.layer_desc().is_delta() {
     703       124723 :                 self.delta_layers_visited += 1;
     704       124723 :             }
     705       307317 :         }
     706       446162 :     }
     707              : 
     708          173 :     pub(crate) fn get_delta_layers_visited(&self) -> u32 {
     709          173 :         self.delta_layers_visited
     710          173 :     }
     711              : 
     712       312406 :     pub(crate) fn get_layers_visited(&self) -> u32 {
     713       312406 :         self.layers_visited
     714       312406 :     }
     715              : 
     716              :     /// On hitting image layer, we can mark all keys in this range as done, because
     717              :     /// if the image layer does not contain a key, it is deleted/never added.
     718        14128 :     pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
     719        14128 :         let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
     720        14128 :         assert_eq!(
     721              :             prev_val, None,
     722            0 :             "should consume the keyspace before the next iteration"
     723              :         );
     724        14128 :     }
     725              : 
     726              :     /// Update the state collected for a given key.
     727              :     /// Returns true if this was the last value needed for the key and false otherwise.
     728              :     ///
     729              :     /// If the key is done after the update, mark it as such.
     730              :     ///
     731              :     /// If the key is in the sparse keyspace (i.e., aux files), we do not track them in
     732              :     /// `key_done`.
     733              :     // TODO: rename this method & update description.
     734      1797835 :     pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
     735      1797835 :         let state = self.keys.entry(*key).or_default();
     736              : 
     737      1797835 :         let is_sparse_key = key.is_sparse();
     738              : 
     739      1797835 :         let required_io = match state.situation {
     740              :             ValueReconstructSituation::Complete => {
     741        44744 :                 if is_sparse_key {
     742              :                     // Sparse keyspace might be visited multiple times because
     743              :                     // we don't track unmapped keyspaces.
     744        44744 :                     return OnDiskValueIo::Unnecessary;
     745              :                 } else {
     746            0 :                     unreachable!()
     747              :                 }
     748              :             }
     749              :             ValueReconstructSituation::Continue => {
     750      1753091 :                 self.num_active_ios
     751      1753091 :                     .fetch_add(1, std::sync::atomic::Ordering::Release);
     752      1753091 :                 let (tx, rx) = tokio::sync::oneshot::channel();
     753      1753091 :                 state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
     754      1753091 :                 OnDiskValueIo::Required {
     755      1753091 :                     tx,
     756      1753091 :                     num_active_ios: Arc::clone(&self.num_active_ios),
     757      1753091 :                 }
     758              :             }
     759              :         };
     760              : 
     761      1753091 :         if completes && state.situation == ValueReconstructSituation::Continue {
     762       363571 :             state.situation = ValueReconstructSituation::Complete;
     763       363571 :             if !is_sparse_key {
     764       331685 :                 self.keys_done.add_key(*key);
     765       331685 :             }
     766      1389520 :         }
     767              : 
     768      1753091 :         required_io
     769      1797835 :     }
     770              : 
     771              :     /// Returns the key space describing the keys that have
     772              :     /// been marked as completed since the last call to this function.
     773              :     /// Returns individual keys done, and the image layer coverage.
     774       446162 :     pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
     775       446162 :         (
     776       446162 :             self.keys_done.consume_keyspace(),
     777       446162 :             self.keys_with_image_coverage.take(),
     778       446162 :         )
     779       446162 :     }
     780              : }
     781              : 
     782              : /// A key that uniquely identifies a layer in a timeline
     783              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     784              : pub(crate) enum LayerId {
     785              :     PersitentLayerId(PersistentLayerKey),
     786              :     InMemoryLayerId(InMemoryLayerFileId),
     787              : }
     788              : 
     789              : /// Uniquely identify a layer visit by the layer
     790              : /// and LSN range of the reads. Note that the end of the range is exclusive.
     791              : ///
     792              : /// The layer itself is not enough since we may have different LSN lower
     793              : /// bounds for delta layer reads. Scenarios where this can happen are:
     794              : ///
     795              : /// 1. Layer overlaps: imagine an image layer inside and in-memory layer
     796              : ///    and a query that only partially hits the image layer. Part of the query
     797              : ///    needs to read the whole in-memory layer and the other part needs to read
     798              : ///    only up to the image layer. Hence, they'll have different LSN floor values
     799              : ///    for the read.
     800              : ///
     801              : /// 2. Scattered reads: the read path supports starting at different LSNs. Imagine
     802              : ///    The start LSN for one range is inside a layer and the start LSN for another range
     803              : ///    Is above the layer (includes all of it). Both ranges need to read the layer all the
     804              : ///    Way to the end but starting at different points. Hence, they'll have different LSN
     805              : ///    Ceil values.
     806              : ///
     807              : /// The implication is that we might visit the same layer multiple times
     808              : /// in order to read different LSN ranges from it. In practice, this isn't very concerning
     809              : /// because:
     810              : /// 1. Layer overlaps are rare and generally not intended
     811              : /// 2. Scattered reads will stabilise after the first few layers provided their starting LSNs
     812              : ///    are grouped tightly enough (likely the case).
     813              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     814              : struct LayerToVisitId {
     815              :     layer_id: LayerId,
     816              :     lsn_floor: Lsn,
     817              :     lsn_ceil: Lsn,
     818              : }
     819              : 
     820              : #[derive(Debug, PartialEq, Eq, Hash)]
     821              : pub enum ReadableLayerWeak {
     822              :     PersistentLayer(Arc<PersistentLayerDesc>),
     823              :     InMemoryLayer(InMemoryLayerDesc),
     824              : }
     825              : 
     826              : /// Layer wrapper for the read path. Note that it is valid
     827              : /// to use these layers even after external operations have
     828              : /// been performed on them (compaction, freeze, etc.).
     829              : #[derive(Debug)]
     830              : pub(crate) enum ReadableLayer {
     831              :     PersistentLayer(Layer),
     832              :     InMemoryLayer(Arc<InMemoryLayer>),
     833              : }
     834              : 
     835              : /// A partial description of a read to be done.
     836              : #[derive(Debug, Clone)]
     837              : struct LayerVisit {
     838              :     /// An id used to resolve the readable layer within the fringe
     839              :     layer_to_visit_id: LayerToVisitId,
     840              :     /// Lsn range for the read, used for selecting the next read
     841              :     lsn_range: Range<Lsn>,
     842              : }
     843              : 
     844              : /// Data structure which maintains a fringe of layers for the
     845              : /// read path. The fringe is the set of layers which intersects
     846              : /// the current keyspace that the search is descending on.
     847              : /// Each layer tracks the keyspace that intersects it.
     848              : ///
     849              : /// The fringe must appear sorted by Lsn. Hence, it uses
     850              : /// a two layer indexing scheme.
     851              : #[derive(Debug)]
     852              : pub(crate) struct LayerFringe {
     853              :     planned_visits_by_lsn: BinaryHeap<LayerVisit>,
     854              :     visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
     855              : }
     856              : 
     857              : #[derive(Debug)]
     858              : struct LayerVisitReads {
     859              :     layer: ReadableLayer,
     860              :     target_keyspace: KeySpaceRandomAccum,
     861              : }
     862              : 
     863              : impl LayerFringe {
     864       425394 :     pub(crate) fn new() -> Self {
     865       425394 :         LayerFringe {
     866       425394 :             planned_visits_by_lsn: BinaryHeap::new(),
     867       425394 :             visit_reads: HashMap::new(),
     868       425394 :         }
     869       425394 :     }
     870              : 
     871       871556 :     pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
     872       871556 :         let read_desc = self.planned_visits_by_lsn.pop()?;
     873              : 
     874       446162 :         let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
     875              : 
     876       446162 :         match removed {
     877              :             Some((
     878              :                 _,
     879              :                 LayerVisitReads {
     880       446162 :                     layer,
     881       446162 :                     mut target_keyspace,
     882              :                 },
     883       446162 :             )) => Some((
     884       446162 :                 layer,
     885       446162 :                 target_keyspace.consume_keyspace(),
     886       446162 :                 read_desc.lsn_range,
     887       446162 :             )),
     888            0 :             None => unreachable!("fringe internals are always consistent"),
     889              :         }
     890       871556 :     }
     891              : 
     892       460553 :     pub(crate) fn update(
     893       460553 :         &mut self,
     894       460553 :         layer: ReadableLayer,
     895       460553 :         keyspace: KeySpace,
     896       460553 :         lsn_range: Range<Lsn>,
     897       460553 :     ) {
     898       460553 :         let layer_to_visit_id = LayerToVisitId {
     899       460553 :             layer_id: layer.id(),
     900       460553 :             lsn_floor: lsn_range.start,
     901       460553 :             lsn_ceil: lsn_range.end,
     902       460553 :         };
     903              : 
     904       460553 :         let entry = self.visit_reads.entry(layer_to_visit_id.clone());
     905       460553 :         match entry {
     906        14391 :             Entry::Occupied(mut entry) => {
     907        14391 :                 entry.get_mut().target_keyspace.add_keyspace(keyspace);
     908        14391 :             }
     909       446162 :             Entry::Vacant(entry) => {
     910       446162 :                 self.planned_visits_by_lsn.push(LayerVisit {
     911       446162 :                     lsn_range,
     912       446162 :                     layer_to_visit_id: layer_to_visit_id.clone(),
     913       446162 :                 });
     914       446162 :                 let mut accum = KeySpaceRandomAccum::new();
     915       446162 :                 accum.add_keyspace(keyspace);
     916       446162 :                 entry.insert(LayerVisitReads {
     917       446162 :                     layer,
     918       446162 :                     target_keyspace: accum,
     919       446162 :                 });
     920       446162 :             }
     921              :         }
     922       460553 :     }
     923              : }
     924              : 
     925              : impl Default for LayerFringe {
     926            0 :     fn default() -> Self {
     927            0 :         Self::new()
     928            0 :     }
     929              : }
     930              : 
     931              : impl Ord for LayerVisit {
     932        89326 :     fn cmp(&self, other: &Self) -> Ordering {
     933        89326 :         let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
     934        89326 :         if ord == std::cmp::Ordering::Equal {
     935         7257 :             self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
     936              :         } else {
     937        82069 :             ord
     938              :         }
     939        89326 :     }
     940              : }
     941              : 
     942              : impl PartialOrd for LayerVisit {
     943        89326 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     944        89326 :         Some(self.cmp(other))
     945        89326 :     }
     946              : }
     947              : 
     948              : impl PartialEq for LayerVisit {
     949            0 :     fn eq(&self, other: &Self) -> bool {
     950            0 :         self.lsn_range == other.lsn_range
     951            0 :     }
     952              : }
     953              : 
     954              : impl Eq for LayerVisit {}
     955              : 
     956              : impl ReadableLayer {
     957       460553 :     pub(crate) fn id(&self) -> LayerId {
     958       460553 :         match self {
     959       149392 :             Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
     960       311161 :             Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
     961              :         }
     962       460553 :     }
     963              : 
     964       446162 :     pub(crate) async fn get_values_reconstruct_data(
     965       446162 :         &self,
     966       446162 :         keyspace: KeySpace,
     967       446162 :         lsn_range: Range<Lsn>,
     968       446162 :         reconstruct_state: &mut ValuesReconstructState,
     969       446162 :         ctx: &RequestContext,
     970       446162 :     ) -> Result<(), GetVectoredError> {
     971       446162 :         match self {
     972       138845 :             ReadableLayer::PersistentLayer(layer) => {
     973       138845 :                 let ctx = RequestContextBuilder::from(ctx)
     974       138845 :                     .perf_span(|crnt_perf_span| {
     975            0 :                         info_span!(
     976              :                             target: PERF_TRACE_TARGET,
     977            0 :                             parent: crnt_perf_span,
     978              :                             "PLAN_LAYER",
     979              :                             layer = %layer
     980              :                         )
     981            0 :                     })
     982       138845 :                     .attached_child();
     983              : 
     984       138845 :                 layer
     985       138845 :                     .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
     986       138845 :                     .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
     987       138845 :                     .await
     988              :             }
     989       307317 :             ReadableLayer::InMemoryLayer(layer) => {
     990       307317 :                 let ctx = RequestContextBuilder::from(ctx)
     991       307317 :                     .perf_span(|crnt_perf_span| {
     992            0 :                         info_span!(
     993              :                             target: PERF_TRACE_TARGET,
     994            0 :                             parent: crnt_perf_span,
     995              :                             "PLAN_LAYER",
     996              :                             layer = %layer
     997              :                         )
     998            0 :                     })
     999       307317 :                     .attached_child();
    1000              : 
    1001       307317 :                 layer
    1002       307317 :                     .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
    1003       307317 :                     .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
    1004       307317 :                     .await
    1005              :             }
    1006              :         }
    1007       446162 :     }
    1008              : }
    1009              : 
    1010              : /// Layers contain a hint indicating whether they are likely to be used for reads.
    1011              : ///
    1012              : /// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
    1013              : /// when changing the visibility of layers (for example when creating a branch that makes some previously
    1014              : /// covered layers visible).  It should be used for cache management but not for correctness-critical checks.
    1015              : #[derive(Debug, Clone, PartialEq, Eq)]
    1016              : pub enum LayerVisibilityHint {
    1017              :     /// A Visible layer might be read while serving a read, because there is not an image layer between it
    1018              :     /// and a readable LSN (the tip of the branch or a child's branch point)
    1019              :     Visible,
    1020              :     /// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
    1021              :     /// a branch or ephemeral endpoint at an LSN below the layer that covers this.
    1022              :     Covered,
    1023              : }
    1024              : 
    1025              : pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
    1026              : 
    1027              : #[derive(Clone, Copy, strum_macros::EnumString)]
    1028              : pub(crate) enum LayerAccessStatsReset {
    1029              :     NoReset,
    1030              :     AllStats,
    1031              : }
    1032              : 
    1033              : impl Default for LayerAccessStats {
    1034          991 :     fn default() -> Self {
    1035              :         // Default value is to assume resident since creation time, and visible.
    1036          991 :         let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now());
    1037          991 :         value |= 0x1 << Self::VISIBILITY_SHIFT;
    1038              : 
    1039          991 :         Self(std::sync::atomic::AtomicU64::new(value))
    1040          991 :     }
    1041              : }
    1042              : 
    1043              : // Efficient store of two very-low-resolution timestamps and some bits.  Used for storing last access time and
    1044              : // last residence change time.
    1045              : impl LayerAccessStats {
    1046              :     // How many high bits to drop from a u32 timestamp?
    1047              :     // - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use
    1048              :     //   after that, this software has been very successful!)
    1049              :     // - Dropping the top bit is implicitly safe because unix timestamps are meant to be
    1050              :     // stored in an i32, so they never used it.
    1051              :     // - Dropping the next two bits is safe because this code is only running on systems in
    1052              :     // years >= 2024, and these bits have been 1 since 2021
    1053              :     //
    1054              :     // Therefore we may store only 28 bits for a timestamp with one second resolution.  We do
    1055              :     // this truncation to make space for some flags in the high bits of our u64.
    1056              :     const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1;
    1057              :     const TS_MASK: u32 = 0x1f_ff_ff_ff;
    1058              :     const TS_ONES: u32 = 0x60_00_00_00;
    1059              : 
    1060              :     const ATIME_SHIFT: u32 = 0;
    1061              :     const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS;
    1062              :     const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS;
    1063              : 
    1064       137680 :     fn write_bits(&self, mask: u64, value: u64) -> u64 {
    1065       137680 :         self.0
    1066       137680 :             .fetch_update(
    1067              :                 // TODO: decide what orderings are correct
    1068       137680 :                 std::sync::atomic::Ordering::Relaxed,
    1069       137680 :                 std::sync::atomic::Ordering::Relaxed,
    1070       137680 :                 |v| Some((v & !mask) | (value & mask)),
    1071              :             )
    1072       137680 :             .expect("Inner function is infallible")
    1073       137680 :     }
    1074              : 
    1075       138474 :     fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) {
    1076              :         // Drop the low three bits of the timestamp, for an ~8s accuracy
    1077       138474 :         let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64);
    1078              : 
    1079       138474 :         ((Self::TS_MASK as u64) << shift, timestamp << shift)
    1080       138474 :     }
    1081              : 
    1082           73 :     fn read_low_res_timestamp(&self, shift: u32) -> Option<SystemTime> {
    1083           73 :         let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
    1084              : 
    1085           73 :         let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift;
    1086           73 :         if ts_bits == 0 {
    1087           33 :             None
    1088              :         } else {
    1089           40 :             Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64)))
    1090              :         }
    1091           73 :     }
    1092              : 
    1093              :     /// Record a change in layer residency.
    1094              :     ///
    1095              :     /// Recording the event must happen while holding the layer map lock to
    1096              :     /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
    1097              :     /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
    1098              :     ///
    1099              :     /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
    1100              :     /// the following race could happen:
    1101              :     ///
    1102              :     /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
    1103              :     /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
    1104              :     /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
    1105              :     /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
    1106           25 :     pub(crate) fn record_residence_event_at(&self, now: SystemTime) {
    1107           25 :         let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now);
    1108           25 :         self.write_bits(mask, value);
    1109           25 :     }
    1110              : 
    1111           24 :     pub(crate) fn record_residence_event(&self) {
    1112           24 :         self.record_residence_event_at(SystemTime::now())
    1113           24 :     }
    1114              : 
    1115       137458 :     fn record_access_at(&self, now: SystemTime) -> bool {
    1116       137458 :         let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
    1117              : 
    1118              :         // A layer which is accessed must be visible.
    1119       137458 :         mask |= 0x1 << Self::VISIBILITY_SHIFT;
    1120       137458 :         value |= 0x1 << Self::VISIBILITY_SHIFT;
    1121              : 
    1122       137458 :         let old_bits = self.write_bits(mask, value);
    1123            1 :         !matches!(
    1124       137458 :             self.decode_visibility(old_bits),
    1125              :             LayerVisibilityHint::Visible
    1126              :         )
    1127       137458 :     }
    1128              : 
    1129              :     /// Returns true if we modified the layer's visibility to set it to Visible implicitly
    1130              :     /// as a result of this access
    1131       138851 :     pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
    1132       138851 :         if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
    1133         1396 :             return false;
    1134       137455 :         }
    1135              : 
    1136       137455 :         self.record_access_at(SystemTime::now())
    1137       138851 :     }
    1138              : 
    1139            0 :     fn as_api_model(
    1140            0 :         &self,
    1141            0 :         reset: LayerAccessStatsReset,
    1142            0 :     ) -> pageserver_api::models::LayerAccessStats {
    1143            0 :         let ret = pageserver_api::models::LayerAccessStats {
    1144            0 :             access_time: self
    1145            0 :                 .read_low_res_timestamp(Self::ATIME_SHIFT)
    1146            0 :                 .unwrap_or(UNIX_EPOCH),
    1147            0 :             residence_time: self
    1148            0 :                 .read_low_res_timestamp(Self::RTIME_SHIFT)
    1149            0 :                 .unwrap_or(UNIX_EPOCH),
    1150            0 :             visible: matches!(self.visibility(), LayerVisibilityHint::Visible),
    1151              :         };
    1152            0 :         match reset {
    1153            0 :             LayerAccessStatsReset::NoReset => {}
    1154            0 :             LayerAccessStatsReset::AllStats => {
    1155            0 :                 self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0);
    1156            0 :                 self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0);
    1157            0 :             }
    1158              :         }
    1159            0 :         ret
    1160            0 :     }
    1161              : 
    1162              :     /// Get the latest access timestamp, falling back to latest residence event.  The latest residence event
    1163              :     /// will be this Layer's construction time, if its residence hasn't changed since then.
    1164           21 :     pub(crate) fn latest_activity(&self) -> SystemTime {
    1165           21 :         if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) {
    1166            3 :             t
    1167              :         } else {
    1168           18 :             self.read_low_res_timestamp(Self::RTIME_SHIFT)
    1169           18 :                 .expect("Residence time is set on construction")
    1170              :         }
    1171           21 :     }
    1172              : 
    1173              :     /// Whether this layer has been accessed (excluding in [`AccessStatsBehavior::Skip`]).
    1174              :     ///
    1175              :     /// This indicates whether the layer has been used for some purpose that would motivate
    1176              :     /// us to keep it on disk, such as for serving a getpage request.
    1177           17 :     fn accessed(&self) -> bool {
    1178              :         // Consider it accessed if the most recent access is more recent than
    1179              :         // the most recent change in residence status.
    1180              :         match (
    1181           17 :             self.read_low_res_timestamp(Self::ATIME_SHIFT),
    1182           17 :             self.read_low_res_timestamp(Self::RTIME_SHIFT),
    1183              :         ) {
    1184           15 :             (None, _) => false,
    1185            0 :             (Some(_), None) => true,
    1186            2 :             (Some(a), Some(r)) => a >= r,
    1187              :         }
    1188           17 :     }
    1189              : 
    1190              :     /// Helper for extracting the visibility hint from the literal value of our inner u64
    1191       138059 :     fn decode_visibility(&self, bits: u64) -> LayerVisibilityHint {
    1192       138059 :         match (bits >> Self::VISIBILITY_SHIFT) & 0x1 {
    1193       138052 :             1 => LayerVisibilityHint::Visible,
    1194            7 :             0 => LayerVisibilityHint::Covered,
    1195            0 :             _ => unreachable!(),
    1196              :         }
    1197       138059 :     }
    1198              : 
    1199              :     /// Returns the old value which has been replaced
    1200          197 :     pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) -> LayerVisibilityHint {
    1201          197 :         let value = match visibility {
    1202          180 :             LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
    1203           17 :             LayerVisibilityHint::Covered => 0x0,
    1204              :         };
    1205              : 
    1206          197 :         let old_bits = self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
    1207          197 :         self.decode_visibility(old_bits)
    1208          197 :     }
    1209              : 
    1210          404 :     pub(crate) fn visibility(&self) -> LayerVisibilityHint {
    1211          404 :         let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
    1212          404 :         self.decode_visibility(read)
    1213          404 :     }
    1214              : }
    1215              : 
    1216              : /// Get a layer descriptor from a layer.
    1217              : pub(crate) trait AsLayerDesc {
    1218              :     /// Get the layer descriptor.
    1219              :     fn layer_desc(&self) -> &PersistentLayerDesc;
    1220              : }
    1221              : 
    1222              : pub mod tests {
    1223              :     use pageserver_api::shard::TenantShardId;
    1224              :     use utils::id::TimelineId;
    1225              : 
    1226              :     use super::*;
    1227              : 
    1228              :     impl From<DeltaLayerName> for PersistentLayerDesc {
    1229           11 :         fn from(value: DeltaLayerName) -> Self {
    1230           11 :             PersistentLayerDesc::new_delta(
    1231           11 :                 TenantShardId::from([0; 18]),
    1232           11 :                 TimelineId::from_array([0; 16]),
    1233           11 :                 value.key_range,
    1234           11 :                 value.lsn_range,
    1235              :                 233,
    1236              :             )
    1237           11 :         }
    1238              :     }
    1239              : 
    1240              :     impl From<ImageLayerName> for PersistentLayerDesc {
    1241           12 :         fn from(value: ImageLayerName) -> Self {
    1242           12 :             PersistentLayerDesc::new_img(
    1243           12 :                 TenantShardId::from([0; 18]),
    1244           12 :                 TimelineId::from_array([0; 16]),
    1245           12 :                 value.key_range,
    1246           12 :                 value.lsn,
    1247              :                 233,
    1248              :             )
    1249           12 :         }
    1250              :     }
    1251              : 
    1252              :     impl From<LayerName> for PersistentLayerDesc {
    1253           23 :         fn from(value: LayerName) -> Self {
    1254           23 :             match value {
    1255           11 :                 LayerName::Delta(d) => Self::from(d),
    1256           12 :                 LayerName::Image(i) => Self::from(i),
    1257              :             }
    1258           23 :         }
    1259              :     }
    1260              : }
    1261              : 
    1262              : /// Range wrapping newtype, which uses display to render Debug.
    1263              : ///
    1264              : /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
    1265              : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
    1266              : 
    1267              : impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
    1268            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    1269            0 :         write!(f, "{}..{}", self.0.start, self.0.end)
    1270            0 :     }
    1271              : }
    1272              : 
    1273              : #[cfg(test)]
    1274              : mod tests2 {
    1275              :     use pageserver_api::key::DBDIR_KEY;
    1276              :     use tracing::info;
    1277              : 
    1278              :     use super::*;
    1279              :     use crate::tenant::storage_layer::IoConcurrency;
    1280              : 
    1281              :     /// TODO: currently this test relies on manual visual inspection of the --no-capture output.
    1282              :     /// Should look like so:
    1283              :     /// ```text
    1284              :     /// RUST_LOG=trace cargo nextest run  --features testing  --no-capture test_io_concurrency_noise
    1285              :     /// running 1 test
    1286              :     /// 2025-01-21T17:42:01.335679Z  INFO get_vectored_concurrent_io test selected=SidecarTask
    1287              :     /// 2025-01-21T17:42:01.335680Z TRACE spawning sidecar task task_id=0
    1288              :     /// 2025-01-21T17:42:01.335937Z TRACE IoConcurrency_sidecar{task_id=0}: start
    1289              :     /// 2025-01-21T17:42:01.335972Z TRACE IoConcurrency_sidecar{task_id=0}: received new io future
    1290              :     /// 2025-01-21T17:42:01.335999Z  INFO IoConcurrency_sidecar{task_id=0}: waiting for signal to complete IO
    1291              :     /// 2025-01-21T17:42:01.336229Z  WARN dropping ValuesReconstructState while some IOs have not been completed num_active_ios=1 sidecar_task_id=Some(0) backtrace=   0: <pageserver::tenant::storage_layer::ValuesReconstructState as core::ops::drop::Drop>::drop
    1292              :     ///              at ./src/tenant/storage_layer.rs:553:24
    1293              :     ///    1: core::ptr::drop_in_place<pageserver::tenant::storage_layer::ValuesReconstructState>
    1294              :     ///              at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:521:1
    1295              :     ///    2: core::mem::drop
    1296              :     ///              at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:942:24
    1297              :     ///    3: pageserver::tenant::storage_layer::tests2::test_io_concurrency_noise::{{closure}}
    1298              :     ///              at ./src/tenant/storage_layer.rs:1159:9
    1299              :     ///   ...
    1300              :     ///   49: <unknown>
    1301              :     /// 2025-01-21T17:42:01.452293Z  INFO IoConcurrency_sidecar{task_id=0}: completing IO
    1302              :     /// 2025-01-21T17:42:01.452357Z TRACE IoConcurrency_sidecar{task_id=0}: io future completed
    1303              :     /// 2025-01-21T17:42:01.452473Z TRACE IoConcurrency_sidecar{task_id=0}: end
    1304              :     /// test tenant::storage_layer::tests2::test_io_concurrency_noise ... ok
    1305              :     ///
    1306              :     /// ```
    1307              :     #[tokio::test]
    1308            1 :     async fn test_io_concurrency_noise() {
    1309            1 :         crate::tenant::harness::setup_logging();
    1310              : 
    1311            1 :         let io_concurrency = IoConcurrency::spawn_for_test();
    1312            1 :         match *io_concurrency {
    1313              :             IoConcurrency::Sequential => {
    1314              :                 // This test asserts behavior in sidecar mode, doesn't make sense in sequential mode.
    1315            0 :                 return;
    1316              :             }
    1317            1 :             IoConcurrency::SidecarTask { .. } => {}
    1318              :         }
    1319            1 :         let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
    1320              : 
    1321            1 :         let (io_fut_is_waiting_tx, io_fut_is_waiting) = tokio::sync::oneshot::channel();
    1322            1 :         let (do_complete_io, should_complete_io) = tokio::sync::oneshot::channel();
    1323            1 :         let (io_fut_exiting_tx, io_fut_exiting) = tokio::sync::oneshot::channel();
    1324              : 
    1325            1 :         let io = reconstruct_state.update_key(&DBDIR_KEY, Lsn(8), true);
    1326            1 :         reconstruct_state
    1327            1 :             .spawn_io(async move {
    1328            1 :                 info!("waiting for signal to complete IO");
    1329            1 :                 io_fut_is_waiting_tx.send(()).unwrap();
    1330            1 :                 should_complete_io.await.unwrap();
    1331            1 :                 info!("completing IO");
    1332            1 :                 io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
    1333            1 :                 io_fut_exiting_tx.send(()).unwrap();
    1334            1 :             })
    1335            1 :             .await;
    1336              : 
    1337            1 :         io_fut_is_waiting.await.unwrap();
    1338              : 
    1339              :         // this is what makes the noise
    1340            1 :         drop(reconstruct_state);
    1341              : 
    1342            1 :         do_complete_io.send(()).unwrap();
    1343              : 
    1344            1 :         io_fut_exiting.await.unwrap();
    1345            1 :     }
    1346              : }
        

Generated by: LCOV version 2.1-beta