Line data Source code
1 : //! Common traits and structs for layers
2 :
3 : pub mod delta_layer;
4 : pub mod image_layer;
5 : pub(crate) mod inmemory_layer;
6 : pub(crate) mod layer;
7 : mod layer_desc;
8 : mod layer_name;
9 :
10 : use crate::context::{AccessStatsBehavior, RequestContext};
11 : use crate::repository::Value;
12 : use crate::task_mgr::TaskKind;
13 : use crate::walrecord::NeonWalRecord;
14 : use bytes::Bytes;
15 : use enum_map::EnumMap;
16 : use enumset::EnumSet;
17 : use once_cell::sync::Lazy;
18 : use pageserver_api::key::Key;
19 : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
20 : use pageserver_api::models::{
21 : LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
22 : };
23 : use std::borrow::Cow;
24 : use std::cmp::{Ordering, Reverse};
25 : use std::collections::hash_map::Entry;
26 : use std::collections::{BinaryHeap, HashMap};
27 : use std::ops::Range;
28 : use std::sync::{Arc, Mutex};
29 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
30 : use tracing::warn;
31 : use utils::history_buffer::HistoryBufferWithDropCounter;
32 : use utils::rate_limit::RateLimit;
33 :
34 : use utils::{id::TimelineId, lsn::Lsn};
35 :
36 : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
37 : pub use image_layer::{ImageLayer, ImageLayerWriter};
38 : pub use inmemory_layer::InMemoryLayer;
39 : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
40 : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
41 :
42 : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
43 :
44 : use self::inmemory_layer::InMemoryLayerFileId;
45 :
46 : use super::timeline::GetVectoredError;
47 : use super::PageReconstructError;
48 :
/// Returns `true` when the half-open ranges `a` and `b` share at least one point.
///
/// Whichever range starts first must extend past the start of the other one.
/// The check looks only at endpoints, so an empty range whose start lies
/// strictly inside the other range is still reported as overlapping.
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
    T: PartialOrd<T>,
{
    let (earlier, later) = if a.start < b.start { (a, b) } else { (b, a) };
    earlier.end > later.start
}
59 :
/// Struct used to communicate across calls to `get_value_reconstruct_data`.
///
/// Before the first call, you can fill in `img` if you have an older cached
/// version of the page available. That can save work in
/// `get_value_reconstruct_data`, as it can stop searching for page versions
/// when all the WAL records going back to the cached image have been collected.
///
/// When `get_value_reconstruct_data` returns `Complete`, `img` is set to an image
/// of the page, or the oldest WAL record in `records` is a will_init-type
/// record that initializes the page without requiring a previous image.
///
/// If `get_value_reconstruct_data` returns `Continue`, some `records` may have
/// been collected, but there are more records outside the current layer. Pass
/// the same `ValueReconstructState` struct in the next `get_value_reconstruct_data`
/// call, to collect more records.
///
#[derive(Debug, Default)]
pub struct ValueReconstructState {
    /// WAL records collected so far, each paired with its LSN. When this struct
    /// is built from a `VectoredValueReconstructState`, the records are sorted
    /// by descending LSN.
    pub records: Vec<(Lsn, NeonWalRecord)>,
    /// A page image and its LSN, if one is available.
    pub img: Option<(Lsn, Bytes)>,
}
81 :
/// Progress of data collection for a single key during a vectored get.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum ValueReconstructSituation {
    /// Enough data has been collected; no further layers need to be searched
    /// for this key.
    Complete,
    /// More data is needed. This is the initial state of a freshly tracked key.
    #[default]
    Continue,
}
88 :
/// Reconstruct data accumulated for a single key during a vectored get.
///
/// Records are appended in the order they are found; converting into a
/// `ValueReconstructState` sorts them by descending LSN.
#[derive(Debug, Default, Clone)]
pub(crate) struct VectoredValueReconstructState {
    /// WAL records collected so far, in discovery order.
    pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
    /// The page image found for this key and its LSN, if any.
    pub(crate) img: Option<(Lsn, Bytes)>,

