TLA Line data Source code
1 : //! Common traits and structs for layers
2 :
3 : pub mod delta_layer;
4 : mod filename;
5 : mod image_layer;
6 : mod inmemory_layer;
7 : mod layer_desc;
8 : mod remote_layer;
9 :
10 : use crate::config::PageServerConf;
11 : use crate::context::{AccessStatsBehavior, RequestContext};
12 : use crate::repository::Key;
13 : use crate::task_mgr::TaskKind;
14 : use crate::walrecord::NeonWalRecord;
15 : use anyhow::Result;
16 : use bytes::Bytes;
17 : use camino::Utf8PathBuf;
18 : use enum_map::EnumMap;
19 : use enumset::EnumSet;
20 : use once_cell::sync::Lazy;
21 : use pageserver_api::models::LayerAccessKind;
22 : use pageserver_api::models::{
23 : HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
24 : };
25 : use std::ops::Range;
26 : use std::sync::{Arc, Mutex};
27 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
28 : use tracing::warn;
29 : use utils::history_buffer::HistoryBufferWithDropCounter;
30 : use utils::rate_limit::RateLimit;
31 :
32 : use utils::{
33 : id::{TenantId, TimelineId},
34 : lsn::Lsn,
35 : };
36 :
37 : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
38 : pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
39 : pub use image_layer::{ImageLayer, ImageLayerWriter};
40 : pub use inmemory_layer::InMemoryLayer;
41 : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
42 : pub use remote_layer::RemoteLayer;
43 :
/// Returns `true` if the half-open ranges `a` and `b` overlap.
///
/// Whichever range starts strictly earlier is compared against the other one:
/// they overlap when the earlier range ends after the later one starts. For
/// non-empty ranges this means "the ranges share at least one point".
///
/// NOTE: empty ranges are not special-cased; e.g. `5..5` is reported as
/// overlapping `0..10`.
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
    T: PartialOrd<T>,
{
    // When the starts are equal (or incomparable), `b` is treated as the earlier range.
    let (earlier, later) = if a.start < b.start { (a, b) } else { (b, a) };
    earlier.end > later.start
}
54 :
/// Struct used to communicate across calls to `get_value_reconstruct_data`.
///
/// Before the first call, you can fill in `img` if you have an older cached
/// version of the page available. That can save work in
/// `get_value_reconstruct_data`, as it can stop searching for page versions
/// when all the WAL records going back to the cached image have been collected.
///
/// When `get_value_reconstruct_data` returns `Complete`, `img` is set to an image
/// of the page, or the oldest WAL record in `records` is a will_init-type
/// record that initializes the page without requiring a previous image.
///
/// If `get_value_reconstruct_data` returns `Continue`, some `records` may have
/// been collected, but there are more records outside the current layer. Pass
/// the same `ValueReconstructState` struct in the next `get_value_reconstruct_data`
/// call, to collect more records.
#[derive(Debug)]
pub struct ValueReconstructState {
    /// WAL records collected so far, each paired with an LSN
    /// (presumably the LSN of that record — confirm against the layer implementations).
    pub records: Vec<(Lsn, NeonWalRecord)>,
    /// A page image paired with an LSN (presumably the LSN the image is valid at),
    /// either supplied by the caller up front or found while searching layers.
    pub img: Option<(Lsn, Bytes)>,
}
76 :
/// Return value from [`Layer::get_value_reconstruct_data`].
///
/// The enum carries no payload; any collected data lives in the
/// [`ValueReconstructState`] passed to that call.
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
    /// Got all the data needed to reconstruct the requested page
    Complete,
    /// This layer didn't contain all the required data; the caller should look up
    /// the predecessor layer and call again with the same `ValueReconstructState`
    /// to collect more data from there.
    Continue,

    /// This layer didn't contain data needed to reconstruct the page version at
    /// the requested LSN. This is usually considered an error, but might be OK
    /// in some circumstances.
    Missing,
}
91 :
/// Access and residence statistics for a single layer.
///
/// All state lives behind a [`Mutex`], so both recording and reading go through `&self`.
#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
94 :
/// This struct holds two instances of [`LayerAccessStatsInner`].
/// Accesses and residence events are recorded to both instances.
/// The `for_scraping_api` instance can be reset from the management API via [`LayerAccessStatsReset`].
/// The `for_eviction_policy` instance is never reset.
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsLocked {
    /// Statistics reported (and optionally reset) through the management API.
    for_scraping_api: LayerAccessStatsInner,
    /// Statistics consulted by `LayerAccessStats::latest_activity`; never reset.
    for_eviction_policy: LayerAccessStatsInner,
}
104 :
105 : impl LayerAccessStatsLocked {
106 21826849 : fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
107 21826849 : [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
108 21826849 : }
109 : }
110 :
/// One set of access and residence statistics for a layer.
///
/// `LayerAccessStatsLocked` keeps two of these side by side.
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsInner {
    /// The first recorded access. In this file it is only written via `get_or_insert`,
    /// so it keeps the first access until the whole struct is reset.
    first_access: Option<LayerAccessStatFullDetails>,
    /// Number of recorded accesses per [`LayerAccessKind`].
    count_by_access_kind: EnumMap<LayerAccessKind, u64>,
    /// Set of task kinds that have accessed the layer.
    task_kind_flag: EnumSet<TaskKind>,
    /// Bounded history (size parameter 16) of the most recent accesses.
    last_accesses: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
    /// Bounded history (size parameter 16) of the most recent residence changes.
    last_residence_changes: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
119 :
/// Details of a single recorded layer access.
#[derive(Debug, Clone, Copy)]
pub(crate) struct LayerAccessStatFullDetails {
    /// Wall-clock time of the access (`SystemTime::now()` in `record_access`).
    pub(crate) when: SystemTime,
    /// Task kind of the request context that performed the access.
    pub(crate) task_kind: TaskKind,
    /// What kind of access this was.
    pub(crate) access_kind: LayerAccessKind,
}
126 :
/// What to reset in the scraping-API statistics after they have been reported
/// (applied by `LayerAccessStats::as_api_model`). Parsable from a string via
/// `strum_macros::EnumString`.
#[derive(Clone, Copy, strum_macros::EnumString)]
pub enum LayerAccessStatsReset {
    /// Keep all statistics.
    NoReset,
    /// Clear only the set of task kinds that accessed the layer.
    JustTaskKindFlags,
    /// Reset all scraping-API statistics to their defaults.
    AllStats,
}
133 :
/// Convert `ts` into whole milliseconds since the UNIX epoch (sub-millisecond part truncated).
///
/// # Panics
///
/// Panics if `ts` lies before the UNIX epoch, or if the millisecond count does not fit in `u64`.
fn system_time_to_millis_since_epoch(ts: &SystemTime) -> u64 {
    let since_epoch = ts
        .duration_since(UNIX_EPOCH)
        .expect("better to die in this unlikely case than report false stats");
    u64::try_from(since_epoch.as_millis()).expect("64 bits is enough for few more years")
}
141 :
142 : impl LayerAccessStatFullDetails {
143 8532 : fn as_api_model(&self) -> pageserver_api::models::LayerAccessStatFullDetails {
144 8532 : let Self {
145 8532 : when,
146 8532 : task_kind,
147 8532 : access_kind,
148 8532 : } = self;
149 8532 : pageserver_api::models::LayerAccessStatFullDetails {
150 8532 : when_millis_since_epoch: system_time_to_millis_since_epoch(when),
151 8532 : task_kind: task_kind.into(), // into static str, powered by strum_macros
152 8532 : access_kind: *access_kind,
153 8532 : }
154 8532 : }
155 : }
156 :
157 : impl LayerAccessStats {
158 : /// Create an empty stats object.
159 : ///
160 : /// The caller is responsible for recording a residence event
161 : /// using [`record_residence_event`] before calling `latest_activity`.
162 : /// If they don't, [`latest_activity`] will return `None`.
163 : ///
164 : /// [`record_residence_event`]: Self::record_residence_event
165 : /// [`latest_activity`]: Self::latest_activity
166 19123 : pub(crate) fn empty_will_record_residence_event_later() -> Self {
167 19123 : LayerAccessStats(Mutex::default())
168 19123 : }
169 :
170 : /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
171 : ///
172 : /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
173 : ///
174 : /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
175 : /// [`record_residence_event`]: Self::record_residence_event
176 8078 : pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
177 8078 : let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
178 8078 : new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
179 8078 : new
180 8078 : }
181 :
182 : /// Creates a clone of `self` and records `new_status` in the clone.
183 : ///
184 : /// The `new_status` is not recorded in `self`.
185 : ///
186 : /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
187 : ///
188 : /// [`record_residence_event`]: Self::record_residence_event
189 1961 : pub(crate) fn clone_for_residence_change(
190 1961 : &self,
191 1961 : new_status: LayerResidenceStatus,
192 1961 : ) -> LayerAccessStats {
193 1961 : let clone = {
194 1961 : let inner = self.0.lock().unwrap();
195 1961 : inner.clone()
196 1961 : };
197 1961 : let new = LayerAccessStats(Mutex::new(clone));
198 1961 : new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
199 1961 : new
200 1961 : }
201 :
202 : /// Record a change in layer residency.
203 : ///
204 : /// Recording the event must happen while holding the layer map lock to
205 : /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
206 : /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
207 : ///
208 : /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
209 : /// the following race could happen:
210 : ///
211 : /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
212 : /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
213 : /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
214 : /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
215 : ///
216 28639 : pub(crate) fn record_residence_event(
217 28639 : &self,
218 28639 : status: LayerResidenceStatus,
219 28639 : reason: LayerResidenceEventReason,
220 28639 : ) {
221 28639 : let mut locked = self.0.lock().unwrap();
222 57278 : locked.iter_mut().for_each(|inner| {
223 57278 : inner
224 57278 : .last_residence_changes
225 57278 : .write(LayerResidenceEvent::new(status, reason))
226 57278 : });
227 28639 : }
228 :
229 23854565 : fn record_access(&self, access_kind: LayerAccessKind, ctx: &RequestContext) {
230 23854565 : if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
231 2056355 : return;
232 21798210 : }
233 21798210 :
234 21798210 : let this_access = LayerAccessStatFullDetails {
235 21798210 : when: SystemTime::now(),
236 21798210 : task_kind: ctx.task_kind(),
237 21798210 : access_kind,
238 21798210 : };
239 21798210 :
240 21798210 : let mut locked = self.0.lock().unwrap();
241 43596446 : locked.iter_mut().for_each(|inner| {
242 43596446 : inner.first_access.get_or_insert(this_access);
243 43596446 : inner.count_by_access_kind[access_kind] += 1;
244 43596446 : inner.task_kind_flag |= ctx.task_kind();
245 43596446 : inner.last_accesses.write(this_access);
246 43596446 : })
247 23854565 : }
248 :
249 1775 : fn as_api_model(
250 1775 : &self,
251 1775 : reset: LayerAccessStatsReset,
252 1775 : ) -> pageserver_api::models::LayerAccessStats {
253 1775 : let mut locked = self.0.lock().unwrap();
254 1775 : let inner = &mut locked.for_scraping_api;
255 1775 : let LayerAccessStatsInner {
256 1775 : first_access,
257 1775 : count_by_access_kind,
258 1775 : task_kind_flag,
259 1775 : last_accesses,
260 1775 : last_residence_changes,
261 1775 : } = inner;
262 1775 : let ret = pageserver_api::models::LayerAccessStats {
263 1775 : access_count_by_access_kind: count_by_access_kind
264 1775 : .iter()
265 7100 : .map(|(kind, count)| (kind, *count))
266 1775 : .collect(),
267 1775 : task_kind_access_flag: task_kind_flag
268 1775 : .iter()
269 1775 : .map(|task_kind| task_kind.into()) // into static str, powered by strum_macros
270 1775 : .collect(),
271 1775 : first: first_access.as_ref().map(|a| a.as_api_model()),
272 8025 : accesses_history: last_accesses.map(|m| m.as_api_model()),
273 1775 : residence_events_history: last_residence_changes.clone(),
274 1775 : };
275 1775 : match reset {
276 1775 : LayerAccessStatsReset::NoReset => (),
277 UBC 0 : LayerAccessStatsReset::JustTaskKindFlags => {
278 0 : inner.task_kind_flag.clear();
279 0 : }
280 0 : LayerAccessStatsReset::AllStats => {
281 0 : *inner = LayerAccessStatsInner::default();
282 0 : }
283 : }
284 CBC 1775 : ret
285 1775 : }
286 :
287 : /// Get the latest access timestamp, falling back to latest residence event.
288 : ///
289 : /// This function can only return `None` if there has not yet been a call to the
290 : /// [`record_residence_event`] method. That would generally be considered an
291 : /// implementation error. This function logs a rate-limited warning in that case.
292 : ///
293 : /// TODO: use type system to avoid the need for `fallback`.
294 : /// The approach in <https://github.com/neondatabase/neon/pull/3775>
295 : /// could be used to enforce that a residence event is recorded
296 : /// before a layer is added to the layer map. We could also have
297 : /// a layer wrapper type that holds the LayerAccessStats, and ensure
298 : /// that that type can only be produced by inserting into the layer map.
299 : ///
300 : /// [`record_residence_event`]: Self::record_residence_event
301 1245 : pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
302 1245 : let locked = self.0.lock().unwrap();
303 1245 : let inner = &locked.for_eviction_policy;
304 1245 : match inner.last_accesses.recent() {
305 797 : Some(a) => Some(a.when),
306 448 : None => match inner.last_residence_changes.recent() {
307 448 : Some(e) => Some(e.timestamp),
308 : None => {
309 : static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> =
310 UBC 0 : Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10)))));
311 0 : let mut guard = WARN_RATE_LIMIT.lock().unwrap();
312 0 : guard.0 += 1;
313 0 : let occurences = guard.0;
314 0 : guard.1.call(move || {
315 0 : warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value");
316 0 : });
317 0 : None
318 : }
319 : },
320 : }
321 CBC 1245 : }
322 : }
323 :
/// Supertrait of the [`PersistentLayer`] trait that captures the bare minimum interface
/// required by [`LayerMap`](super::layer_map::LayerMap).
///
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known from the context in which the layers
/// are used (the timeline).
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
    ///
    /// Return data needed to reconstruct given page at LSN.
    ///
    /// It is up to the caller to collect more data from previous layer and
    /// perform WAL redo, if necessary.
    ///
    /// See [`ValueReconstructResult`] for possible return values. The collected data
    /// is appended to `reconstruct_data`; the caller should pass an empty struct
    /// on first call, or a struct with a cached older image of the page if one
    /// is available. If this returns `ValueReconstructResult::Continue`, look up
    /// the predecessor layer and call again with the same `reconstruct_data` to
    /// collect more data.
    async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_data: &mut ValueReconstructState,
        ctx: &RequestContext,
    ) -> Result<ValueReconstructResult>;
}
352 :
/// Get a layer descriptor from a layer.
///
/// [`PersistentLayer`] requires this trait; its default `filename` is derived from the descriptor.
pub trait AsLayerDesc {
    /// Get the layer descriptor (borrowed from `self`).
    fn layer_desc(&self) -> &PersistentLayerDesc;
}
358 :
359 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
360 : /// range of LSNs.
361 : ///
362 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
363 : /// layers are used to ingest incoming WAL, and provide fast access to the
364 : /// recent page versions. On-disk layers are stored as files on disk, and are
365 : /// immutable. This trait presents the common functionality of in-memory and
366 : /// on-disk layers.
367 : ///
368 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
369 : /// A delta layer contains all modifications within a range of LSNs and keys.
370 : /// An image layer is a snapshot of all the data in a key-range, at a single
371 : /// LSN.
372 : pub trait PersistentLayer: Layer + AsLayerDesc {
373 : /// File name used for this layer, both in the pageserver's local filesystem
374 : /// state as well as in the remote storage.
375 21161 : fn filename(&self) -> LayerFileName {
376 21161 : self.layer_desc().filename()
377 21161 : }
378 :
379 : // Path to the layer file in the local filesystem.
380 : // `None` for `RemoteLayer`.
381 : fn local_path(&self) -> Option<Utf8PathBuf>;
382 :
383 : /// Permanently remove this layer from disk.
384 : fn delete_resident_layer_file(&self) -> Result<()>;
385 :
386 4 : fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
387 4 : None
388 4 : }
389 :
390 UBC 0 : fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
391 0 : None
392 0 : }
393 :
394 CBC 23863250 : fn is_remote_layer(&self) -> bool {
395 23863250 : false
396 23863250 : }
397 :
398 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
399 :
400 : fn access_stats(&self) -> &LayerAccessStats;
401 : }
402 :
403 23851654 : pub fn downcast_remote_layer(
404 23851654 : layer: &Arc<dyn PersistentLayer>,
405 23851654 : ) -> Option<std::sync::Arc<RemoteLayer>> {
406 23851654 : if layer.is_remote_layer() {
407 1424 : Arc::clone(layer).downcast_remote_layer()
408 : } else {
409 23850230 : None
410 : }
411 23851654 : }
412 :
413 : pub mod tests {
414 : use super::*;
415 :
416 : impl From<DeltaFileName> for PersistentLayerDesc {
417 3 : fn from(value: DeltaFileName) -> Self {
418 3 : PersistentLayerDesc::new_delta(
419 3 : TenantId::from_array([0; 16]),
420 3 : TimelineId::from_array([0; 16]),
421 3 : value.key_range,
422 3 : value.lsn_range,
423 3 : 233,
424 3 : )
425 3 : }
426 : }
427 :
428 : impl From<ImageFileName> for PersistentLayerDesc {
429 1 : fn from(value: ImageFileName) -> Self {
430 1 : PersistentLayerDesc::new_img(
431 1 : TenantId::from_array([0; 16]),
432 1 : TimelineId::from_array([0; 16]),
433 1 : value.key_range,
434 1 : value.lsn,
435 1 : 233,
436 1 : )
437 1 : }
438 : }
439 :
440 : impl From<LayerFileName> for PersistentLayerDesc {
441 4 : fn from(value: LayerFileName) -> Self {
442 4 : match value {
443 3 : LayerFileName::Delta(d) => Self::from(d),
444 1 : LayerFileName::Image(i) => Self::from(i),
445 : }
446 4 : }
447 : }
448 : }
449 :
/// Helper enum to hold a PageServerConf, or a path
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
enum PathOrConf {
    /// Full path to the layer file on disk (used when no config is available, e.g. in `pagectl`).
    Path(Utf8PathBuf),
    /// The global page server config, from which layer file paths are derived.
    Conf(&'static PageServerConf),
}
462 :
/// Range-wrapping newtype whose `Debug` output renders both ends with `Display`,
/// formatted as `start..end`.
///
/// Useful with `Key`, whose `{:?}` is too verbose when printing multiple layers.
struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);

impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Range { start, end } = self.0;
        write!(f, "{}..{}", start, end)
    }
}
|