LCOV - code coverage report
Current view: top level - pageserver/src/tenant - storage_layer.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 55.4 % 323 179
Test Date: 2024-05-10 13:18:37 Functions: 54.9 % 51 28

            Line data    Source code
       1              : //! Common traits and structs for layers
       2              : 
       3              : pub mod delta_layer;
       4              : pub mod image_layer;
       5              : pub(crate) mod inmemory_layer;
       6              : pub(crate) mod layer;
       7              : mod layer_desc;
       8              : mod layer_name;
       9              : 
      10              : use crate::context::{AccessStatsBehavior, RequestContext};
      11              : use crate::repository::Value;
      12              : use crate::task_mgr::TaskKind;
      13              : use crate::walrecord::NeonWalRecord;
      14              : use bytes::Bytes;
      15              : use enum_map::EnumMap;
      16              : use enumset::EnumSet;
      17              : use once_cell::sync::Lazy;
      18              : use pageserver_api::key::Key;
      19              : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
      20              : use pageserver_api::models::{
      21              :     LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
      22              : };
      23              : use std::borrow::Cow;
      24              : use std::cmp::{Ordering, Reverse};
      25              : use std::collections::hash_map::Entry;
      26              : use std::collections::{BinaryHeap, HashMap};
      27              : use std::ops::Range;
      28              : use std::sync::{Arc, Mutex};
      29              : use std::time::{Duration, SystemTime, UNIX_EPOCH};
      30              : use tracing::warn;
      31              : use utils::history_buffer::HistoryBufferWithDropCounter;
      32              : use utils::rate_limit::RateLimit;
      33              : 
      34              : use utils::{id::TimelineId, lsn::Lsn};
      35              : 
      36              : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
      37              : pub use image_layer::{ImageLayer, ImageLayerWriter};
      38              : pub use inmemory_layer::InMemoryLayer;
      39              : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
      40              : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
      41              : 
      42              : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
      43              : 
      44              : use self::inmemory_layer::InMemoryLayerFileId;
      45              : 
      46              : use super::timeline::GetVectoredError;
      47              : use super::PageReconstructError;
      48              : 
      49            0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
      50            0 : where
      51            0 :     T: PartialOrd<T>,
      52            0 : {
      53            0 :     if a.start < b.start {
      54            0 :         a.end > b.start
      55              :     } else {
      56            0 :         b.end > a.start
      57              :     }
      58            0 : }
      59              : 
      60              : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
      61              : ///
      62              : /// Before first call, you can fill in 'page_img' if you have an older cached
      63              : /// version of the page available. That can save work in
      64              : /// 'get_value_reconstruct_data', as it can stop searching for page versions
      65              : /// when all the WAL records going back to the cached image have been collected.
      66              : ///
      67              : /// When get_value_reconstruct_data returns Complete, 'img' is set to an image
      68              : /// of the page, or the oldest WAL record in 'records' is a will_init-type
      69              : /// record that initializes the page without requiring a previous image.
      70              : ///
      71              : /// If 'get_page_reconstruct_data' returns Continue, some 'records' may have
      72              : /// been collected, but there are more records outside the current layer. Pass
      73              : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
      74              : /// call, to collect more records.
      75              : ///
      76              : #[derive(Debug, Default)]
      77              : pub struct ValueReconstructState {
      78              :     pub records: Vec<(Lsn, NeonWalRecord)>,
      79              :     pub img: Option<(Lsn, Bytes)>,
      80              : }
      81              : 
      82              : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
      83              : pub(crate) enum ValueReconstructSituation {
      84              :     Complete,
      85              :     #[default]
      86              :     Continue,
      87              : }
      88              : 
      89              : /// Reconstruct data accumulated for a single key during a vectored get
      90              : #[derive(Debug, Default, Clone)]
      91              : pub(crate) struct VectoredValueReconstructState {
      92              :     pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
      93              :     pub(crate) img: Option<(Lsn, Bytes)>,
      94              : 
      95              :     situation: ValueReconstructSituation,
      96              : }
      97              : 
      98              : impl VectoredValueReconstructState {
      99        61421 :     fn get_cached_lsn(&self) -> Option<Lsn> {
     100        61421 :         self.img.as_ref().map(|img| img.0)
     101        61421 :     }
     102              : }
     103              : 
     104              : impl From<VectoredValueReconstructState> for ValueReconstructState {
     105        20388 :     fn from(mut state: VectoredValueReconstructState) -> Self {
     106        20388 :         // walredo expects the records to be descending in terms of Lsn
     107        20388 :         state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
     108        20388 : 
     109        20388 :         ValueReconstructState {
     110        20388 :             records: state.records,
     111        20388 :             img: state.img,
     112        20388 :         }
     113        20388 :     }
     114              : }
     115              : 
     116              : /// Bag of data accumulated during a vectored get
     117              : pub(crate) struct ValuesReconstructState {
     118              :     pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
     119              : 
     120              :     keys_done: KeySpaceRandomAccum,
     121              :     layers_visited: u32,
     122              : }
     123              : 
     124              : impl ValuesReconstructState {
     125          266 :     pub(crate) fn new() -> Self {
     126          266 :         Self {
     127          266 :             keys: HashMap::new(),
     128          266 :             keys_done: KeySpaceRandomAccum::new(),
     129          266 :             layers_visited: 0,
     130          266 :         }
     131          266 :     }
     132              : 
     133              :     /// Associate a key with the error which it encountered and mark it as done
     134            0 :     pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
     135            0 :         let previous = self.keys.insert(key, Err(err));
     136            0 :         if let Some(Ok(state)) = previous {
     137            0 :             if state.situation == ValueReconstructSituation::Continue {
     138            0 :                 self.keys_done.add_key(key);
     139            0 :             }
     140            0 :         }
     141            0 :     }
     142              : 
     143          152 :     pub(crate) fn on_layer_visited(&mut self) {
     144          152 :         self.layers_visited += 1;
     145          152 :     }
     146              : 
     147           60 :     pub(crate) fn get_layers_visited(&self) -> u32 {
     148           60 :         self.layers_visited
     149           60 :     }
     150              : 
     151              :     /// This function is called after reading a keyspace from a layer.
     152              :     /// It checks if the read path has now moved past the cached Lsn for any keys.
     153              :     ///
     154              :     /// Implementation note: We intentionally iterate over the keys for which we've
     155              :     /// already collected some reconstruct data. This avoids scaling complexity with
     156              :     /// the size of the search space.
     157          148 :     pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
     158        83905 :         for (key, value) in self.keys.iter_mut() {
     159        83905 :             if !keyspace.contains(key) {
     160        63517 :                 continue;
     161        20388 :             }
     162              : 
     163        20388 :             if let Ok(state) = value {
     164        20388 :                 if state.situation != ValueReconstructSituation::Complete
     165            0 :                     && state.get_cached_lsn() >= Some(advanced_to)
     166            0 :                 {
     167            0 :                     state.situation = ValueReconstructSituation::Complete;
     168            0 :                     self.keys_done.add_key(*key);
     169        20388 :                 }
     170            0 :             }
     171              :         }
     172          148 :     }
     173              : 
     174              :     /// Update the state collected for a given key.
     175              :     /// Returns true if this was the last value needed for the key and false otherwise.
     176              :     ///
     177              :     /// If the key is done after the update, mark it as such.
     178        20388 :     pub(crate) fn update_key(
     179        20388 :         &mut self,
     180        20388 :         key: &Key,
     181        20388 :         lsn: Lsn,
     182        20388 :         value: Value,
     183        20388 :     ) -> ValueReconstructSituation {
     184        20388 :         let state = self
     185        20388 :             .keys
     186        20388 :             .entry(*key)
     187        20388 :             .or_insert(Ok(VectoredValueReconstructState::default()));
     188              : 
     189        20388 :         if let Ok(state) = state {
     190        20388 :             let key_done = match state.situation {
     191            0 :                 ValueReconstructSituation::Complete => unreachable!(),
     192        20388 :                 ValueReconstructSituation::Continue => match value {
     193        20388 :                     Value::Image(img) => {
     194        20388 :                         state.img = Some((lsn, img));
     195        20388 :                         true
     196              :                     }
     197            0 :                     Value::WalRecord(rec) => {
     198            0 :                         debug_assert!(
     199            0 :                             Some(lsn) > state.get_cached_lsn(),
     200            0 :                             "Attempt to collect a record below cached LSN for walredo: {} < {}",
     201            0 :                             lsn,
     202            0 :                             state
     203            0 :                                 .get_cached_lsn()
     204            0 :                                 .expect("Assertion can only fire if a cached lsn is present")
     205              :                         );
     206              : 
     207            0 :                         let will_init = rec.will_init();
     208            0 :                         state.records.push((lsn, rec));
     209            0 :                         will_init
     210              :                     }
     211              :                 },
     212              :             };
     213              : 
     214        20388 :             if key_done && state.situation == ValueReconstructSituation::Continue {
     215        20388 :                 state.situation = ValueReconstructSituation::Complete;
     216        20388 :                 self.keys_done.add_key(*key);
     217        20388 :             }
     218              : 
     219        20388 :             state.situation
     220              :         } else {
     221            0 :             ValueReconstructSituation::Complete
     222              :         }
     223        20388 :     }
     224              : 
     225              :     /// Returns the Lsn at which this key is cached if one exists.
     226              :     /// The read path should go no further than this Lsn for the given key.
     227       153275 :     pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
     228       153275 :         self.keys
     229       153275 :             .get(key)
     230       153275 :             .and_then(|k| k.as_ref().ok())
     231       153275 :             .and_then(|state| state.get_cached_lsn())
     232       153275 :     }
     233              : 
     234              :     /// Returns the key space describing the keys that have
     235              :     /// been marked as completed since the last call to this function.
     236          248 :     pub(crate) fn consume_done_keys(&mut self) -> KeySpace {
     237          248 :         self.keys_done.consume_keyspace()
     238          248 :     }
     239              : }
     240              : 
     241              : impl Default for ValuesReconstructState {
     242           20 :     fn default() -> Self {
     243           20 :         Self::new()
     244           20 :     }
     245              : }
     246              : 
     247              : /// A key that uniquely identifies a layer in a timeline
     248              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     249              : pub(crate) enum LayerId {
     250              :     PersitentLayerId(PersistentLayerKey),
     251              :     InMemoryLayerId(InMemoryLayerFileId),
     252              : }
     253              : 
     254              : /// Layer wrapper for the read path. Note that it is valid
     255              : /// to use these layers even after external operations have
     256              : /// been performed on them (compaction, freeze, etc.).
     257              : #[derive(Debug)]
     258              : pub(crate) enum ReadableLayer {
     259              :     PersistentLayer(Layer),
     260              :     InMemoryLayer(Arc<InMemoryLayer>),
     261              : }
     262              : 
     263              : /// A partial description of a read to be done.
     264              : #[derive(Debug, Clone)]
     265              : struct ReadDesc {
     266              :     /// An id used to resolve the readable layer within the fringe
     267              :     layer_id: LayerId,
     268              :     /// Lsn range for the read, used for selecting the next read
     269              :     lsn_range: Range<Lsn>,
     270              : }
     271              : 
     272              : /// Data structure which maintains a fringe of layers for the
     273              : /// read path. The fringe is the set of layers which intersects
     274              : /// the current keyspace that the search is descending on.
     275              : /// Each layer tracks the keyspace that intersects it.
     276              : ///
     277              : /// The fringe must appear sorted by Lsn. Hence, it uses
     278              : /// a two layer indexing scheme.
     279              : #[derive(Debug)]
     280              : pub(crate) struct LayerFringe {
     281              :     planned_reads_by_lsn: BinaryHeap<ReadDesc>,
     282              :     layers: HashMap<LayerId, LayerKeyspace>,
     283              : }
     284              : 
     285              : #[derive(Debug)]
     286              : struct LayerKeyspace {
     287              :     layer: ReadableLayer,
     288              :     target_keyspace: KeySpace,
     289              : }
     290              : 
     291              : impl LayerFringe {
     292           96 :     pub(crate) fn new() -> Self {
     293           96 :         LayerFringe {
     294           96 :             planned_reads_by_lsn: BinaryHeap::new(),
     295           96 :             layers: HashMap::new(),
     296           96 :         }
     297           96 :     }
     298              : 
     299          248 :     pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
     300          248 :         let read_desc = match self.planned_reads_by_lsn.pop() {
     301          152 :             Some(desc) => desc,
     302           96 :             None => return None,
     303              :         };
     304              : 
     305          152 :         let removed = self.layers.remove_entry(&read_desc.layer_id);
     306          152 :         match removed {
     307              :             Some((
     308              :                 _,
     309              :                 LayerKeyspace {
     310          152 :                     layer,
     311          152 :                     target_keyspace,
     312          152 :                 },
     313          152 :             )) => Some((layer, target_keyspace, read_desc.lsn_range)),
     314            0 :             None => unreachable!("fringe internals are always consistent"),
     315              :         }
     316          248 :     }
     317              : 
     318        63526 :     pub(crate) fn update(
     319        63526 :         &mut self,
     320        63526 :         layer: ReadableLayer,
     321        63526 :         keyspace: KeySpace,
     322        63526 :         lsn_range: Range<Lsn>,
     323        63526 :     ) {
     324        63526 :         let layer_id = layer.id();
     325        63526 :         let entry = self.layers.entry(layer_id.clone());
     326        63526 :         match entry {
     327        63374 :             Entry::Occupied(mut entry) => {
     328        63374 :                 entry.get_mut().target_keyspace.merge(&keyspace);
     329        63374 :             }
     330          152 :             Entry::Vacant(entry) => {
     331          152 :                 self.planned_reads_by_lsn.push(ReadDesc {
     332          152 :                     lsn_range,
     333          152 :                     layer_id: layer_id.clone(),
     334          152 :                 });
     335          152 :                 entry.insert(LayerKeyspace {
     336          152 :                     layer,
     337          152 :                     target_keyspace: keyspace,
     338          152 :                 });
     339          152 :             }
     340              :         }
     341        63526 :     }
     342              : }
     343              : 
     344              : impl Default for LayerFringe {
     345            0 :     fn default() -> Self {
     346            0 :         Self::new()
     347            0 :     }
     348              : }
     349              : 
     350              : impl Ord for ReadDesc {
     351            6 :     fn cmp(&self, other: &Self) -> Ordering {
     352            6 :         let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
     353            6 :         if ord == std::cmp::Ordering::Equal {
     354            6 :             self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
     355              :         } else {
     356            0 :             ord
     357              :         }
     358            6 :     }
     359              : }
     360              : 
     361              : impl PartialOrd for ReadDesc {
     362            6 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     363            6 :         Some(self.cmp(other))
     364            6 :     }
     365              : }
     366              : 
     367              : impl PartialEq for ReadDesc {
     368            0 :     fn eq(&self, other: &Self) -> bool {
     369            0 :         self.lsn_range == other.lsn_range
     370            0 :     }
     371              : }
     372              : 
     373              : impl Eq for ReadDesc {}
     374              : 
     375              : impl ReadableLayer {
     376        63526 :     pub(crate) fn id(&self) -> LayerId {
     377        63526 :         match self {
     378        63524 :             Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
     379            2 :             Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
     380              :         }
     381        63526 :     }
     382              : 
     383          152 :     pub(crate) async fn get_values_reconstruct_data(
     384          152 :         &self,
     385          152 :         keyspace: KeySpace,
     386          152 :         lsn_range: Range<Lsn>,
     387          152 :         reconstruct_state: &mut ValuesReconstructState,
     388          152 :         ctx: &RequestContext,
     389          152 :     ) -> Result<(), GetVectoredError> {
     390          152 :         match self {
     391          150 :             ReadableLayer::PersistentLayer(layer) => {
     392          150 :                 layer
     393          150 :                     .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
     394         7385 :                     .await
     395              :             }
     396            2 :             ReadableLayer::InMemoryLayer(layer) => {
     397            2 :                 layer
     398            2 :                     .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
     399           26 :                     .await
     400              :             }
     401              :         }
     402          152 :     }
     403              : }
     404              : 
     405              : /// Return value from [`Layer::get_value_reconstruct_data`]
     406              : #[derive(Clone, Copy, Debug)]
     407              : pub enum ValueReconstructResult {
     408              :     /// Got all the data needed to reconstruct the requested page
     409              :     Complete,
     410              :     /// This layer didn't contain all the required data, the caller should look up
     411              :     /// the predecessor layer at the returned LSN and collect more data from there.
     412              :     Continue,
     413              : 
     414              :     /// This layer didn't contain data needed to reconstruct the page version at
     415              :     /// the returned LSN. This is usually considered an error, but might be OK
     416              :     /// in some circumstances.
     417              :     Missing,
     418              : }
     419              : 
     420              : #[derive(Debug)]
     421              : pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
     422              : 
     423              : /// This struct holds two instances of [`LayerAccessStatsInner`].
     424              : /// Accesses are recorded to both instances.
     425              : /// The `for_scraping_api`instance can be reset from the management API via [`LayerAccessStatsReset`].
     426              : /// The `for_eviction_policy` is never reset.
     427              : #[derive(Debug, Default, Clone)]
     428              : struct LayerAccessStatsLocked {
     429              :     for_scraping_api: LayerAccessStatsInner,
     430              :     for_eviction_policy: LayerAccessStatsInner,
     431              : }
     432              : 
     433              : impl LayerAccessStatsLocked {
     434       210946 :     fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
     435       210946 :         [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
     436       210946 :     }
     437              : }
     438              : 
     439              : #[derive(Debug, Default, Clone)]
     440              : struct LayerAccessStatsInner {
     441              :     first_access: Option<LayerAccessStatFullDetails>,
     442              :     count_by_access_kind: EnumMap<LayerAccessKind, u64>,
     443              :     task_kind_flag: EnumSet<TaskKind>,
     444              :     last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
     445              :     last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
     446              : }
     447              : 
     448              : #[derive(Debug, Clone, Copy)]
     449              : pub(crate) struct LayerAccessStatFullDetails {
     450              :     pub(crate) when: SystemTime,
     451              :     pub(crate) task_kind: TaskKind,
     452              :     pub(crate) access_kind: LayerAccessKind,
     453              : }
     454              : 
     455            0 : #[derive(Clone, Copy, strum_macros::EnumString)]
     456              : pub enum LayerAccessStatsReset {
     457              :     NoReset,
     458              :     JustTaskKindFlags,
     459              :     AllStats,
     460              : }
     461              : 
     462            0 : fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
     463            0 :     ts.duration_since(UNIX_EPOCH)
     464            0 :         .expect("better to die in this unlikely case than report false stats")
     465            0 :         .as_millis()
     466            0 :         .try_into()
     467            0 :         .expect("64 bits is enough for few more years")
     468            0 : }
     469              : 
     470              : impl LayerAccessStatFullDetails {
     471            0 :     fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
     472            0 :         let Self {
     473            0 :             when,
     474            0 :             task_kind,
     475            0 :             access_kind,
     476            0 :         } = self;
     477            0 :         pageserver_api::models::LayerAccessStatFullDetails {
     478            0 :             when_millis_since_epoch: system_time_to_millis_since_epoch(when),
     479            0 :             task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
     480            0 :             access_kind: *access_kind,
     481            0 :         }
     482            0 :     }
     483              : }
     484              : 
     485              : impl LayerAccessStats {
     486              :     /// Create an empty stats object.
     487              :     ///
     488              :     /// The caller is responsible for recording a residence event
     489              :     /// using [`record_residence_event`] before calling `latest_activity`.
     490              :     /// If they don't, [`latest_activity`] will return `None`.
     491              :     ///
     492              :     /// [`record_residence_event`]: Self::record_residence_event
     493              :     /// [`latest_activity`]: Self::latest_activity
     494         1178 :     pub(crate) fn empty_will_record_residence_event_later() -> Self {
     495         1178 :         LayerAccessStats(Mutex::default())
     496         1178 :     }
     497              : 
     498              :     /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
     499              :     ///
     500              :     /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
     501              :     ///
     502              :     /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
     503              :     /// [`record_residence_event`]: Self::record_residence_event
     504           24 :     pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
     505           24 :         let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
     506           24 :         new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
     507           24 :         new
     508           24 :     }
     509              : 
     510              :     /// Record a change in layer residency.
     511              :     ///
     512              :     /// Recording the event must happen while holding the layer map lock to
     513              :     /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
     514              :     /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
     515              :     ///
     516              :     /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
     517              :     /// the following race could happen:
     518              :     ///
     519              :     /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
     520              :     /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
     521              :     /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
     522              :     /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
     523              :     ///
     524         1226 :     pub(crate) fn record_residence_event(
     525         1226 :         &self,
     526         1226 :         status: LayerResidenceStatus,
     527         1226 :         reason: LayerResidenceEventReason,
     528         1226 :     ) {
     529         1226 :         let mut locked = self.0.lock().unwrap();
     530         2452 :         locked.iter_mut().for_each(|inner| {
     531         2452 :             inner
     532         2452 :                 .last_residence_changes
     533         2452 :                 .write(LayerResidenceEvent::new(status, reason))
     534         2452 :         });
     535         1226 :     }
     536              : 
     537       209720 :     fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
     538       209720 :         if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
     539            0 :             return;
     540       209720 :         }
     541       209720 : 
     542       209720 :         let this_access = LayerAccessStatFullDetails {
     543       209720 :             when: SystemTime::now(),
     544       209720 :             task_kind: ctx.task_kind(),
     545       209720 :             access_kind,
     546       209720 :         };
     547       209720 : 
     548       209720 :         let mut locked = self.0.lock().unwrap();
     549       419440 :         locked.iter_mut().for_each(|inner| {
     550       419440 :             inner.first_access.get_or_insert(this_access);
     551       419440 :             inner.count_by_access_kind[access_kind] += 1;
     552       419440 :             inner.task_kind_flag |= ctx.task_kind();
     553       419440 :             inner.last_accesses.write(this_access);
     554       419440 :         })
     555       209720 :     }
     556              : 
     557            0 :     fn as_api_model(
     558            0 :         &self,
     559            0 :         reset: LayerAccessStatsReset,
     560            0 :     ) -> pageserver_api::models::LayerAccessStats {
     561            0 :         let mut locked = self.0.lock().unwrap();
     562            0 :         let inner = &mut locked.for_scraping_api;
     563            0 :         let LayerAccessStatsInner {
     564            0 :             first_access,
     565            0 :             count_by_access_kind,
     566            0 :             task_kind_flag,
     567            0 :             last_accesses,
     568            0 :             last_residence_changes,
     569            0 :         } = inner;
     570            0 :         let ret = pageserver_api::models::LayerAccessStats {
     571            0 :             access_count_by_access_kind: count_by_access_kind
     572            0 :                 .iter()
     573            0 :                 .map(|(kind, count)| (kind, *count))
     574            0 :                 .collect(),
     575            0 :             task_kind_access_flag: task_kind_flag
     576            0 :                 .iter()
     577            0 :                 .map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
     578            0 :                 .collect(),
     579            0 :             first: first_access.as_ref().map(|a| a.as_api_model()),
     580            0 :             accesses_history: last_accesses.map(|m| m.as_api_model()),
     581            0 :             residence_events_history: last_residence_changes.clone(),
     582            0 :         };
     583            0 :         match reset {
     584            0 :             LayerAccessStatsReset::NoReset => (),
     585            0 :             LayerAccessStatsReset::JustTaskKindFlags => {
     586            0 :                 inner.task_kind_flag.clear();
     587            0 :             }
     588            0 :             LayerAccessStatsReset::AllStats => {
     589            0 :                 *inner = LayerAccessStatsInner::default();
     590            0 :             }
     591              :         }
     592            0 :         ret
     593            0 :     }
     594              : 
     595              :     /// Get the latest access timestamp, falling back to latest residence event, further falling
     596              :     /// back to `SystemTime::now` for a usable timestamp for eviction.
     597            0 :     pub(crate) fn latest_activity_or_now(&self) -> SystemTime {
     598            0 :         self.latest_activity().unwrap_or_else(SystemTime::now)
     599            0 :     }
     600              : 
     601              :     /// Get the latest access timestamp, falling back to latest residence event.
     602              :     ///
     603              :     /// This function can only return `None` if there has not yet been a call to the
     604              :     /// [`record_residence_event`] method. That would generally be considered an
     605              :     /// implementation error. This function logs a rate-limited warning in that case.
     606              :     ///
     607              :     /// TODO: use type system to avoid the need for `fallback`.
     608              :     /// The approach in <https://github.com/neondatabase/neon/pull/3775>
     609              :     /// could be used to enforce that a residence event is recorded
     610              :     /// before a layer is added to the layer map. We could also have
     611              :     /// a layer wrapper type that holds the LayerAccessStats, and ensure
     612              :     /// that that type can only be produced by inserting into the layer map.
     613              :     ///
     614              :     /// [`record_residence_event`]: Self::record_residence_event
     615            0 :     fn latest_activity(&self) -> Option<SystemTime> {
     616            0 :         let locked = self.0.lock().unwrap();
     617            0 :         let inner = &locked.for_eviction_policy;
     618            0 :         match inner.last_accesses.recent() {
     619            0 :             Some(a) => Some(a.when),
     620            0 :             None => match inner.last_residence_changes.recent() {
     621            0 :                 Some(e) => Some(e.timestamp),
     622              :                 None => {
     623              :                     static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
     624            0 :                         Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
     625            0 :                     let mut guard = WARN_RATE_LIMIT.lock().unwrap();
     626            0 :                     guard.0 += 1;
     627            0 :                     let occurences = guard.0;
     628            0 :                     guard.1.call(move || {
     629            0 :                         warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
     630            0 :                     });
     631            0 :                     None
     632              :                 }
     633              :             },
     634              :         }
     635            0 :     }
     636              : }
     637              : 
     638              : /// Get a layer descriptor from a layer.
     639              : pub trait AsLayerDesc {
     640              :     /// Get the layer descriptor.
     641              :     fn layer_desc(&self) -> &PersistentLayerDesc;
     642              : }
     643              : 
     644              : pub mod tests {
     645              :     use pageserver_api::shard::TenantShardId;
     646              : 
     647              :     use super::*;
     648              : 
     649              :     impl From<DeltaLayerName> for PersistentLayerDesc {
     650            0 :         fn from(value: DeltaLayerName) -> Self {
     651            0 :             PersistentLayerDesc::new_delta(
     652            0 :                 TenantShardId::from([0; 18]),
     653            0 :                 TimelineId::from_array([0; 16]),
     654            0 :                 value.key_range,
     655            0 :                 value.lsn_range,
     656            0 :                 233,
     657            0 :             )
     658            0 :         }
     659              :     }
     660              : 
     661              :     impl From<ImageLayerName> for PersistentLayerDesc {
     662            0 :         fn from(value: ImageLayerName) -> Self {
     663            0 :             PersistentLayerDesc::new_img(
     664            0 :                 TenantShardId::from([0; 18]),
     665            0 :                 TimelineId::from_array([0; 16]),
     666            0 :                 value.key_range,
     667            0 :                 value.lsn,
     668            0 :                 233,
     669            0 :             )
     670            0 :         }
     671              :     }
     672              : 
     673              :     impl From<LayerName> for PersistentLayerDesc {
     674            0 :         fn from(value: LayerName) -> Self {
     675            0 :             match value {
     676            0 :                 LayerName::Delta(d) => Self::from(d),
     677            0 :                 LayerName::Image(i) => Self::from(i),
     678              :             }
     679            0 :         }
     680              :     }
     681              : }
     682              : 
     683              : /// Range wrapping newtype, which uses display to render Debug.
     684              : ///
     685              : /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
     686              : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
     687              : 
     688              : impl<'a, T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'a, T> {
     689            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     690            0 :         write!(f, "{}..{}", self.0.start, self.0.end)
     691            0 :     }
     692              : }
        

Generated by: LCOV version 2.1-beta