//! Common traits and structs for layers

pub mod delta_layer;
pub mod image_layer;
pub(crate) mod inmemory_layer;
pub(crate) mod layer;
mod layer_desc;
mod layer_name;
pub mod merge_iterator;

use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
    LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::borrow::Cow;
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit;

use utils::{id::TimelineId, lsn::Lsn};

pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};

pub(crate) use layer::{EvictionError, Layer, ResidentLayer};

use self::inmemory_layer::InMemoryLayerFileId;

use super::timeline::GetVectoredError;
use super::PageReconstructError;

pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
    T: PartialOrd<T>,
{
    if a.start < b.start {
        a.end > b.start
    } else {
        b.end > a.start
    }
}
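
// A small illustrative test (not part of the original module) pinning down the
// half-open-range semantics of `range_overlaps`: ranges that merely touch at
// an endpoint do not overlap.
#[cfg(test)]
mod range_overlaps_tests {
    use super::range_overlaps;

    #[test]
    fn half_open_semantics() {
        assert!(range_overlaps(&(0..10), &(5..15))); // proper intersection
        assert!(!range_overlaps(&(0..10), &(10..20))); // touching endpoints only
        assert!(range_overlaps(&(0..10), &(0..10))); // identical ranges
    }
}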

/// Struct used to communicate across calls to 'get_value_reconstruct_data'.
///
/// Before the first call, you can fill in 'img' if you have an older cached
/// version of the page available. That can save work in
/// 'get_value_reconstruct_data', as it can stop searching for page versions
/// when all the WAL records going back to the cached image have been collected.
///
/// When 'get_value_reconstruct_data' returns Complete, 'img' is set to an image
/// of the page, or the oldest WAL record in 'records' is a will_init-type
/// record that initializes the page without requiring a previous image.
///
/// If 'get_value_reconstruct_data' returns Continue, some 'records' may have
/// been collected, but there are more records outside the current layer. Pass
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
#[derive(Debug, Default)]
pub struct ValueReconstructState {
    pub records: Vec<(Lsn, NeonWalRecord)>,
    pub img: Option<(Lsn, Bytes)>,
}

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum ValueReconstructSituation {
    Complete,
    #[default]
    Continue,
}

/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
pub(crate) struct VectoredValueReconstructState {
    pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
    pub(crate) img: Option<(Lsn, Bytes)>,

    situation: ValueReconstructSituation,
}

impl VectoredValueReconstructState {
    fn get_cached_lsn(&self) -> Option<Lsn> {
        self.img.as_ref().map(|img| img.0)
    }
}

impl From<VectoredValueReconstructState> for ValueReconstructState {
    fn from(mut state: VectoredValueReconstructState) -> Self {
        // walredo expects the records to be descending in terms of Lsn
        state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));

        ValueReconstructState {
            records: state.records,
            img: state.img,
        }
    }
}
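
// A small illustrative test (not part of the original module): `Reverse`-keyed
// sorting yields descending LSN order, which is the order walredo consumes
// records in after the conversion above.
#[cfg(test)]
mod record_order_tests {
    use super::*;

    #[test]
    fn reverse_sort_is_descending() {
        let mut records = vec![(Lsn(0x10), "a"), (Lsn(0x30), "b"), (Lsn(0x20), "c")];
        records.sort_by_key(|(lsn, _)| Reverse(*lsn));
        let lsns: Vec<Lsn> = records.iter().map(|(lsn, _)| *lsn).collect();
        assert_eq!(lsns, vec![Lsn(0x30), Lsn(0x20), Lsn(0x10)]);
    }
}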

/// Bag of data accumulated during a vectored get.
pub(crate) struct ValuesReconstructState {
    /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
    /// should not expect to get anything from this hashmap.
    pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
    /// The keys which have already been retrieved
    keys_done: KeySpaceRandomAccum,

    /// The keys covered by the image layers
    keys_with_image_coverage: Option<Range<Key>>,

    // Statistics that remain accessible to the caller of `get_vectored_impl`.
    layers_visited: u32,
    delta_layers_visited: u32,
}

impl ValuesReconstructState {
    pub(crate) fn new() -> Self {
        Self {
            keys: HashMap::new(),
            keys_done: KeySpaceRandomAccum::new(),
            keys_with_image_coverage: None,
            layers_visited: 0,
            delta_layers_visited: 0,
        }
    }

    /// Associate a key with the error which it encountered and mark it as done
    pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
        let previous = self.keys.insert(key, Err(err));
        if let Some(Ok(state)) = previous {
            if state.situation == ValueReconstructSituation::Continue {
                self.keys_done.add_key(key);
            }
        }
    }

    pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
        self.layers_visited += 1;
        if let ReadableLayer::PersistentLayer(layer) = layer {
            if layer.layer_desc().is_delta() {
                self.delta_layers_visited += 1;
            }
        }
    }

    pub(crate) fn get_delta_layers_visited(&self) -> u32 {
        self.delta_layers_visited
    }

    pub(crate) fn get_layers_visited(&self) -> u32 {
        self.layers_visited
    }

    /// This function is called after reading a keyspace from a layer.
    /// It checks if the read path has now moved past the cached Lsn for any keys.
    ///
    /// Implementation note: We intentionally iterate over the keys for which we've
    /// already collected some reconstruct data. This avoids scaling complexity with
    /// the size of the search space.
    pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
        for (key, value) in self.keys.iter_mut() {
            if !keyspace.contains(key) {
                continue;
            }

            if let Ok(state) = value {
                if state.situation != ValueReconstructSituation::Complete
                    && state.get_cached_lsn() >= Some(advanced_to)
                {
                    state.situation = ValueReconstructSituation::Complete;
                    self.keys_done.add_key(*key);
                }
            }
        }
    }

    /// On hitting an image layer, we can mark all keys in its range as done:
    /// an image layer is a complete snapshot of its key range, so any key it
    /// does not contain was deleted or never existed.
    pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
        let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
        assert_eq!(
            prev_val, None,
            "should consume the keyspace before the next iteration"
        );
    }

    /// Update the state collected for a given key.
    /// Returns `Complete` if this was the last value needed for the key,
    /// and `Continue` otherwise.
    ///
    /// If the key is done after the update, mark it as such.
    pub(crate) fn update_key(
        &mut self,
        key: &Key,
        lsn: Lsn,
        value: Value,
    ) -> ValueReconstructSituation {
        let state = self
            .keys
            .entry(*key)
            .or_insert(Ok(VectoredValueReconstructState::default()));

        if let Ok(state) = state {
            let key_done = match state.situation {
                ValueReconstructSituation::Complete => unreachable!(),
                ValueReconstructSituation::Continue => match value {
                    Value::Image(img) => {
                        state.img = Some((lsn, img));
                        true
                    }
                    Value::WalRecord(rec) => {
                        debug_assert!(
                            Some(lsn) > state.get_cached_lsn(),
                            "Attempt to collect a record below cached LSN for walredo: {} < {}",
                            lsn,
                            state
                                .get_cached_lsn()
                                .expect("Assertion can only fire if a cached lsn is present")
                        );

                        let will_init = rec.will_init();
                        state.records.push((lsn, rec));
                        will_init
                    }
                },
            };

            if key_done && state.situation == ValueReconstructSituation::Continue {
                state.situation = ValueReconstructSituation::Complete;
                self.keys_done.add_key(*key);
            }

            state.situation
        } else {
            ValueReconstructSituation::Complete
        }
    }

    /// Returns the Lsn at which this key is cached if one exists.
    /// The read path should go no further than this Lsn for the given key.
    pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
        self.keys
            .get(key)
            .and_then(|k| k.as_ref().ok())
            .and_then(|state| state.get_cached_lsn())
    }

    /// Returns the key space describing the keys that have
    /// been marked as completed since the last call to this function.
    /// Returns individual keys done, and the image layer coverage.
    pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
        (
            self.keys_done.consume_keyspace(),
            self.keys_with_image_coverage.take(),
        )
    }
}
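
// A minimal sketch (not part of the original module) of the `update_key`
// contract: an image value completes a key in one step, and the completed key
// then shows up in `consume_done_keys`. It assumes `Key::from_i128`, which the
// pageserver key module provides.
#[cfg(test)]
mod values_reconstruct_state_tests {
    use super::*;

    #[test]
    fn image_value_completes_key() {
        let mut state = ValuesReconstructState::new();
        let key = Key::from_i128(42);
        let situation =
            state.update_key(&key, Lsn(0x40), Value::Image(Bytes::from_static(b"page")));
        assert_eq!(situation, ValueReconstructSituation::Complete);

        // The completed key is reported exactly once; no image layer was visited.
        let (done, image_coverage) = state.consume_done_keys();
        assert!(done.contains(&key));
        assert!(image_coverage.is_none());
    }
}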

impl Default for ValuesReconstructState {
    fn default() -> Self {
        Self::new()
    }
}

/// A key that uniquely identifies a layer in a timeline
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub(crate) enum LayerId {
    PersistentLayerId(PersistentLayerKey),
    InMemoryLayerId(InMemoryLayerFileId),
}

/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
#[derive(Debug)]
pub(crate) enum ReadableLayer {
    PersistentLayer(Layer),
    InMemoryLayer(Arc<InMemoryLayer>),
}

/// A partial description of a read to be done.
#[derive(Debug, Clone)]
struct ReadDesc {
    /// An id used to resolve the readable layer within the fringe
    layer_id: LayerId,
    /// Lsn range for the read, used for selecting the next read
    lsn_range: Range<Lsn>,
}

/// Data structure which maintains a fringe of layers for the
/// read path. The fringe is the set of layers which intersect
/// the current keyspace that the search is descending on.
/// Each layer tracks the keyspace that intersects it.
///
/// Reads must be planned in descending Lsn order, so the fringe
/// uses a two-level indexing scheme: a binary heap that orders
/// pending reads by Lsn, and a hash map that accumulates the
/// keyspace to read from each layer.
#[derive(Debug)]
pub(crate) struct LayerFringe {
    planned_reads_by_lsn: BinaryHeap<ReadDesc>,
    layers: HashMap<LayerId, LayerKeyspace>,
}

#[derive(Debug)]
struct LayerKeyspace {
    layer: ReadableLayer,
    target_keyspace: KeySpaceRandomAccum,
}

impl LayerFringe {
    pub(crate) fn new() -> Self {
        LayerFringe {
            planned_reads_by_lsn: BinaryHeap::new(),
            layers: HashMap::new(),
        }
    }

    pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
        let read_desc = match self.planned_reads_by_lsn.pop() {
            Some(desc) => desc,
            None => return None,
        };

        let removed = self.layers.remove_entry(&read_desc.layer_id);

        match removed {
            Some((
                _,
                LayerKeyspace {
                    layer,
                    mut target_keyspace,
                },
            )) => Some((
                layer,
                target_keyspace.consume_keyspace(),
                read_desc.lsn_range,
            )),
            None => unreachable!("fringe internals are always consistent"),
        }
    }

    pub(crate) fn update(
        &mut self,
        layer: ReadableLayer,
        keyspace: KeySpace,
        lsn_range: Range<Lsn>,
    ) {
        let layer_id = layer.id();
        let entry = self.layers.entry(layer_id.clone());
        match entry {
            Entry::Occupied(mut entry) => {
                entry.get_mut().target_keyspace.add_keyspace(keyspace);
            }
            Entry::Vacant(entry) => {
                self.planned_reads_by_lsn.push(ReadDesc {
                    lsn_range,
                    layer_id: layer_id.clone(),
                });
                let mut accum = KeySpaceRandomAccum::new();
                accum.add_keyspace(keyspace);
                entry.insert(LayerKeyspace {
                    layer,
                    target_keyspace: accum,
                });
            }
        }
    }
}

impl Default for LayerFringe {
    fn default() -> Self {
        Self::new()
    }
}
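
// How `ReadDesc`s are ordered in the fringe's max-heap: the read with the
// highest `lsn_range.end` pops first, and ties on the end bound are broken in
// favor of the lower `lsn_range.start` (note the reversed comparison below).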
impl Ord for ReadDesc {
    fn cmp(&self, other: &Self) -> Ordering {
        let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
        if ord == std::cmp::Ordering::Equal {
            self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
        } else {
            ord
        }
    }
}

impl PartialOrd for ReadDesc {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for ReadDesc {
    fn eq(&self, other: &Self) -> bool {
        self.lsn_range == other.lsn_range
    }
}

impl Eq for ReadDesc {}

impl ReadableLayer {
    pub(crate) fn id(&self) -> LayerId {
        match self {
            Self::PersistentLayer(layer) => LayerId::PersistentLayerId(layer.layer_desc().key()),
            Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
        }
    }

    pub(crate) async fn get_values_reconstruct_data(
        &self,
        keyspace: KeySpace,
        lsn_range: Range<Lsn>,
        reconstruct_state: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) -> Result<(), GetVectoredError> {
        match self {
            ReadableLayer::PersistentLayer(layer) => {
                layer
                    .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
                    .await
            }
            ReadableLayer::InMemoryLayer(layer) => {
                layer
                    .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
                    .await
            }
        }
    }
}

/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
    /// Got all the data needed to reconstruct the requested page
    Complete,
    /// This layer didn't contain all the required data, the caller should look up
    /// the predecessor layer at the returned LSN and collect more data from there.
    Continue,

    /// This layer didn't contain data needed to reconstruct the page version at
    /// the returned LSN. This is usually considered an error, but might be OK
    /// in some circumstances.
    Missing,
}

#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);

/// This struct holds two instances of [`LayerAccessStatsInner`].
/// Accesses are recorded to both instances.
/// The `for_scraping_api` instance can be reset from the management API via [`LayerAccessStatsReset`].
/// The `for_eviction_policy` instance is never reset.
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsLocked {
    for_scraping_api: LayerAccessStatsInner,
    for_eviction_policy: LayerAccessStatsInner,
}

impl LayerAccessStatsLocked {
    fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
        [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
    }
}

#[derive(Debug, Default, Clone)]
struct LayerAccessStatsInner {
    first_access: Option<LayerAccessStatFullDetails>,
    count_by_access_kind: EnumMap<LayerAccessKind, u64>,
    task_kind_flag: EnumSet<TaskKind>,
    last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
    last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}

#[derive(Debug, Clone, Copy)]
pub(crate) struct LayerAccessStatFullDetails {
    pub(crate) when: SystemTime,
    pub(crate) task_kind: TaskKind,
    pub(crate) access_kind: LayerAccessKind,
}

#[derive(Clone, Copy, strum_macros::EnumString)]
pub enum LayerAccessStatsReset {
    NoReset,
    JustTaskKindFlags,
    AllStats,
}

fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
    ts.duration_since(UNIX_EPOCH)
        .expect("better to die in this unlikely case than report false stats")
        .as_millis()
        .try_into()
        .expect("64 bits is enough for a few more years")
}
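
// A small illustrative test (not part of the original module) for the helper
// above: the Unix epoch itself maps to zero milliseconds.
#[cfg(test)]
mod millis_since_epoch_tests {
    use super::*;

    #[test]
    fn epoch_is_zero() {
        assert_eq!(system_time_to_millis_since_epoch(&UNIX_EPOCH), 0);
    }
}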

impl LayerAccessStatFullDetails {
    fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
        let Self {
            when,
            task_kind,
            access_kind,
        } = self;
        pageserver_api::models::LayerAccessStatFullDetails {
            when_millis_since_epoch: system_time_to_millis_since_epoch(when),
            task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
            access_kind: *access_kind,
        }
    }
}

impl LayerAccessStats {
    /// Create an empty stats object.
    ///
    /// The caller is responsible for recording a residence event
    /// using [`record_residence_event`] before calling [`latest_activity`].
    /// If they don't, [`latest_activity`] will return `None`.
    ///
    /// [`record_residence_event`]: Self::record_residence_event
    /// [`latest_activity`]: Self::latest_activity
    pub(crate) fn empty_will_record_residence_event_later() -> Self {
        LayerAccessStats(Mutex::default())
    }

    /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
    ///
    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
    ///
    /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
    /// [`record_residence_event`]: Self::record_residence_event
    pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
        let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
        new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
        new
    }

    /// Record a change in layer residency.
    ///
    /// Recording the event must happen while holding the layer map lock to
    /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
    /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
    ///
    /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
    /// the following race could happen:
    ///
    /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
    /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
    /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
    /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
    pub(crate) fn record_residence_event(
        &self,
        status: LayerResidenceStatus,
        reason: LayerResidenceEventReason,
    ) {
        let mut locked = self.0.lock().unwrap();
        locked.iter_mut().for_each(|inner| {
            inner
                .last_residence_changes
                .write(LayerResidenceEvent::new(status, reason))
        });
    }

    fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
        if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
            return;
        }

        let this_access = LayerAccessStatFullDetails {
            when: SystemTime::now(),
            task_kind: ctx.task_kind(),
            access_kind,
        };

        let mut locked = self.0.lock().unwrap();
        locked.iter_mut().for_each(|inner| {
            inner.first_access.get_or_insert(this_access);
            inner.count_by_access_kind[access_kind] += 1;
            inner.task_kind_flag |= ctx.task_kind();
            inner.last_accesses.write(this_access);
        })
    }

    fn as_api_model(
        &self,
        reset: LayerAccessStatsReset,
    ) -> pageserver_api::models::LayerAccessStats {
        let mut locked = self.0.lock().unwrap();
        let inner = &mut locked.for_scraping_api;
        let LayerAccessStatsInner {
            first_access,
            count_by_access_kind,
            task_kind_flag,
            last_accesses,
            last_residence_changes,
        } = inner;
        let ret = pageserver_api::models::LayerAccessStats {
            access_count_by_access_kind: count_by_access_kind
                .iter()
                .map(|(kind, count)| (kind, *count))
                .collect(),
            task_kind_access_flag: task_kind_flag
                .iter()
                .map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
                .collect(),
            first: first_access.as_ref().map(|a| a.as_api_model()),
            accesses_history: last_accesses.map(|m| m.as_api_model()),
            residence_events_history: last_residence_changes.clone(),
        };
        match reset {
            LayerAccessStatsReset::NoReset => (),
            LayerAccessStatsReset::JustTaskKindFlags => {
                inner.task_kind_flag.clear();
            }
            LayerAccessStatsReset::AllStats => {
                *inner = LayerAccessStatsInner::default();
            }
        }
        ret
    }

    /// Get the latest access timestamp, falling back to the latest residence event, further
    /// falling back to `SystemTime::now` for a usable timestamp for eviction.
    pub(crate) fn latest_activity_or_now(&self) -> SystemTime {
        self.latest_activity().unwrap_or_else(SystemTime::now)
    }

    /// Get the latest access timestamp, falling back to the latest residence event.
    ///
    /// This function can only return `None` if there has not yet been a call to the
    /// [`record_residence_event`] method. That would generally be considered an
    /// implementation error. This function logs a rate-limited warning in that case.
    ///
    /// TODO: use type system to avoid the need for `fallback`.
    /// The approach in <https://github.com/neondatabase/neon/pull/3775>
    /// could be used to enforce that a residence event is recorded
    /// before a layer is added to the layer map. We could also have
    /// a layer wrapper type that holds the LayerAccessStats, and ensure
    /// that that type can only be produced by inserting into the layer map.
    ///
    /// [`record_residence_event`]: Self::record_residence_event
    fn latest_activity(&self) -> Option<SystemTime> {
        let locked = self.0.lock().unwrap();
        let inner = &locked.for_eviction_policy;
        match inner.last_accesses.recent() {
            Some(a) => Some(a.when),
            None => match inner.last_residence_changes.recent() {
                Some(e) => Some(e.timestamp),
                None => {
                    static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
                        Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
                    let mut guard = WARN_RATE_LIMIT.lock().unwrap();
                    guard.0 += 1;
                    let occurrences = guard.0;
                    guard.1.call(move || {
                        warn!(parent: None, occurrences, "latest_activity not available, this is an implementation bug, using fallback value");
                    });
                    None
                }
            },
        }
    }

    /// Whether this layer has been accessed (excluding in [`AccessStatsBehavior::Skip`]).
    ///
    /// This indicates whether the layer has been used for some purpose that would motivate
    /// us to keep it on disk, such as for serving a getpage request.
    fn accessed(&self) -> bool {
        let locked = self.0.lock().unwrap();
        let inner = &locked.for_eviction_policy;

        // Consider it accessed if the most recent access is more recent than
        // the most recent change in residence status.
        match (
            inner.last_accesses.recent(),
            inner.last_residence_changes.recent(),
        ) {
            (None, _) => false,
            (Some(_), None) => true,
            (Some(a), Some(r)) => a.when >= r.timestamp,
        }
    }
}

/// Get a layer descriptor from a layer.
pub trait AsLayerDesc {
    /// Get the layer descriptor.
    fn layer_desc(&self) -> &PersistentLayerDesc;
}

pub mod tests {
    use pageserver_api::shard::TenantShardId;

    use super::*;

    impl From<DeltaLayerName> for PersistentLayerDesc {
        fn from(value: DeltaLayerName) -> Self {
            PersistentLayerDesc::new_delta(
                TenantShardId::from([0; 18]),
                TimelineId::from_array([0; 16]),
                value.key_range,
                value.lsn_range,
                233,
            )
        }
    }

    impl From<ImageLayerName> for PersistentLayerDesc {
        fn from(value: ImageLayerName) -> Self {
            PersistentLayerDesc::new_img(
                TenantShardId::from([0; 18]),
                TimelineId::from_array([0; 16]),
                value.key_range,
                value.lsn,
                233,
            )
        }
    }

    impl From<LayerName> for PersistentLayerDesc {
        fn from(value: LayerName) -> Self {
            match value {
                LayerName::Delta(d) => Self::from(d),
                LayerName::Image(i) => Self::from(i),
            }
        }
    }
}

/// Range wrapping newtype, which uses display to render Debug.
///
/// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);

impl<'a, T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'a, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}..{}", self.0.start, self.0.end)
    }
}