    /// Whether collection for this key has finished.
    situation: ValueReconstructSituation,
}
97 :
98 : impl VectoredValueReconstructState {
99 39635 : fn get_cached_lsn(&self) -> Option<Lsn> {
100 39635 : self.img.as_ref().map(|img| img.0)
101 39635 : }
102 : }
103 :
104 : impl From<VectoredValueReconstructState> for ValueReconstructState {
105 40436 : fn from(mut state: VectoredValueReconstructState) -> Self {
106 40436 : // walredo expects the records to be descending in terms of Lsn
107 40436 : state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
108 40436 :
109 40436 : ValueReconstructState {
110 40436 : records: state.records,
111 40436 : img: state.img,
112 40436 : }
113 40436 : }
114 : }
115 :
/// Bag of data accumulated during a vectored get.
pub(crate) struct ValuesReconstructState {
    /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
    /// should not expect to get anything from this hashmap.
    ///
    /// Each value is either the data collected so far for the key, or the error
    /// encountered while reading it.
    pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
    /// The keys which are already retrieved; drained by `consume_done_keys`.
    keys_done: KeySpaceRandomAccum,

    /// The key range covered by the most recently visited image layer, until it
    /// is drained by `consume_done_keys`.
    keys_with_image_coverage: Option<Range<Key>>,

