Line data Source code
1 : //! Common traits and structs for layers
2 :
3 : pub mod delta_layer;
4 : pub mod image_layer;
5 : pub(crate) mod inmemory_layer;
6 : pub(crate) mod layer;
7 : mod layer_desc;
8 : mod layer_name;
9 :
10 : use crate::context::{AccessStatsBehavior, RequestContext};
11 : use crate::repository::Value;
12 : use crate::task_mgr::TaskKind;
13 : use crate::walrecord::NeonWalRecord;
14 : use bytes::Bytes;
15 : use enum_map::EnumMap;
16 : use enumset::EnumSet;
17 : use once_cell::sync::Lazy;
18 : use pageserver_api::key::Key;
19 : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
20 : use pageserver_api::models::{
21 : LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
22 : };
23 : use std::borrow::Cow;
24 : use std::cmp::{Ordering, Reverse};
25 : use std::collections::hash_map::Entry;
26 : use std::collections::{BinaryHeap, HashMap};
27 : use std::ops::Range;
28 : use std::sync::{Arc, Mutex};
29 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
30 : use tracing::warn;
31 : use utils::history_buffer::HistoryBufferWithDropCounter;
32 : use utils::rate_limit::RateLimit;
33 :
34 : use utils::{id::TimelineId, lsn::Lsn};
35 :
36 : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
37 : pub use image_layer::{ImageLayer, ImageLayerWriter};
38 : pub use inmemory_layer::InMemoryLayer;
39 : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
40 : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
41 :
42 : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
43 :
44 : use self::inmemory_layer::InMemoryLayerFileId;
45 :
46 : use super::timeline::GetVectoredError;
47 : use super::PageReconstructError;
48 :
49 0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
50 0 : where
51 0 : T: PartialOrd<T>,
52 0 : {
53 0 : if a.start < b.start {
54 0 : a.end > b.start
55 : } else {
56 0 : b.end > a.start
57 : }
58 0 : }
59 :
60 : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
61 : ///
62 : /// Before first call, you can fill in 'page_img' if you have an older cached
63 : /// version of the page available. That can save work in
64 : /// 'get_value_reconstruct_data', as it can stop searching for page versions
65 : /// when all the WAL records going back to the cached image have been collected.
66 : ///
67 : /// When get_value_reconstruct_data returns Complete, 'img' is set to an image
68 : /// of the page, or the oldest WAL record in 'records' is a will_init-type
69 : /// record that initializes the page without requiring a previous image.
70 : ///
71 : /// If 'get_page_reconstruct_data' returns Continue, some 'records' may have
72 : /// been collected, but there are more records outside the current layer. Pass
73 : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
74 : /// call, to collect more records.
75 : ///
76 : #[derive(Debug, Default)]
77 : pub struct ValueReconstructState {
78 : pub records: Vec<(Lsn, NeonWalRecord)>,
79 : pub img: Option<(Lsn, Bytes)>,
80 : }
81 :
82 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
83 : pub(crate) enum ValueReconstructSituation {
84 : Complete,
85 : #[default]
86 : Continue,
87 : }
88 :
89 : /// Reconstruct data accumulated for a single key during a vectored get
90 : #[derive(Debug, Default, Clone)]
91 : pub(crate) struct VectoredValueReconstructState {
92 : pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
93 : pub(crate) img: Option<(Lsn, Bytes)>,
94 :
95 : situation: ValueReconstructSituation,
96 : }
97 :
98 : impl VectoredValueReconstructState {
99 61421 : fn get_cached_lsn(&self) -> Option<Lsn> {
100 61421 : self.img.as_ref().map(|img| img.0)
101 61421 : }
102 : }
103 :
104 : impl From<VectoredValueReconstructState> for ValueReconstructState {
105 20388 : fn from(mut state: VectoredValueReconstructState) -> Self {
106 20388 : // walredo expects the records to be descending in terms of Lsn
107 20388 : state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
108 20388 :
109 20388 : ValueReconstructState {
110 20388 : records: state.records,
111 20388 : img: state.img,
112 20388 : }
113 20388 : }
114 : }
115 :
116 : /// Bag of data accumulated during a vectored get
117 : pub(crate) struct ValuesReconstructState {
118 : pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
119 :
120 : keys_done: KeySpaceRandomAccum,
121 : layers_visited: u32,
122 : }
123 :
124 : impl ValuesReconstructState {
125 266 : pub(crate) fn new() -> Self {
126 266 : Self {
127 266 : keys: HashMap::new(),
128 266 : keys_done: KeySpaceRandomAccum::new(),
129 266 : layers_visited: 0,
130 266 : }
131 266 : }
132 :
133 : /// Associate a key with the error which it encountered and mark it as done
134 0 : pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
135 0 : let previous = self.keys.insert(key, Err(err));
136 0 : if let Some(Ok(state)) = previous {
137 0 : if state.situation == ValueReconstructSituation::Continue {
138 0 : self.keys_done.add_key(key);
139 0 : }
140 0 : }
141 0 : }
142 :
143 152 : pub(crate) fn on_layer_visited(&mut self) {
144 152 : self.layers_visited += 1;
145 152 : }
146 :
147 60 : pub(crate) fn get_layers_visited(&self) -> u32 {
148 60 : self.layers_visited
149 60 : }
150 :
151 : /// This function is called after reading a keyspace from a layer.
152 : /// It checks if the read path has now moved past the cached Lsn for any keys.
153 : ///
154 : /// Implementation note: We intentionally iterate over the keys for which we've
155 : /// already collected some reconstruct data. This avoids scaling complexity with
156 : /// the size of the search space.
157 148 : pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
158 83905 : for (key, value) in self.keys.iter_mut() {
159 83905 : if !keyspace.contains(key) {
160 63517 : continue;
161 20388 : }
162 :
163 20388 : if let Ok(state) = value {
164 20388 : if state.situation != ValueReconstructSituation::Complete
165 0 : && state.get_cached_lsn() >= Some(advanced_to)
166 0 : {
167 0 : state.situation = ValueReconstructSituation::Complete;
168 0 : self.keys_done.add_key(*key);
169 20388 : }
170 0 : }
171 : }
172 148 : }
173 :
174 : /// Update the state collected for a given key.
175 : /// Returns true if this was the last value needed for the key and false otherwise.
176 : ///
177 : /// If the key is done after the update, mark it as such.
178 20388 : pub(crate) fn update_key(
179 20388 : &mut self,
180 20388 : key: &Key,
181 20388 : lsn: Lsn,
182 20388 : value: Value,
183 20388 : ) -> ValueReconstructSituation {
184 20388 : let state = self
185 20388 : .keys
186 20388 : .entry(*key)
187 20388 : .or_insert(Ok(VectoredValueReconstructState::default()));
188 :
189 20388 : if let Ok(state) = state {
190 20388 : let key_done = match state.situation {
191 0 : ValueReconstructSituation::Complete => unreachable!(),
192 20388 : ValueReconstructSituation::Continue => match value {
193 20388 : Value::Image(img) => {
194 20388 : state.img = Some((lsn, img));
195 20388 : true
196 : }
197 0 : Value::WalRecord(rec) => {
198 0 : debug_assert!(
199 0 : Some(lsn) > state.get_cached_lsn(),
200 0 : "Attempt to collect a record below cached LSN for walredo: {} < {}",
201 0 : lsn,
202 0 : state
203 0 : .get_cached_lsn()
204 0 : .expect("Assertion can only fire if a cached lsn is present")
205 : );
206 :
207 0 : let will_init = rec.will_init();
208 0 : state.records.push((lsn, rec));
209 0 : will_init
210 : }
211 : },
212 : };
213 :
214 20388 : if key_done && state.situation == ValueReconstructSituation::Continue {
215 20388 : state.situation = ValueReconstructSituation::Complete;
216 20388 : self.keys_done.add_key(*key);
217 20388 : }
218 :
219 20388 : state.situation
220 : } else {
221 0 : ValueReconstructSituation::Complete
222 : }
223 20388 : }
224 :
225 : /// Returns the Lsn at which this key is cached if one exists.
226 : /// The read path should go no further than this Lsn for the given key.
227 153275 : pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
228 153275 : self.keys
229 153275 : .get(key)
230 153275 : .and_then(|k| k.as_ref().ok())
231 153275 : .and_then(|state| state.get_cached_lsn())
232 153275 : }
233 :
234 : /// Returns the key space describing the keys that have
235 : /// been marked as completed since the last call to this function.
236 248 : pub(crate) fn consume_done_keys(&mut self) -> KeySpace {
237 248 : self.keys_done.consume_keyspace()
238 248 : }
239 : }
240 :
241 : impl Default for ValuesReconstructState {
242 20 : fn default() -> Self {
243 20 : Self::new()
244 20 : }
245 : }
246 :
247 : /// A key that uniquely identifies a layer in a timeline
248 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
249 : pub(crate) enum LayerId {
250 : PersitentLayerId(PersistentLayerKey),
251 : InMemoryLayerId(InMemoryLayerFileId),
252 : }
253 :
254 : /// Layer wrapper for the read path. Note that it is valid
255 : /// to use these layers even after external operations have
256 : /// been performed on them (compaction, freeze, etc.).
257 : #[derive(Debug)]
258 : pub(crate) enum ReadableLayer {
259 : PersistentLayer(Layer),
260 : InMemoryLayer(Arc<InMemoryLayer>),
261 : }
262 :
263 : /// A partial description of a read to be done.
264 : #[derive(Debug, Clone)]
265 : struct ReadDesc {
266 : /// An id used to resolve the readable layer within the fringe
267 : layer_id: LayerId,
268 : /// Lsn range for the read, used for selecting the next read
269 : lsn_range: Range<Lsn>,
270 : }
271 :
272 : /// Data structure which maintains a fringe of layers for the
273 : /// read path. The fringe is the set of layers which intersects
274 : /// the current keyspace that the search is descending on.
275 : /// Each layer tracks the keyspace that intersects it.
276 : ///
277 : /// The fringe must appear sorted by Lsn. Hence, it uses
278 : /// a two layer indexing scheme.
279 : #[derive(Debug)]
280 : pub(crate) struct LayerFringe {
281 : planned_reads_by_lsn: BinaryHeap<ReadDesc>,
282 : layers: HashMap<LayerId, LayerKeyspace>,
283 : }
284 :
285 : #[derive(Debug)]
286 : struct LayerKeyspace {
287 : layer: ReadableLayer,
288 : target_keyspace: KeySpace,
289 : }
290 :
291 : impl LayerFringe {
292 96 : pub(crate) fn new() -> Self {
293 96 : LayerFringe {
294 96 : planned_reads_by_lsn: BinaryHeap::new(),
295 96 : layers: HashMap::new(),
296 96 : }
297 96 : }
298 :
299 248 : pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
300 248 : let read_desc = match self.planned_reads_by_lsn.pop() {
301 152 : Some(desc) => desc,
302 96 : None => return None,
303 : };
304 :
305 152 : let removed = self.layers.remove_entry(&read_desc.layer_id);
306 152 : match removed {
307 : Some((
308 : _,
309 : LayerKeyspace {
310 152 : layer,
311 152 : target_keyspace,
312 152 : },
313 152 : )) => Some((layer, target_keyspace, read_desc.lsn_range)),
314 0 : None => unreachable!("fringe internals are always consistent"),
315 : }
316 248 : }
317 :
318 63526 : pub(crate) fn update(
319 63526 : &mut self,
320 63526 : layer: ReadableLayer,
321 63526 : keyspace: KeySpace,
322 63526 : lsn_range: Range<Lsn>,
323 63526 : ) {
324 63526 : let layer_id = layer.id();
325 63526 : let entry = self.layers.entry(layer_id.clone());
326 63526 : match entry {
327 63374 : Entry::Occupied(mut entry) => {
328 63374 : entry.get_mut().target_keyspace.merge(&keyspace);
329 63374 : }
330 152 : Entry::Vacant(entry) => {
331 152 : self.planned_reads_by_lsn.push(ReadDesc {
332 152 : lsn_range,
333 152 : layer_id: layer_id.clone(),
334 152 : });
335 152 : entry.insert(LayerKeyspace {
336 152 : layer,
337 152 : target_keyspace: keyspace,
338 152 : });
339 152 : }
340 : }
341 63526 : }
342 : }
343 :
344 : impl Default for LayerFringe {
345 0 : fn default() -> Self {
346 0 : Self::new()
347 0 : }
348 : }
349 :
350 : impl Ord for ReadDesc {
351 6 : fn cmp(&self, other: &Self) -> Ordering {
352 6 : let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
353 6 : if ord == std::cmp::Ordering::Equal {
354 6 : self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
355 : } else {
356 0 : ord
357 : }
358 6 : }
359 : }
360 :
361 : impl PartialOrd for ReadDesc {
362 6 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
363 6 : Some(self.cmp(other))
364 6 : }
365 : }
366 :
367 : impl PartialEq for ReadDesc {
368 0 : fn eq(&self, other: &Self) -> bool {
369 0 : self.lsn_range == other.lsn_range
370 0 : }
371 : }
372 :
373 : impl Eq for ReadDesc {}
374 :
375 : impl ReadableLayer {
376 63526 : pub(crate) fn id(&self) -> LayerId {
377 63526 : match self {
378 63524 : Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
379 2 : Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
380 : }
381 63526 : }
382 :
383 152 : pub(crate) async fn get_values_reconstruct_data(
384 152 : &self,
385 152 : keyspace: KeySpace,
386 152 : lsn_range: Range<Lsn>,
387 152 : reconstruct_state: &mut ValuesReconstructState,
388 152 : ctx: &RequestContext,
389 152 : ) -> Result<(), GetVectoredError> {
390 152 : match self {
391 150 : ReadableLayer::PersistentLayer(layer) => {
392 150 : layer
393 150 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
394 7385 : .await
395 : }
396 2 : ReadableLayer::InMemoryLayer(layer) => {
397 2 : layer
398 2 : .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
399 26 : .await
400 : }
401 : }
402 152 : }
403 : }
404 :
405 : /// Return value from [`Layer::get_value_reconstruct_data`]
406 : #[derive(Clone, Copy, Debug)]
407 : pub enum ValueReconstructResult {
408 : /// Got all the data needed to reconstruct the requested page
409 : Complete,
410 : /// This layer didn't contain all the required data, the caller should look up
411 : /// the predecessor layer at the returned LSN and collect more data from there.
412 : Continue,
413 :
414 : /// This layer didn't contain data needed to reconstruct the page version at
415 : /// the returned LSN. This is usually considered an error, but might be OK
416 : /// in some circumstances.
417 : Missing,
418 : }
419 :
420 : #[derive(Debug)]
421 : pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
422 :
423 : /// This struct holds two instances of [`LayerAccessStatsInner`].
424 : /// Accesses are recorded to both instances.
425 : /// The `for_scraping_api`instance can be reset from the management API via [`LayerAccessStatsReset`].
426 : /// The `for_eviction_policy` is never reset.
427 : #[derive(Debug, Default, Clone)]
428 : struct LayerAccessStatsLocked {
429 : for_scraping_api: LayerAccessStatsInner,
430 : for_eviction_policy: LayerAccessStatsInner,
431 : }
432 :
433 : impl LayerAccessStatsLocked {
434 210946 : fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
435 210946 : [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
436 210946 : }
437 : }
438 :
439 : #[derive(Debug, Default, Clone)]
440 : struct LayerAccessStatsInner {
441 : first_access: Option<LayerAccessStatFullDetails>,
442 : count_by_access_kind: EnumMap<LayerAccessKind, u64>,
443 : task_kind_flag: EnumSet<TaskKind>,
444 : last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
445 : last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
446 : }
447 :
448 : #[derive(Debug, Clone, Copy)]
449 : pub(crate) struct LayerAccessStatFullDetails {
450 : pub(crate) when: SystemTime,
451 : pub(crate) task_kind: TaskKind,
452 : pub(crate) access_kind: LayerAccessKind,
453 : }
454 :
455 0 : #[derive(Clone, Copy, strum_macros::EnumString)]
456 : pub enum LayerAccessStatsReset {
457 : NoReset,
458 : JustTaskKindFlags,
459 : AllStats,
460 : }
461 :
462 0 : fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
463 0 : ts.duration_since(UNIX_EPOCH)
464 0 : .expect("better to die in this unlikely case than report false stats")
465 0 : .as_millis()
466 0 : .try_into()
467 0 : .expect("64 bits is enough for few more years")
468 0 : }
469 :
470 : impl LayerAccessStatFullDetails {
471 0 : fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
472 0 : let Self {
473 0 : when,
474 0 : task_kind,
475 0 : access_kind,
476 0 : } = self;
477 0 : pageserver_api::models::LayerAccessStatFullDetails {
478 0 : when_millis_since_epoch: system_time_to_millis_since_epoch(when),
479 0 : task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
480 0 : access_kind: *access_kind,
481 0 : }
482 0 : }
483 : }
484 :
485 : impl LayerAccessStats {
486 : /// Create an empty stats object.
487 : ///
488 : /// The caller is responsible for recording a residence event
489 : /// using [`record_residence_event`] before calling `latest_activity`.
490 : /// If they don't, [`latest_activity`] will return `None`.
491 : ///
492 : /// [`record_residence_event`]: Self::record_residence_event
493 : /// [`latest_activity`]: Self::latest_activity
494 1178 : pub(crate) fn empty_will_record_residence_event_later() -> Self {
495 1178 : LayerAccessStats(Mutex::default())
496 1178 : }
497 :
498 : /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
499 : ///
500 : /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
501 : ///
502 : /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
503 : /// [`record_residence_event`]: Self::record_residence_event
504 24 : pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
505 24 : let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
506 24 : new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
507 24 : new
508 24 : }
509 :
510 : /// Record a change in layer residency.
511 : ///
512 : /// Recording the event must happen while holding the layer map lock to
513 : /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
514 : /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
515 : ///
516 : /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
517 : /// the following race could happen:
518 : ///
519 : /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
520 : /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
521 : /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
522 : /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
523 : ///
524 1226 : pub(crate) fn record_residence_event(
525 1226 : &self,
526 1226 : status: LayerResidenceStatus,
527 1226 : reason: LayerResidenceEventReason,
528 1226 : ) {
529 1226 : let mut locked = self.0.lock().unwrap();
530 2452 : locked.iter_mut().for_each(|inner| {
531 2452 : inner
532 2452 : .last_residence_changes
533 2452 : .write(LayerResidenceEvent::new(status, reason))
534 2452 : });
535 1226 : }
536 :
537 209720 : fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
538 209720 : if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
539 0 : return;
540 209720 : }
541 209720 :
542 209720 : let this_access = LayerAccessStatFullDetails {
543 209720 : when: SystemTime::now(),
544 209720 : task_kind: ctx.task_kind(),
545 209720 : access_kind,
546 209720 : };
547 209720 :
548 209720 : let mut locked = self.0.lock().unwrap();
549 419440 : locked.iter_mut().for_each(|inner| {
550 419440 : inner.first_access.get_or_insert(this_access);
551 419440 : inner.count_by_access_kind[access_kind] += 1;
552 419440 : inner.task_kind_flag |= ctx.task_kind();
553 419440 : inner.last_accesses.write(this_access);
554 419440 : })
555 209720 : }
556 :
557 0 : fn as_api_model(
558 0 : &self,
559 0 : reset: LayerAccessStatsReset,
560 0 : ) -> pageserver_api::models::LayerAccessStats {
561 0 : let mut locked = self.0.lock().unwrap();
562 0 : let inner = &mut locked.for_scraping_api;
563 0 : let LayerAccessStatsInner {
564 0 : first_access,
565 0 : count_by_access_kind,
566 0 : task_kind_flag,
567 0 : last_accesses,
568 0 : last_residence_changes,
569 0 : } = inner;
570 0 : let ret = pageserver_api::models::LayerAccessStats {
571 0 : access_count_by_access_kind: count_by_access_kind
572 0 : .iter()
573 0 : .map(|(kind, count)| (kind, *count))
574 0 : .collect(),
575 0 : task_kind_access_flag: task_kind_flag
576 0 : .iter()
577 0 : .map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
578 0 : .collect(),
579 0 : first: first_access.as_ref().map(|a| a.as_api_model()),
580 0 : accesses_history: last_accesses.map(|m| m.as_api_model()),
581 0 : residence_events_history: last_residence_changes.clone(),
582 0 : };
583 0 : match reset {
584 0 : LayerAccessStatsReset::NoReset => (),
585 0 : LayerAccessStatsReset::JustTaskKindFlags => {
586 0 : inner.task_kind_flag.clear();
587 0 : }
588 0 : LayerAccessStatsReset::AllStats => {
589 0 : *inner = LayerAccessStatsInner::default();
590 0 : }
591 : }
592 0 : ret
593 0 : }
594 :
595 : /// Get the latest access timestamp, falling back to latest residence event, further falling
596 : /// back to `SystemTime::now` for a usable timestamp for eviction.
597 0 : pub(crate) fn latest_activity_or_now(&self) -> SystemTime {
598 0 : self.latest_activity().unwrap_or_else(SystemTime::now)
599 0 : }
600 :
601 : /// Get the latest access timestamp, falling back to latest residence event.
602 : ///
603 : /// This function can only return `None` if there has not yet been a call to the
604 : /// [`record_residence_event`] method. That would generally be considered an
605 : /// implementation error. This function logs a rate-limited warning in that case.
606 : ///
607 : /// TODO: use type system to avoid the need for `fallback`.
608 : /// The approach in <https://github.com/neondatabase/neon/pull/3775>
609 : /// could be used to enforce that a residence event is recorded
610 : /// before a layer is added to the layer map. We could also have
611 : /// a layer wrapper type that holds the LayerAccessStats, and ensure
612 : /// that that type can only be produced by inserting into the layer map.
613 : ///
614 : /// [`record_residence_event`]: Self::record_residence_event
615 0 : fn latest_activity(&self) -> Option<SystemTime> {
616 0 : let locked = self.0.lock().unwrap();
617 0 : let inner = &locked.for_eviction_policy;
618 0 : match inner.last_accesses.recent() {
619 0 : Some(a) => Some(a.when),
620 0 : None => match inner.last_residence_changes.recent() {
621 0 : Some(e) => Some(e.timestamp),
622 : None => {
623 : static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
624 0 : Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
625 0 : let mut guard = WARN_RATE_LIMIT.lock().unwrap();
626 0 : guard.0 += 1;
627 0 : let occurences = guard.0;
628 0 : guard.1.call(move || {
629 0 : warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
630 0 : });
631 0 : None
632 : }
633 : },
634 : }
635 0 : }
636 : }
637 :
638 : /// Get a layer descriptor from a layer.
639 : pub trait AsLayerDesc {
640 : /// Get the layer descriptor.
641 : fn layer_desc(&self) -> &PersistentLayerDesc;
642 : }
643 :
644 : pub mod tests {
645 : use pageserver_api::shard::TenantShardId;
646 :
647 : use super::*;
648 :
649 : impl From<DeltaLayerName> for PersistentLayerDesc {
650 0 : fn from(value: DeltaLayerName) -> Self {
651 0 : PersistentLayerDesc::new_delta(
652 0 : TenantShardId::from([0; 18]),
653 0 : TimelineId::from_array([0; 16]),
654 0 : value.key_range,
655 0 : value.lsn_range,
656 0 : 233,
657 0 : )
658 0 : }
659 : }
660 :
661 : impl From<ImageLayerName> for PersistentLayerDesc {
662 0 : fn from(value: ImageLayerName) -> Self {
663 0 : PersistentLayerDesc::new_img(
664 0 : TenantShardId::from([0; 18]),
665 0 : TimelineId::from_array([0; 16]),
666 0 : value.key_range,
667 0 : value.lsn,
668 0 : 233,
669 0 : )
670 0 : }
671 : }
672 :
673 : impl From<LayerName> for PersistentLayerDesc {
674 0 : fn from(value: LayerName) -> Self {
675 0 : match value {
676 0 : LayerName::Delta(d) => Self::from(d),
677 0 : LayerName::Image(i) => Self::from(i),
678 : }
679 0 : }
680 : }
681 : }
682 :
683 : /// Range wrapping newtype, which uses display to render Debug.
684 : ///
685 : /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
686 : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
687 :
688 : impl<'a, T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'a, T> {
689 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690 0 : write!(f, "{}..{}", self.0.start, self.0.end)
691 0 : }
692 : }
|