LCOV - code coverage report
Current view: top level - pageserver/src/tenant - storage_layer.rs (source / functions)
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info
Test Date: 2024-07-21 16:16:09
Coverage: Lines: 59.8 % (216 of 361 hit) | Functions: 57.4 % (31 of 54 hit)

            Line data    Source code
       1              : //! Common traits and structs for layers
       2              : 
       3              : pub mod delta_layer;
       4              : pub mod image_layer;
       5              : pub(crate) mod inmemory_layer;
       6              : pub(crate) mod layer;
       7              : mod layer_desc;
       8              : mod layer_name;
       9              : pub mod merge_iterator;
      10              : 
      11              : use crate::context::{AccessStatsBehavior, RequestContext};
      12              : use crate::repository::Value;
      13              : use crate::task_mgr::TaskKind;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use bytes::Bytes;
      16              : use enum_map::EnumMap;
      17              : use enumset::EnumSet;
      18              : use once_cell::sync::Lazy;
      19              : use pageserver_api::key::Key;
      20              : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
      21              : use pageserver_api::models::{
      22              :     LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
      23              : };
      24              : use std::borrow::Cow;
      25              : use std::cmp::{Ordering, Reverse};
      26              : use std::collections::hash_map::Entry;
      27              : use std::collections::{BinaryHeap, HashMap};
      28              : use std::ops::Range;
      29              : use std::sync::{Arc, Mutex};
      30              : use std::time::{Duration, SystemTime, UNIX_EPOCH};
      31              : use tracing::warn;
      32              : use utils::history_buffer::HistoryBufferWithDropCounter;
      33              : use utils::rate_limit::RateLimit;
      34              : 
      35              : use utils::{id::TimelineId, lsn::Lsn};
      36              : 
      37              : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
      38              : pub use image_layer::{ImageLayer, ImageLayerWriter};
      39              : pub use inmemory_layer::InMemoryLayer;
      40              : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
      41              : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
      42              : 
      43              : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
      44              : 
      45              : use self::inmemory_layer::InMemoryLayerFileId;
      46              : 
      47              : use super::timeline::GetVectoredError;
      48              : use super::PageReconstructError;
      49              : 
      50            0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
      51            0 : where
      52            0 :     T: PartialOrd<T>,
      53            0 : {
      54            0 :     if a.start < b.start {
      55            0 :         a.end > b.start
      56              :     } else {
      57            0 :         b.end > a.start
      58              :     }
      59            0 : }
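
Since Rust ranges are half-open, ranges that merely touch end-to-start do not
overlap. A minimal illustration of the semantics above (an editorial sketch, not
part of the measured source):

    assert!(range_overlaps(&(1..5), &(4..8)));   // element 4 is in both ranges
    assert!(!range_overlaps(&(1..4), &(4..8)));  // touching ends do not overlap
    assert!(range_overlaps(&(0..10), &(2..3)));  // containment counts as overlap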
      60              : 
      61              : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
      62              : ///
      63              : /// Before the first call, you can fill in 'img' if you have an older cached
      64              : /// version of the page available. That can save work in
      65              : /// 'get_value_reconstruct_data', as it can stop searching for page versions
      66              : /// when all the WAL records going back to the cached image have been collected.
      67              : ///
      68              : /// When 'get_value_reconstruct_data' returns Complete, either 'img' is set to an
      69              : /// image of the page, or the oldest WAL record in 'records' is a will_init-type
      70              : /// record that initializes the page without requiring a previous image.
      71              : ///
      72              : /// If 'get_value_reconstruct_data' returns Continue, some 'records' may have
      73              : /// been collected, but there are more records outside the current layer. Pass
      74              : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
      75              : /// call, to collect more records.
      76              : ///
      77              : #[derive(Debug, Default)]
      78              : pub struct ValueReconstructState {
      79              :     pub records: Vec<(Lsn, NeonWalRecord)>,
      80              :     pub img: Option<(Lsn, Bytes)>,
      81              : }
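
A sketch of how a caller seeds the state before the first call (hypothetical
`cached_lsn` and `cached_img` values, e.g. taken from a page cache):

    // With a cached image in place, the layer search can stop as soon as all
    // WAL records newer than `cached_lsn` have been collected.
    let mut state = ValueReconstructState::default();
    state.img = Some((cached_lsn, cached_img));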
      82              : 
      83              : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
      84              : pub(crate) enum ValueReconstructSituation {
      85              :     Complete,
      86              :     #[default]
      87              :     Continue,
      88              : }
      89              : 
      90              : /// Reconstruct data accumulated for a single key during a vectored get
      91              : #[derive(Debug, Default, Clone)]
      92              : pub(crate) struct VectoredValueReconstructState {
      93              :     pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
      94              :     pub(crate) img: Option<(Lsn, Bytes)>,
      95              : 
      96              :     situation: ValueReconstructSituation,
      97              : }
      98              : 
      99              : impl VectoredValueReconstructState {
     100        39816 :     fn get_cached_lsn(&self) -> Option<Lsn> {
     101        39816 :         self.img.as_ref().map(|img| img.0)
     102        39816 :     }
     103              : }
     104              : 
     105              : impl From<VectoredValueReconstructState> for ValueReconstructState {
     106        40436 :     fn from(mut state: VectoredValueReconstructState) -> Self {
     107        40436 :         // walredo expects the records to be descending in terms of Lsn
     108        40436 :         state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
     109        40436 : 
     110        40436 :         ValueReconstructState {
     111        40436 :             records: state.records,
     112        40436 :             img: state.img,
     113        40436 :         }
     114        40436 :     }
     115              : }
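
`Reverse` flips the sort key, which is what produces the descending Lsn order
above. The same idiom in isolation:

    use std::cmp::Reverse;

    let mut records = vec![(Lsn(0x10), "a"), (Lsn(0x30), "c"), (Lsn(0x20), "b")];
    records.sort_by_key(|(lsn, _)| Reverse(*lsn));
    assert_eq!(records[0].0, Lsn(0x30)); // newest record first, as walredo expects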
     116              : 
     117              : /// Bag of data accumulated during a vectored get.
     118              : pub(crate) struct ValuesReconstructState {
     119              :     /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
     120              :     /// should not expect to get anything from this hashmap.
     121              :     pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
     122              :     /// The keys which are already retrieved
     123              :     keys_done: KeySpaceRandomAccum,
     124              : 
     125              :     /// The keys covered by the image layers
     126              :     keys_with_image_coverage: Option<Range<Key>>,
     127              : 
     128              :     // Statistics that are still accessible as a caller of `get_vectored_impl`.
     129              :     layers_visited: u32,
     130              :     delta_layers_visited: u32,
     131              : }
     132              : 
     133              : impl ValuesReconstructState {
     134          348 :     pub(crate) fn new() -> Self {
     135          348 :         Self {
     136          348 :             keys: HashMap::new(),
     137          348 :             keys_done: KeySpaceRandomAccum::new(),
     138          348 :             keys_with_image_coverage: None,
     139          348 :             layers_visited: 0,
     140          348 :             delta_layers_visited: 0,
     141          348 :         }
     142          348 :     }
     143              : 
     144              :     /// Associate a key with the error which it encountered and mark it as done
     145            0 :     pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
     146            0 :         let previous = self.keys.insert(key, Err(err));
     147            0 :         if let Some(Ok(state)) = previous {
     148            0 :             if state.situation == ValueReconstructSituation::Continue {
     149            0 :                 self.keys_done.add_key(key);
     150            0 :             }
     151            0 :         }
     152            0 :     }
     153              : 
     154          232 :     pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
     155          232 :         self.layers_visited += 1;
     156          232 :         if let ReadableLayer::PersistentLayer(layer) = layer {
     157          218 :             if layer.layer_desc().is_delta() {
     158          152 :                 self.delta_layers_visited += 1;
     159          152 :             }
     160           14 :         }
     161          232 :     }
     162              : 
     163           24 :     pub(crate) fn get_delta_layers_visited(&self) -> u32 {
     164           24 :         self.delta_layers_visited
     165           24 :     }
     166              : 
     167          128 :     pub(crate) fn get_layers_visited(&self) -> u32 {
     168          128 :         self.layers_visited
     169          128 :     }
     170              : 
     171              :     /// This function is called after reading a keyspace from a layer.
     172              :     /// It checks if the read path has now moved past the cached Lsn for any keys.
     173              :     ///
     174              :     /// Implementation note: We intentionally iterate over the keys for which we've
     175              :     /// already collected some reconstruct data. This avoids scaling complexity with
     176              :     /// the size of the search space.
     177          166 :     pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
     178        72391 :         for (key, value) in self.keys.iter_mut() {
     179        72391 :             if !keyspace.contains(key) {
     180        42253 :                 continue;
     181        30138 :             }
     182              : 
     183        30138 :             if let Ok(state) = value {
     184        30138 :                 if state.situation != ValueReconstructSituation::Complete
     185            0 :                     && state.get_cached_lsn() >= Some(advanced_to)
     186            0 :                 {
     187            0 :                     state.situation = ValueReconstructSituation::Complete;
     188            0 :                     self.keys_done.add_key(*key);
     189        30138 :                 }
     190            0 :             }
     191              :         }
     192          166 :     }
     193              : 
     194              :     /// On hitting an image layer, we can mark all keys in its range as done:
     195              :     /// if the image layer does not contain a key, that key was deleted or never existed.
     196           74 :     pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
     197           74 :         let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
     198           74 :         assert_eq!(
     199              :             prev_val, None,
     200            0 :             "should consume the keyspace before the next iteration"
     201              :         );
     202           74 :     }
     203              : 
     204              :     /// Update the state collected for a given key.
     205              :     /// Returns `Complete` if this was the last value needed for the key, `Continue` otherwise.
     206              :     ///
     207              :     /// If the key is done after the update, mark it as such.
     208        40508 :     pub(crate) fn update_key(
     209        40508 :         &mut self,
     210        40508 :         key: &Key,
     211        40508 :         lsn: Lsn,
     212        40508 :         value: Value,
     213        40508 :     ) -> ValueReconstructSituation {
     214        40508 :         let state = self
     215        40508 :             .keys
     216        40508 :             .entry(*key)
     217        40508 :             .or_insert(Ok(VectoredValueReconstructState::default()));
     218              : 
     219        40508 :         if let Ok(state) = state {
     220        40508 :             let key_done = match state.situation {
     221            0 :                 ValueReconstructSituation::Complete => unreachable!(),
     222        40508 :                 ValueReconstructSituation::Continue => match value {
     223        40508 :                     Value::Image(img) => {
     224        40508 :                         state.img = Some((lsn, img));
     225        40508 :                         true
     226              :                     }
     227            0 :                     Value::WalRecord(rec) => {
     228            0 :                         debug_assert!(
     229            0 :                             Some(lsn) > state.get_cached_lsn(),
     230            0 :                             "Attempt to collect a record below cached LSN for walredo: {} < {}",
     231            0 :                             lsn,
     232            0 :                             state
     233            0 :                                 .get_cached_lsn()
     234            0 :                                 .expect("Assertion can only fire if a cached lsn is present")
     235              :                         );
     236              : 
     237            0 :                         let will_init = rec.will_init();
     238            0 :                         state.records.push((lsn, rec));
     239            0 :                         will_init
     240              :                     }
     241              :                 },
     242              :             };
     243              : 
     244        40508 :             if key_done && state.situation == ValueReconstructSituation::Continue {
     245        40508 :                 state.situation = ValueReconstructSituation::Complete;
     246        40508 :                 self.keys_done.add_key(*key);
     247        40508 :             }
     248              : 
     249        40508 :             state.situation
     250              :         } else {
     251            0 :             ValueReconstructSituation::Complete
     252              :         }
     253        40508 :     }
     254              : 
     255              :     /// Returns the Lsn at which this key is cached if one exists.
     256              :     /// The read path should go no further than this Lsn for the given key.
     257       191617 :     pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
     258       191617 :         self.keys
     259       191617 :             .get(key)
     260       191617 :             .and_then(|k| k.as_ref().ok())
     261       191617 :             .and_then(|state| state.get_cached_lsn())
     262       191617 :     }
     263              : 
     264              :     /// Returns the key space describing the keys that have
     265              :     /// been marked as completed since the last call to this function.
     266              :     /// Returns individual keys done, and the image layer coverage.
     267          406 :     pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
     268          406 :         (
     269          406 :             self.keys_done.consume_keyspace(),
     270          406 :             self.keys_with_image_coverage.take(),
     271          406 :         )
     272          406 :     }
     273              : }
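
Taken together, the read path drives this state roughly as follows (a sketch
with hypothetical `key`, `lsn`, and `img` values):

    let mut state = ValuesReconstructState::new();

    // An image value completes the key immediately; WAL records only complete
    // it once a will_init record is reached.
    let situation = state.update_key(&key, lsn, Value::Image(img));
    assert_eq!(situation, ValueReconstructSituation::Complete);

    // After visiting a layer, collect everything that became complete.
    let (done_keys, image_coverage) = state.consume_done_keys();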
     274              : 
     275              : impl Default for ValuesReconstructState {
     276           66 :     fn default() -> Self {
     277           66 :         Self::new()
     278           66 :     }
     279              : }
     280              : 
     281              : /// A key that uniquely identifies a layer in a timeline
     282              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     283              : pub(crate) enum LayerId {
     284              :     PersitentLayerId(PersistentLayerKey),
     285              :     InMemoryLayerId(InMemoryLayerFileId),
     286              : }
     287              : 
     288              : /// Layer wrapper for the read path. Note that it is valid
     289              : /// to use these layers even after external operations have
     290              : /// been performed on them (compaction, freeze, etc.).
     291              : #[derive(Debug)]
     292              : pub(crate) enum ReadableLayer {
     293              :     PersistentLayer(Layer),
     294              :     InMemoryLayer(Arc<InMemoryLayer>),
     295              : }
     296              : 
     297              : /// A partial description of a read to be done.
     298              : #[derive(Debug, Clone)]
     299              : struct ReadDesc {
     300              :     /// An id used to resolve the readable layer within the fringe
     301              :     layer_id: LayerId,
     302              :     /// Lsn range for the read, used for selecting the next read
     303              :     lsn_range: Range<Lsn>,
     304              : }
     305              : 
     306              : /// Data structure which maintains a fringe of layers for the
     307              : /// read path. The fringe is the set of layers which intersects
     308              : /// the current keyspace that the search is descending on.
     309              : /// Each layer tracks the keyspace that intersects it.
     310              : ///
     311              : /// Reads must be issued in descending Lsn order. Hence, the fringe uses a
     312              : /// two-layer indexing scheme: a heap of planned reads plus a per-layer keyspace map.
     313              : #[derive(Debug)]
     314              : pub(crate) struct LayerFringe {
     315              :     planned_reads_by_lsn: BinaryHeap<ReadDesc>,
     316              :     layers: HashMap<LayerId, LayerKeyspace>,
     317              : }
     318              : 
     319              : #[derive(Debug)]
     320              : struct LayerKeyspace {
     321              :     layer: ReadableLayer,
     322              :     target_keyspace: KeySpaceRandomAccum,
     323              : }
     324              : 
     325              : impl LayerFringe {
     326          174 :     pub(crate) fn new() -> Self {
     327          174 :         LayerFringe {
     328          174 :             planned_reads_by_lsn: BinaryHeap::new(),
     329          174 :             layers: HashMap::new(),
     330          174 :         }
     331          174 :     }
     332              : 
     333          406 :     pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
     334          406 :         let read_desc = match self.planned_reads_by_lsn.pop() {
     335          232 :             Some(desc) => desc,
     336          174 :             None => return None,
     337              :         };
     338              : 
     339          232 :         let removed = self.layers.remove_entry(&read_desc.layer_id);
     340          232 : 
     341          232 :         match removed {
     342              :             Some((
     343              :                 _,
     344              :                 LayerKeyspace {
     345          232 :                     layer,
     346          232 :                     mut target_keyspace,
     347          232 :                 },
     348          232 :             )) => Some((
     349          232 :                 layer,
     350          232 :                 target_keyspace.consume_keyspace(),
     351          232 :                 read_desc.lsn_range,
     352          232 :             )),
     353            0 :             None => unreachable!("fringe internals are always consistent"),
     354              :         }
     355          406 :     }
     356              : 
     357        64050 :     pub(crate) fn update(
     358        64050 :         &mut self,
     359        64050 :         layer: ReadableLayer,
     360        64050 :         keyspace: KeySpace,
     361        64050 :         lsn_range: Range<Lsn>,
     362        64050 :     ) {
     363        64050 :         let layer_id = layer.id();
     364        64050 :         let entry = self.layers.entry(layer_id.clone());
     365        64050 :         match entry {
     366        63818 :             Entry::Occupied(mut entry) => {
     367        63818 :                 entry.get_mut().target_keyspace.add_keyspace(keyspace);
     368        63818 :             }
     369          232 :             Entry::Vacant(entry) => {
     370          232 :                 self.planned_reads_by_lsn.push(ReadDesc {
     371          232 :                     lsn_range,
     372          232 :                     layer_id: layer_id.clone(),
     373          232 :                 });
     374          232 :                 let mut accum = KeySpaceRandomAccum::new();
     375          232 :                 accum.add_keyspace(keyspace);
     376          232 :                 entry.insert(LayerKeyspace {
     377          232 :                     layer,
     378          232 :                     target_keyspace: accum,
     379          232 :                 });
     380          232 :             }
     381              :         }
     382        64050 :     }
     383              : }
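
A sketch of the intended driver loop (hypothetical `layer`, `keyspace`, and Lsn
bounds):

    let mut fringe = LayerFringe::new();
    fringe.update(layer, keyspace, lsn_start..lsn_end);

    // Reads come back in descending Lsn order. Visiting a layer may discover
    // deeper layers, which are fed back into the fringe via `update`.
    while let Some((layer, keyspace, lsn_range)) = fringe.next_layer() {
        // ... read `keyspace` from `layer`, calling fringe.update(...) for any
        // predecessor layers that still need to be searched
    }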
     384              : 
     385              : impl Default for LayerFringe {
     386            0 :     fn default() -> Self {
     387            0 :         Self::new()
     388            0 :     }
     389              : }
     390              : 
     391              : impl Ord for ReadDesc {
     392           16 :     fn cmp(&self, other: &Self) -> Ordering {
     393           16 :         let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
     394           16 :         if ord == std::cmp::Ordering::Equal {
     395           16 :             self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
     396              :         } else {
     397            0 :             ord
     398              :         }
     399           16 :     }
     400              : }
     401              : 
     402              : impl PartialOrd for ReadDesc {
     403           16 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     404           16 :         Some(self.cmp(other))
     405           16 :     }
     406              : }
     407              : 
     408              : impl PartialEq for ReadDesc {
     409            0 :     fn eq(&self, other: &Self) -> bool {
     410            0 :         self.lsn_range == other.lsn_range
     411            0 :     }
     412              : }
     413              : 
     414              : impl Eq for ReadDesc {}
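
`BinaryHeap` is a max-heap, so this ordering pops the read with the highest end
Lsn first; among equal ends, the reversed start comparison prefers the
earlier-starting (wider) read. A sketch with hypothetical layer ids `id_a` and
`id_b`:

    let mut heap = std::collections::BinaryHeap::new();
    heap.push(ReadDesc { layer_id: id_a, lsn_range: Lsn(10)..Lsn(20) });
    heap.push(ReadDesc { layer_id: id_b, lsn_range: Lsn(5)..Lsn(30) });

    assert_eq!(heap.pop().unwrap().lsn_range.end, Lsn(30)); // highest end wins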
     415              : 
     416              : impl ReadableLayer {
     417        64050 :     pub(crate) fn id(&self) -> LayerId {
     418        64050 :         match self {
     419        64036 :             Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
     420           14 :             Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
     421              :         }
     422        64050 :     }
     423              : 
     424          232 :     pub(crate) async fn get_values_reconstruct_data(
     425          232 :         &self,
     426          232 :         keyspace: KeySpace,
     427          232 :         lsn_range: Range<Lsn>,
     428          232 :         reconstruct_state: &mut ValuesReconstructState,
     429          232 :         ctx: &RequestContext,
     430          232 :     ) -> Result<(), GetVectoredError> {
     431          232 :         match self {
     432          218 :             ReadableLayer::PersistentLayer(layer) => {
     433          218 :                 layer
     434          218 :                     .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
     435        11401 :                     .await
     436              :             }
     437           14 :             ReadableLayer::InMemoryLayer(layer) => {
     438           14 :                 layer
     439           14 :                     .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
     440           27 :                     .await
     441              :             }
     442              :         }
     443          232 :     }
     444              : }
     445              : 
     446              : /// Return value from [`Layer::get_value_reconstruct_data`]
     447              : #[derive(Clone, Copy, Debug)]
     448              : pub enum ValueReconstructResult {
     449              :     /// Got all the data needed to reconstruct the requested page
     450              :     Complete,
     451              :     /// This layer didn't contain all the required data, the caller should look up
     452              :     /// the predecessor layer at the returned LSN and collect more data from there.
     453              :     Continue,
     454              : 
     455              :     /// This layer didn't contain data needed to reconstruct the page version at
     456              :     /// the returned LSN. This is usually considered an error, but might be OK
     457              :     /// in some circumstances.
     458              :     Missing,
     459              : }
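
A sketch of how a caller reacts to each variant (hypothetical `result` value):

    match result {
        // Enough data gathered; hand records + image to walredo.
        ValueReconstructResult::Complete => { /* reconstruct the page */ }
        // Keep searching in the predecessor layer at the returned LSN.
        ValueReconstructResult::Continue => { /* descend further */ }
        // No data found; usually surfaced as an error.
        ValueReconstructResult::Missing => { /* report missing key */ }
    }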
     460              : 
     461              : #[derive(Debug)]
     462              : pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
     463              : 
     464              : /// This struct holds two instances of [`LayerAccessStatsInner`].
     465              : /// Accesses are recorded to both instances.
     466              : /// The `for_scraping_api` instance can be reset from the management API via [`LayerAccessStatsReset`].
     467              : /// The `for_eviction_policy` is never reset.
     468              : #[derive(Debug, Default, Clone)]
     469              : struct LayerAccessStatsLocked {
     470              :     for_scraping_api: LayerAccessStatsInner,
     471              :     for_eviction_policy: LayerAccessStatsInner,
     472              : }
     473              : 
     474              : impl LayerAccessStatsLocked {
     475       213416 :     fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
     476       213416 :         [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
     477       213416 :     }
     478              : }
     479              : 
     480              : #[derive(Debug, Default, Clone)]
     481              : struct LayerAccessStatsInner {
     482              :     first_access: Option<LayerAccessStatFullDetails>,
     483              :     count_by_access_kind: EnumMap<LayerAccessKind, u64>,
     484              :     task_kind_flag: EnumSet<TaskKind>,
     485              :     last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
     486              :     last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
     487              : }
     488              : 
     489              : #[derive(Debug, Clone, Copy)]
     490              : pub(crate) struct LayerAccessStatFullDetails {
     491              :     pub(crate) when: SystemTime,
     492              :     pub(crate) task_kind: TaskKind,
     493              :     pub(crate) access_kind: LayerAccessKind,
     494              : }
     495              : 
     496            0 : #[derive(Clone, Copy, strum_macros::EnumString)]
     497              : pub enum LayerAccessStatsReset {
     498              :     NoReset,
     499              :     JustTaskKindFlags,
     500              :     AllStats,
     501              : }
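
`strum_macros::EnumString` derives `FromStr`, so the management API can parse
the reset mode directly from a request parameter. A sketch (hypothetical input):

    let reset: LayerAccessStatsReset = "JustTaskKindFlags".parse().unwrap();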
     502              : 
     503            0 : fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
     504            0 :     ts.duration_since(UNIX_EPOCH)
     505            0 :         .expect("better to die in this unlikely case than report false stats")
     506            0 :         .as_millis()
     507            0 :         .try_into()
     508            0 :         .expect("64 bits is enough for few more years")
     509            0 : }
     510              : 
     511              : impl LayerAccessStatFullDetails {
     512            0 :     fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
     513            0 :         let Self {
     514            0 :             when,
     515            0 :             task_kind,
     516            0 :             access_kind,
     517            0 :         } = self;
     518            0 :         pageserver_api::models::LayerAccessStatFullDetails {
     519            0 :             when_millis_since_epoch: system_time_to_millis_since_epoch(when),
     520            0 :             task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
     521            0 :             access_kind: *access_kind,
     522            0 :         }
     523            0 :     }
     524              : }
     525              : 
     526              : impl LayerAccessStats {
     527              :     /// Create an empty stats object.
     528              :     ///
     529              :     /// The caller is responsible for recording a residence event
     530              :     /// using [`record_residence_event`] before calling `latest_activity`.
     531              :     /// If they don't, [`latest_activity`] will return `None`.
     532              :     ///
     533              :     /// [`record_residence_event`]: Self::record_residence_event
     534              :     /// [`latest_activity`]: Self::latest_activity
     535         1590 :     pub(crate) fn empty_will_record_residence_event_later() -> Self {
     536         1590 :         LayerAccessStats(Mutex::default())
     537         1590 :     }
     538              : 
     539              :     /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
     540              :     ///
     541              :     /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
     542              :     ///
     543              :     /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
     544              :     /// [`record_residence_event`]: Self::record_residence_event
     545           24 :     pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
     546           24 :         let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
     547           24 :         new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
     548           24 :         new
     549           24 :     }
     550              : 
     551              :     /// Record a change in layer residency.
     552              :     ///
     553              :     /// Recording the event must happen while holding the layer map lock to
     554              :     /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
     555              :     /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
     556              :     ///
     557              :     /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
     558              :     /// the following race could happen:
     559              :     ///
     560              :     /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
     561              :     /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
     562              :     /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
     563              :     /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
     564              :     ///
     565         1638 :     pub(crate) fn record_residence_event(
     566         1638 :         &self,
     567         1638 :         status: LayerResidenceStatus,
     568         1638 :         reason: LayerResidenceEventReason,
     569         1638 :     ) {
     570         1638 :         let mut locked = self.0.lock().unwrap();
     571         3276 :         locked.iter_mut().for_each(|inner| {
     572         3276 :             inner
     573         3276 :                 .last_residence_changes
     574         3276 :                 .write(LayerResidenceEvent::new(status, reason))
     575         3276 :         });
     576         1638 :     }
     577              : 
     578       212130 :     fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
     579       212130 :         if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
     580          352 :             return;
     581       211778 :         }
     582       211778 : 
     583       211778 :         let this_access = LayerAccessStatFullDetails {
     584       211778 :             when: SystemTime::now(),
     585       211778 :             task_kind: ctx.task_kind(),
     586       211778 :             access_kind,
     587       211778 :         };
     588       211778 : 
     589       211778 :         let mut locked = self.0.lock().unwrap();
     590       423556 :         locked.iter_mut().for_each(|inner| {
     591       423556 :             inner.first_access.get_or_insert(this_access);
     592       423556 :             inner.count_by_access_kind[access_kind] += 1;
     593       423556 :             inner.task_kind_flag |= ctx.task_kind();
     594       423556 :             inner.last_accesses.write(this_access);
     595       423556 :         })
     596       212130 :     }
     597              : 
     598            0 :     fn as_api_model(
     599            0 :         &self,
     600            0 :         reset: LayerAccessStatsReset,
     601            0 :     ) -> pageserver_api::models::LayerAccessStats {
     602            0 :         let mut locked = self.0.lock().unwrap();
     603            0 :         let inner = &mut locked.for_scraping_api;
     604            0 :         let LayerAccessStatsInner {
     605            0 :             first_access,
     606            0 :             count_by_access_kind,
     607            0 :             task_kind_flag,
     608            0 :             last_accesses,
     609            0 :             last_residence_changes,
     610            0 :         } = inner;
     611            0 :         let ret = pageserver_api::models::LayerAccessStats {
     612            0 :             access_count_by_access_kind: count_by_access_kind
     613            0 :                 .iter()
     614            0 :                 .map(|(kind, count)| (kind, *count))
     615            0 :                 .collect(),
     616            0 :             task_kind_access_flag: task_kind_flag
     617            0 :                 .iter()
     618            0 :                 .map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
     619            0 :                 .collect(),
     620            0 :             first: first_access.as_ref().map(|a| a.as_api_model()),
     621            0 :             accesses_history: last_accesses.map(|m| m.as_api_model()),
     622            0 :             residence_events_history: last_residence_changes.clone(),
     623            0 :         };
     624            0 :         match reset {
     625            0 :             LayerAccessStatsReset::NoReset => (),
     626            0 :             LayerAccessStatsReset::JustTaskKindFlags => {
     627            0 :                 inner.task_kind_flag.clear();
     628            0 :             }
     629            0 :             LayerAccessStatsReset::AllStats => {
     630            0 :                 *inner = LayerAccessStatsInner::default();
     631            0 :             }
     632              :         }
     633            0 :         ret
     634            0 :     }
     635              : 
     636              :     /// Get the latest access timestamp, falling back to latest residence event, further falling
     637              :     /// back to `SystemTime::now` for a usable timestamp for eviction.
     638            0 :     pub(crate) fn latest_activity_or_now(&self) -> SystemTime {
     639            0 :         self.latest_activity().unwrap_or_else(SystemTime::now)
     640            0 :     }
     641              : 
     642              :     /// Get the latest access timestamp, falling back to latest residence event.
     643              :     ///
     644              :     /// This function can only return `None` if there has not yet been a call to the
     645              :     /// [`record_residence_event`] method. That would generally be considered an
     646              :     /// implementation error. This function logs a rate-limited warning in that case.
     647              :     ///
     648              :     /// TODO: use type system to avoid the need for `fallback`.
     649              :     /// The approach in <https://github.com/neondatabase/neon/pull/3775>
     650              :     /// could be used to enforce that a residence event is recorded
     651              :     /// before a layer is added to the layer map. We could also have
     652              :     /// a layer wrapper type that holds the LayerAccessStats, and ensure
     653              :     /// that that type can only be produced by inserting into the layer map.
     654              :     ///
     655              :     /// [`record_residence_event`]: Self::record_residence_event
     656            0 :     fn latest_activity(&self) -> Option<SystemTime> {
     657            0 :         let locked = self.0.lock().unwrap();
     658            0 :         let inner = &locked.for_eviction_policy;
     659            0 :         match inner.last_accesses.recent() {
     660            0 :             Some(a) => Some(a.when),
     661            0 :             None => match inner.last_residence_changes.recent() {
     662            0 :                 Some(e) => Some(e.timestamp),
     663              :                 None => {
     664              :                     static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
     665            0 :                         Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
     666            0 :                     let mut guard = WARN_RATE_LIMIT.lock().unwrap();
     667            0 :                     guard.0 += 1;
     668            0 :                     let occurences = guard.0;
     669            0 :                     guard.1.call(move || {
     670            0 :                         warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
     671            0 :                     });
     672            0 :                     None
     673              :                 }
     674              :             },
     675              :         }
     676            0 :     }
     677              : 
     678              :     /// Whether this layer has been accessed (excluding accesses recorded with [`AccessStatsBehavior::Skip`]).
     679              :     ///
     680              :     /// This indicates whether the layer has been used for some purpose that would motivate
     681              :     /// us to keep it on disk, such as for serving a getpage request.
     682           18 :     fn accessed(&self) -> bool {
     683           18 :         let locked = self.0.lock().unwrap();
     684           18 :         let inner = &locked.for_eviction_policy;
     685           18 : 
     686           18 :         // Consider it accessed if the most recent access is more recent than
     687           18 :         // the most recent change in residence status.
     688           18 :         match (
     689           18 :             inner.last_accesses.recent(),
     690           18 :             inner.last_residence_changes.recent(),
     691              :         ) {
     692           14 :             (None, _) => false,
     693            0 :             (Some(_), None) => true,
     694            4 :             (Some(a), Some(r)) => a.when >= r.timestamp,
     695              :         }
     696           18 :     }
     697              : }
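
Putting the pieces together, a typical lifecycle looks roughly like this (a
sketch; assumes a `ctx: RequestContext` and that the access being recorded is a
`LayerAccessKind::GetValueReconstructData`):

    // Residence must be recorded while holding the layer map lock (see above).
    let stats = LayerAccessStats::empty_will_record_residence_event_later();
    stats.record_residence_event(
        LayerResidenceStatus::Resident,
        LayerResidenceEventReason::LayerCreate,
    );

    // Each read records the access against both stats instances.
    stats.record_access(LayerAccessKind::GetValueReconstructData, &ctx);

    // Eviction consults the most recent access, falling back to residence events.
    let last_used = stats.latest_activity_or_now();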
     698              : 
     699              : /// Get a layer descriptor from a layer.
     700              : pub trait AsLayerDesc {
     701              :     /// Get the layer descriptor.
     702              :     fn layer_desc(&self) -> &PersistentLayerDesc;
     703              : }
     704              : 
     705              : pub mod tests {
     706              :     use pageserver_api::shard::TenantShardId;
     707              : 
     708              :     use super::*;
     709              : 
     710              :     impl From<DeltaLayerName> for PersistentLayerDesc {
     711            0 :         fn from(value: DeltaLayerName) -> Self {
     712            0 :             PersistentLayerDesc::new_delta(
     713            0 :                 TenantShardId::from([0; 18]),
     714            0 :                 TimelineId::from_array([0; 16]),
     715            0 :                 value.key_range,
     716            0 :                 value.lsn_range,
     717            0 :                 233,
     718            0 :             )
     719            0 :         }
     720              :     }
     721              : 
     722              :     impl From<ImageLayerName> for PersistentLayerDesc {
     723            0 :         fn from(value: ImageLayerName) -> Self {
     724            0 :             PersistentLayerDesc::new_img(
     725            0 :                 TenantShardId::from([0; 18]),
     726            0 :                 TimelineId::from_array([0; 16]),
     727            0 :                 value.key_range,
     728            0 :                 value.lsn,
     729            0 :                 233,
     730            0 :             )
     731            0 :         }
     732              :     }
     733              : 
     734              :     impl From<LayerName> for PersistentLayerDesc {
     735            0 :         fn from(value: LayerName) -> Self {
     736            0 :             match value {
     737            0 :                 LayerName::Delta(d) => Self::from(d),
     738            0 :                 LayerName::Image(i) => Self::from(i),
     739              :             }
     740            0 :         }
     741              :     }
     742              : }
     743              : 
     744              : /// Range wrapping newtype, which uses Display to render Debug.
     745              : ///
     746              : /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
     747              : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
     748              : 
     749              : impl<'a, T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'a, T> {
     750            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     751            0 :         write!(f, "{}..{}", self.0.start, self.0.end)
     752            0 :     }
     753              : }
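
A quick illustration of the newtype (works for any `T: Display`, here plain
integers):

    assert_eq!(format!("{:?}", RangeDisplayDebug(&(10..20))), "10..20");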
        

Generated by: LCOV version 2.1-beta