    // Statistics that are still accessible as a caller of `get_vectored_impl`.
    /// Number of layers visited (persistent and in-memory).
    layers_visited: u32,
    /// Number of persistent delta layers visited.
    delta_layers_visited: u32,
}
131 :
132 : impl ValuesReconstructState {
133 348 : pub(crate) fn new() -> Self {
134 348 : Self {
135 348 : keys: HashMap::new(),
136 348 : keys_done: KeySpaceRandomAccum::new(),
137 348 : keys_with_image_coverage: None,
138 348 : layers_visited: 0,
139 348 : delta_layers_visited: 0,
140 348 : }
141 348 : }
142 :
143 : /// Associate a key with the error which it encountered and mark it as done
144 0 : pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
145 0 : let previous = self.keys.insert(key, Err(err));
146 0 : if let Some(Ok(state)) = previous {
147 0 : if state.situation == ValueReconstructSituation::Continue {
148 0 : self.keys_done.add_key(key);
149 0 : }
150 0 : }
151 0 : }
152 :
153 234 : pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
154 234 : self.layers_visited += 1;
155 234 : if let ReadableLayer::PersistentLayer(layer) = layer {
156 220 : if layer.layer_desc().is_delta() {
157 154 : self.delta_layers_visited += 1;
158 154 : }
159 14 : }
160 234 : }
161 :
162 24 : pub(crate) fn get_delta_layers_visited(&self) -> u32 {
163 24 : self.delta_layers_visited
164 24 : }
165 :
166 128 : pub(crate) fn get_layers_visited(&self) -> u32 {
167 128 : self.layers_visited
168 128 : }
169 :
170 : /// This function is called after reading a keyspace from a layer.
171 : /// It checks if the read path has now moved past the cached Lsn for any keys.
172 : ///
173 : /// Implementation note: We intentionally iterate over the keys for which we've
174 : /// already collected some reconstruct data. This avoids scaling complexity with
175 : /// the size of the search space.
176 168 : pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
177 74150 : for (key, value) in self.keys.iter_mut() {
178 74150 : if !keyspace.contains(key) {
179 44054 : continue;
180 30096 : }
181 :
182 30096 : if let Ok(state) = value {
183 30096 : if state.situation != ValueReconstructSituation::Complete
184 0 : && state.get_cached_lsn() >= Some(advanced_to)
185 0 : {
186 0 : state.situation = ValueReconstructSituation::Complete;
187 0 : self.keys_done.add_key(*key);
188 30096 : }
189 0 : }
190 : }
191 168 : }
192 :
193 : /// On hitting image layer, we can mark all keys in this range as done, because
194 : /// if the image layer does not contain a key, it is deleted/never added.
195 74 : pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
196 74 : let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
197 74 : assert_eq!(
198 : prev_val, None,
199 0 : "should consume the keyspace before the next iteration"
200 : );
201 74 : }
202 :
203 : /// Update the state collected for a given key.
204 : /// Returns true if this was the last value needed for the key and false otherwise.
205 : ///
206 : /// If the key is done after the update, mark it as such.
207 40508 : pub(crate) fn update_key(
208 40508 : &mut self,
209 40508 : key: &Key,
210 40508 : lsn: Lsn,
211 40508 : value: Value,
212 40508 : ) -> ValueReconstructSituation {
213 40508 : let state = self
214 40508 : .keys
215 40508 : .entry(*key)
216 40508 : .or_insert(Ok(VectoredValueReconstructState::default()));
217 :
218 40508 : if let Ok(state) = state {
219 40508 : let key_done = match state.situation {
220 0 : ValueReconstructSituation::Complete => unreachable!(),
221 40508 : ValueReconstructSituation::Continue => match value {
222 40508 : Value::Image(img) => {
223 40508 : state.img = Some((lsn, img));
224 40508 : true
225 : }
226 0 : Value::WalRecord(rec) => {
227 0 : debug_assert!(
228 0 : Some(lsn) > state.get_cached_lsn(),
229 0 : "Attempt to collect a record below cached LSN for walredo: {} < {}",
230 0 : lsn,
231 0 : state
232 0 : .get_cached_lsn()
233 0 : .expect("Assertion can only fire if a cached lsn is present")
234 : );
235 :
236 0 : let will_init = rec.will_init();
237 0 : state.records.push((lsn, rec));
238 0 : will_init
239 : }
240 : },
241 : };
242 :
243 40508 : if key_done && state.situation == ValueReconstructSituation::Continue {
244 40508 : state.situation = ValueReconstructSituation::Complete;
245 40508 : self.keys_done.add_key(*key);
246 40508 : }
247 :
248 40508 : state.situation
249 : } else {
250 0 : ValueReconstructSituation::Complete
251 : }
252 40508 : }
253 :
254 : /// Returns the Lsn at which this key is cached if one exists.
255 : /// The read path should go no further than this Lsn for the given key.
256 191353 : pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
257 191353 : self.keys
258 191353 : .get(key)
259 191353 : .and_then(|k| k.as_ref().ok())
260 191353 : .and_then(|state| state.get_cached_lsn())
261 191353 : }
262 :
263 : /// Returns the key space describing the keys that have
264 : /// been marked as completed since the last call to this function.
265 : /// Returns individual keys done, and the image layer coverage.
266 408 : pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
267 408 : (
268 408 : self.keys_done.consume_keyspace(),
269 408 : self.keys_with_image_coverage.take(),
270 408 : )
271 408 : }
272 : }
273 :
274 : impl Default for ValuesReconstructState {
275 66 : fn default() -> Self {
276 66 : Self::new()
277 66 : }
278 : }
279 :
/// A key that uniquely identifies a layer in a timeline.
///
/// NOTE(review): `PersitentLayerId` is misspelled; renaming it would touch
/// every user of this enum, so it is left as-is.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub(crate) enum LayerId {
    /// Identifies a persistent layer by its [`PersistentLayerKey`].
    PersitentLayerId(PersistentLayerKey),
    /// Identifies an in-memory layer by its file id.
    InMemoryLayerId(InMemoryLayerFileId),
}
286 :
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
#[derive(Debug)]
pub(crate) enum ReadableLayer {
    /// A persistent (delta or image) layer.
    PersistentLayer(Layer),
    /// A shared handle to an in-memory layer.
    InMemoryLayer(Arc<InMemoryLayer>),
}
295 :
/// A partial description of a read to be done.
///
/// Equality and ordering (implemented by hand for this type) consider only
/// `lsn_range`; `layer_id` is ignored.
#[derive(Debug, Clone)]
struct ReadDesc {
    /// An id used to resolve the readable layer within the fringe
    layer_id: LayerId,
    /// Lsn range for the read, used for selecting the next read
    lsn_range: Range<Lsn>,
}
304 :
/// Data structure which maintains a fringe of layers for the
/// read path. The fringe is the set of layers which intersects
/// the current keyspace that the search is descending on.
/// Each layer tracks the keyspace that intersects it.
///
/// The fringe must appear sorted by Lsn. Hence, it uses
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
    /// Pending reads; `BinaryHeap` is a max-heap, so the read with the highest
    /// end LSN is popped first. A layer has at most one pending read at a time.
    planned_reads_by_lsn: BinaryHeap<ReadDesc>,
    /// The layer and accumulated keyspace for every pending read, by layer id.
    layers: HashMap<LayerId, LayerKeyspace>,
}
317 :
/// A layer in the fringe together with the keyspace to read from it.
#[derive(Debug)]
struct LayerKeyspace {
    /// The layer to read from.
    layer: ReadableLayer,
    /// Union of all keyspaces requested for this layer while its read was pending.
    target_keyspace: KeySpaceRandomAccum,
}
323 :
324 : impl LayerFringe {
325 174 : pub(crate) fn new() -> Self {
326 174 : LayerFringe {
327 174 : planned_reads_by_lsn: BinaryHeap::new(),
328 174 : layers: HashMap::new(),
329 174 : }
330 174 : }
331 :
332 408 : pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
333 408 : let read_desc = match self.planned_reads_by_lsn.pop() {
334 234 : Some(desc) => desc,
335 174 : None => return None,
336 : };
337 :
338 234 : let removed = self.layers.remove_entry(&read_desc.layer_id);
339 234 :
340 234 : match removed {
341 : Some((
342 : _,
343 : LayerKeyspace {
344 234 : layer,
345 234 : mut target_keyspace,
346 234 : },
347 234 : )) => Some((
348 234 : layer,
349 234 : target_keyspace.consume_keyspace(),
350 234 : read_desc.lsn_range,
351 234 : )),
352 0 : None => unreachable!("fringe internals are always consistent"),
353 : }
354 408 : }
355 :
356 63826 : pub(crate) fn update(
357 63826 : &mut self,
358 63826 : layer: ReadableLayer,
359 63826 : keyspace: KeySpace,
360 63826 : lsn_range: Range<Lsn>,
361 63826 : ) {
362 63826 : let layer_id = layer.id();
363 63826 : let entry = self.layers.entry(layer_id.clone());
364 63826 : match entry {
365 63592 : Entry::Occupied(mut entry) => {
366 63592 : entry.get_mut().target_keyspace.add_keyspace(keyspace);
367 63592 : }
368 234 : Entry::Vacant(entry) => {
369 234 : self.planned_reads_by_lsn.push(ReadDesc {
370 234 : lsn_range,
371 234 : layer_id: layer_id.clone(),
372 234 : });
373 234 : let mut accum = KeySpaceRandomAccum::new();
374 234 : accum.add_keyspace(keyspace);
375 234 : entry.insert(LayerKeyspace {
376 234 : layer,
377 234 : target_keyspace: accum,
378 234 : });
379 234 : }
380 : }
381 63826 : }
382 : }
383 :
384 : impl Default for LayerFringe {
385 0 : fn default() -> Self {
386 0 : Self::new()
387 0 : }
388 : }
389 :
390 : impl Ord for ReadDesc {
391 18 : fn cmp(&self, other: &Self) -> Ordering {
392 18 : let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
393 18 : if ord == std::cmp::Ordering::Equal {
394 18 : self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
395 : } else {
396 0 : ord
397 : }
398 18 : }
399 : }
400 :
401 : impl PartialOrd for ReadDesc {
402 18 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
403 18 : Some(self.cmp(other))
404 18 : }
405 : }
406 :
407 : impl PartialEq for ReadDesc {
408 0 : fn eq(&self, other: &Self) -> bool {
409 0 : self.lsn_range == other.lsn_range
410 0 : }
411 : }
412 :
413 : impl Eq for ReadDesc {}
414 :
415 : impl ReadableLayer {
416 63826 : pub(crate) fn id(&self) -> LayerId {
417 63826 : match self {
418 63812 : Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
419 14 : Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
420 : }
421 63826 : }
422 :
423 234 : pub(crate) async fn get_values_reconstruct_data(
424 234 : &self,
425 234 : keyspace: KeySpace,
426 234 : lsn_range: Range<Lsn>,
427 234 : reconstruct_state: &mut ValuesReconstructState,
428 234 : ctx: &RequestContext,
429 234 : ) -> Result<(), GetVectoredError> {
430 234 : match self {
431 220 : ReadableLayer::PersistentLayer(layer) => {
432 220 : layer
433 220 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
434 11439 : .await
435 : }
436 14 : ReadableLayer::InMemoryLayer(layer) => {
437 14 : layer
438 14 : .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
439 27 : .await
440 : }
441 : }
442 234 : }
443 : }
444 :
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
    /// Got all the data needed to reconstruct the requested page
    Complete,
    /// This layer didn't contain all the required data; the caller should look up
    /// the predecessor layer and collect more data from there.
    /// (The variant carries no LSN; the caller tracks where to continue.)
    Continue,

    /// This layer didn't contain data needed to reconstruct the requested page
    /// version. This is usually considered an error, but might be OK
    /// in some circumstances.
    Missing,
}
459 :
/// Per-layer access and residence statistics, guarded by a mutex.
#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
462 :
/// This struct holds two instances of [`LayerAccessStatsInner`].
/// Accesses are recorded to both instances.
/// The `for_scraping_api` instance can be reset from the management API via [`LayerAccessStatsReset`].
/// The `for_eviction_policy` instance is never reset.
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsLocked {
    /// Stats reported (and optionally reset) through the management API.
    for_scraping_api: LayerAccessStatsInner,
    /// Stats consulted by the eviction policy via `latest_activity`.
    for_eviction_policy: LayerAccessStatsInner,
}
472 :
473 : impl LayerAccessStatsLocked {
474 213768 : fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
475 213768 : [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
476 213768 : }
477 : }
478 :
/// One set of access/residence statistics for a layer.
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsInner {
    /// The first access recorded since creation (or since the last full reset).
    first_access: Option<LayerAccessStatFullDetails>,
    /// Number of accesses recorded per access kind.
    count_by_access_kind: EnumMap<LayerAccessKind, u64>,
    /// Set of task kinds that have accessed the layer.
    task_kind_flag: EnumSet<TaskKind>,
    /// Bounded history (capacity 16) of recent accesses; `recent()` is used to
    /// read the latest one.
    last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
    /// Bounded history (capacity 16) of recent residence changes.
    last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
487 :
/// Details of a single recorded layer access.
#[derive(Debug, Clone, Copy)]
pub(crate) struct LayerAccessStatFullDetails {
    /// Wall-clock time of the access.
    pub(crate) when: SystemTime,
    /// Kind of task that performed the access.
    pub(crate) task_kind: TaskKind,
    /// Kind of access performed.
    pub(crate) access_kind: LayerAccessKind,
}
494 :
/// What to reset in the scraping-API stats after reporting them.
/// Parsed from its variant name via `strum_macros::EnumString`.
#[derive(Clone, Copy, strum_macros::EnumString)]
pub enum LayerAccessStatsReset {
    /// Report without resetting anything.
    NoReset,
    /// Clear only the set of task kinds that accessed the layer.
    JustTaskKindFlags,
    /// Reset the whole scraping-API stats instance to its default.
    AllStats,
}
501 :
/// Converts `ts` into whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if `ts` is before the epoch or the millisecond count does not fit in a `u64`.
fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
    let since_epoch = ts
        .duration_since(UNIX_EPOCH)
        .expect("better to die in this unlikely case than report false stats");
    u64::try_from(since_epoch.as_millis()).expect("64 bits is enough for few more years")
}
509 :
510 : impl LayerAccessStatFullDetails {
511 0 : fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
512 0 : let Self {
513 0 : when,
514 0 : task_kind,
515 0 : access_kind,
516 0 : } = self;
517 0 : pageserver_api::models::LayerAccessStatFullDetails {
518 0 : when_millis_since_epoch: system_time_to_millis_since_epoch(when),
519 0 : task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
520 0 : access_kind: *access_kind,
521 0 : }
522 0 : }
523 : }
524 :
525 : impl LayerAccessStats {
526 : /// Create an empty stats object.
527 : ///
528 : /// The caller is responsible for recording a residence event
529 : /// using [`record_residence_event`] before calling `latest_activity`.
530 : /// If they don't, [`latest_activity`] will return `None`.
531 : ///
532 : /// [`record_residence_event`]: Self::record_residence_event
533 : /// [`latest_activity`]: Self::latest_activity
534 1544 : pub(crate) fn empty_will_record_residence_event_later() -> Self {
535 1544 : LayerAccessStats(Mutex::default())
536 1544 : }
537 :
538 : /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
539 : ///
540 : /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
541 : ///
542 : /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
543 : /// [`record_residence_event`]: Self::record_residence_event
544 24 : pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
545 24 : let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
546 24 : new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
547 24 : new
548 24 : }
549 :
550 : /// Record a change in layer residency.
551 : ///
552 : /// Recording the event must happen while holding the layer map lock to
553 : /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
554 : /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
555 : ///
556 : /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
557 : /// the following race could happen:
558 : ///
559 : /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
560 : /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
561 : /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
562 : /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
563 : ///
564 1592 : pub(crate) fn record_residence_event(
565 1592 : &self,
566 1592 : status: LayerResidenceStatus,
567 1592 : reason: LayerResidenceEventReason,
568 1592 : ) {
569 1592 : let mut locked = self.0.lock().unwrap();
570 3184 : locked.iter_mut().for_each(|inner| {
571 3184 : inner
572 3184 : .last_residence_changes
573 3184 : .write(LayerResidenceEvent::new(status, reason))
574 3184 : });
575 1592 : }
576 :
577 212530 : fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
578 212530 : if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
579 354 : return;
580 212176 : }
581 212176 :
582 212176 : let this_access = LayerAccessStatFullDetails {
583 212176 : when: SystemTime::now(),
584 212176 : task_kind: ctx.task_kind(),
585 212176 : access_kind,
586 212176 : };
587 212176 :
588 212176 : let mut locked = self.0.lock().unwrap();
589 424352 : locked.iter_mut().for_each(|inner| {
590 424352 : inner.first_access.get_or_insert(this_access);
591 424352 : inner.count_by_access_kind[access_kind] += 1;
592 424352 : inner.task_kind_flag |= ctx.task_kind();
593 424352 : inner.last_accesses.write(this_access);
594 424352 : })
595 212530 : }
596 :
597 0 : fn as_api_model(
598 0 : &self,
599 0 : reset: LayerAccessStatsReset,
600 0 : ) -> pageserver_api::models::LayerAccessStats {
601 0 : let mut locked = self.0.lock().unwrap();
602 0 : let inner = &mut locked.for_scraping_api;
603 0 : let LayerAccessStatsInner {
604 0 : first_access,
605 0 : count_by_access_kind,
606 0 : task_kind_flag,
607 0 : last_accesses,
608 0 : last_residence_changes,
609 0 : } = inner;
610 0 : let ret = pageserver_api::models::LayerAccessStats {
611 0 : access_count_by_access_kind: count_by_access_kind
612 0 : .iter()
613 0 : .map(|(kind, count)| (kind, *count))
614 0 : .collect(),
615 0 : task_kind_access_flag: task_kind_flag
616 0 : .iter()
617 0 : .map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
618 0 : .collect(),
619 0 : first: first_access.as_ref().map(|a| a.as_api_model()),
620 0 : accesses_history: last_accesses.map(|m| m.as_api_model()),
621 0 : residence_events_history: last_residence_changes.clone(),
622 0 : };
623 0 : match reset {
624 0 : LayerAccessStatsReset::NoReset => (),
625 0 : LayerAccessStatsReset::JustTaskKindFlags => {
626 0 : inner.task_kind_flag.clear();
627 0 : }
628 0 : LayerAccessStatsReset::AllStats => {
629 0 : *inner = LayerAccessStatsInner::default();
630 0 : }
631 : }
632 0 : ret
633 0 : }
634 :
635 : /// Get the latest access timestamp, falling back to latest residence event, further falling
636 : /// back to `SystemTime::now` for a usable timestamp for eviction.
637 0 : pub(crate) fn latest_activity_or_now(&self) -> SystemTime {
638 0 : self.latest_activity().unwrap_or_else(SystemTime::now)
639 0 : }
640 :
641 : /// Get the latest access timestamp, falling back to latest residence event.
642 : ///
643 : /// This function can only return `None` if there has not yet been a call to the
644 : /// [`record_residence_event`] method. That would generally be considered an
645 : /// implementation error. This function logs a rate-limited warning in that case.
646 : ///
647 : /// TODO: use type system to avoid the need for `fallback`.
648 : /// The approach in <https://github.com/neondatabase/neon/pull/3775>
649 : /// could be used to enforce that a residence event is recorded
650 : /// before a layer is added to the layer map. We could also have
651 : /// a layer wrapper type that holds the LayerAccessStats, and ensure
652 : /// that that type can only be produced by inserting into the layer map.
653 : ///
654 : /// [`record_residence_event`]: Self::record_residence_event
655 0 : fn latest_activity(&self) -> Option<SystemTime> {
656 0 : let locked = self.0.lock().unwrap();
657 0 : let inner = &locked.for_eviction_policy;
658 0 : match inner.last_accesses.recent() {
659 0 : Some(a) => Some(a.when),
660 0 : None => match inner.last_residence_changes.recent() {
661 0 : Some(e) => Some(e.timestamp),
662 : None => {
663 : static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
664 0 : Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
665 0 : let mut guard = WARN_RATE_LIMIT.lock().unwrap();
666 0 : guard.0 += 1;
667 0 : let occurences = guard.0;
668 0 : guard.1.call(move || {
669 0 : warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
670 0 : });
671 0 : None
672 : }
673 : },
674 : }
675 0 : }
676 : }
677 :
/// Get a layer descriptor from a layer.
pub trait AsLayerDesc {
    /// Get the layer descriptor.
    ///
    /// Returns a borrow tied to `self`; clone it if an owned copy is needed.
    fn layer_desc(&self) -> &PersistentLayerDesc;
}
683 :
684 : pub mod tests {
685 : use pageserver_api::shard::TenantShardId;
686 :
687 : use super::*;
688 :
689 : impl From<DeltaLayerName> for PersistentLayerDesc {
690 0 : fn from(value: DeltaLayerName) -> Self {
691 0 : PersistentLayerDesc::new_delta(
692 0 : TenantShardId::from([0; 18]),
693 0 : TimelineId::from_array([0; 16]),
694 0 : value.key_range,
695 0 : value.lsn_range,
696 0 : 233,
697 0 : )
698 0 : }
699 : }
700 :
701 : impl From<ImageLayerName> for PersistentLayerDesc {
702 0 : fn from(value: ImageLayerName) -> Self {
703 0 : PersistentLayerDesc::new_img(
704 0 : TenantShardId::from([0; 18]),
705 0 : TimelineId::from_array([0; 16]),
706 0 : value.key_range,
707 0 : value.lsn,
708 0 : 233,
709 0 : )
710 0 : }
711 : }
712 :
713 : impl From<LayerName> for PersistentLayerDesc {
714 0 : fn from(value: LayerName) -> Self {
715 0 : match value {
716 0 : LayerName::Delta(d) => Self::from(d),
717 0 : LayerName::Image(i) => Self::from(i),
718 : }
719 0 : }
720 : }
721 : }
722 :
/// Range wrapping newtype, which uses display to render Debug.
///
/// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
/// Renders as `start..end`, using `Display` for both endpoints.
struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);

impl<'a, T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'a, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Range { start, end } = self.0;
        write!(f, "{}..{}", start, end)
    }
}
|