Line data Source code
1 : use anyhow::{bail, ensure, Context};
2 : use itertools::Itertools;
3 : use pageserver_api::shard::TenantShardId;
4 : use std::{collections::HashMap, sync::Arc};
5 : use tracing::trace;
6 : use utils::{
7 : id::TimelineId,
8 : lsn::{AtomicLsn, Lsn},
9 : };
10 :
11 : use crate::{
12 : config::PageServerConf,
13 : context::RequestContext,
14 : metrics::TimelineMetrics,
15 : tenant::{
16 : layer_map::{BatchedUpdates, LayerMap},
17 : storage_layer::{
18 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
19 : ResidentLayer,
20 : },
21 : },
22 : };
23 :
24 : use super::TimelineWriterState;
25 :
26 : /// Provides semantic APIs to manipulate the layer map.
27 : pub(crate) enum LayerManager {
28 : /// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
29 : /// the layers.
30 : Open(OpenLayerManager),
31 : /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
32 : /// read-only.
33 : Closed {
34 : layers: HashMap<PersistentLayerKey, Layer>,
35 : },
36 : }
37 :
38 : impl Default for LayerManager {
39 896 : fn default() -> Self {
40 896 : LayerManager::Open(OpenLayerManager::default())
41 896 : }
42 : }
43 :
44 : impl LayerManager {
45 481396 : pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
46 481396 : // The assumption for the `expect()` is that all code maintains the following invariant:
47 481396 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
48 481396 : self.try_get_from_key(key)
49 481396 : .with_context(|| format!("get layer from key: {key}"))
50 481396 : .expect("not found")
51 481396 : .clone()
52 481396 : }
53 :
54 481488 : pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
55 481488 : self.layers().get(key)
56 481488 : }
57 :
58 481352 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
59 481352 : self.get_from_key(&desc.key())
60 481352 : }
61 :
62 : /// Get an immutable reference to the layer map.
63 : ///
64 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
65 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
66 2154879 : pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
67 : use LayerManager::*;
68 2154879 : match self {
69 2154879 : Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
70 0 : Closed { .. } => Err(Shutdown),
71 : }
72 2154879 : }
73 :
74 9856 : pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
75 : use LayerManager::*;
76 :
77 9856 : match self {
78 9856 : Open(open) => Ok(open),
79 0 : Closed { .. } => Err(Shutdown),
80 : }
81 9856 : }
82 :
83 : /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
84 : /// order to allow shutdown to complete.
85 : ///
86 : /// If there was a want to flush in-memory layers, it must have happened earlier.
87 20 : pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
88 : use LayerManager::*;
89 20 : match self {
90 : Open(OpenLayerManager {
91 20 : layer_map,
92 20 : layer_fmgr: LayerFileManager(hashmap),
93 20 : }) => {
94 20 : // NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
95 20 : let open = layer_map.open_layer.take();
96 20 : let frozen = layer_map.frozen_layers.len();
97 20 : let taken_writer_state = writer_state.take();
98 20 : tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
99 20 : let layers = std::mem::take(hashmap);
100 20 : *self = Closed { layers };
101 20 : assert_eq!(open.is_some(), taken_writer_state.is_some());
102 : }
103 : Closed { .. } => {
104 0 : tracing::debug!("ignoring multiple shutdowns on layer manager")
105 : }
106 : }
107 20 : }
108 :
109 : /// Sum up the historic layer sizes
110 0 : pub(crate) fn layer_size_sum(&self) -> u64 {
111 0 : self.layers()
112 0 : .values()
113 0 : .map(|l| l.layer_desc().file_size)
114 0 : .sum()
115 0 : }
116 :
117 80 : pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
118 225 : self.layers().values().filter(|l| l.is_likely_resident())
119 80 : }
120 :
121 616 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
122 616 : self.contains_key(&layer.layer_desc().key())
123 616 : }
124 :
125 820 : pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
126 820 : self.layers().contains_key(key)
127 820 : }
128 :
129 0 : pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
130 0 : self.layers().keys().cloned().collect_vec()
131 0 : }
132 :
133 482388 : fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
134 : use LayerManager::*;
135 482388 : match self {
136 482388 : Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
137 0 : Closed { layers } => layers,
138 : }
139 482388 : }
140 : }
141 :
142 : #[derive(Default)]
143 : pub(crate) struct OpenLayerManager {
144 : layer_map: LayerMap,
145 : layer_fmgr: LayerFileManager<Layer>,
146 : }
147 :
148 : impl std::fmt::Debug for OpenLayerManager {
149 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 0 : f.debug_struct("OpenLayerManager")
151 0 : .field("layer_count", &self.layer_fmgr.0.len())
152 0 : .finish()
153 0 : }
154 : }
155 :
156 : #[derive(Debug, thiserror::Error)]
157 : #[error("layer manager has been shutdown")]
158 : pub(crate) struct Shutdown;
159 :
160 : impl OpenLayerManager {
161 : /// Called from `load_layer_map`. Initialize the layer manager with:
162 : /// 1. all on-disk layers
163 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
164 12 : pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
165 12 : let mut updates = self.layer_map.batch_update();
166 44 : for layer in layers {
167 32 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
168 32 : }
169 12 : updates.flush();
170 12 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
171 12 : }
172 :
173 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
174 884 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
175 884 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
176 884 : }
177 :
178 : /// Open a new writable layer to append data if there is no open layer, otherwise return the
179 : /// current open layer, called within `get_layer_for_write`.
180 2596 : pub(crate) async fn get_layer_for_write(
181 2596 : &mut self,
182 2596 : lsn: Lsn,
183 2596 : conf: &'static PageServerConf,
184 2596 : timeline_id: TimelineId,
185 2596 : tenant_shard_id: TenantShardId,
186 2596 : gate: &utils::sync::gate::Gate,
187 2596 : ctx: &RequestContext,
188 2596 : ) -> anyhow::Result<Arc<InMemoryLayer>> {
189 2596 : ensure!(lsn.is_aligned());
190 :
191 : // Do we have a layer open for writing already?
192 2596 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
193 0 : if open_layer.get_lsn_range().start > lsn {
194 0 : bail!(
195 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
196 0 : open_layer.get_lsn_range().start,
197 0 : lsn
198 0 : );
199 0 : }
200 0 :
201 0 : Arc::clone(open_layer)
202 : } else {
203 : // No writeable layer yet. Create one.
204 2596 : let start_lsn = self
205 2596 : .layer_map
206 2596 : .next_open_layer_at
207 2596 : .context("No next open layer found")?;
208 :
209 2596 : trace!(
210 0 : "creating in-memory layer at {}/{} for record at {}",
211 : timeline_id,
212 : start_lsn,
213 : lsn
214 : );
215 :
216 2596 : let new_layer =
217 2596 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, gate, ctx)
218 2596 : .await?;
219 2596 : let layer = Arc::new(new_layer);
220 2596 :
221 2596 : self.layer_map.open_layer = Some(layer.clone());
222 2596 : self.layer_map.next_open_layer_at = None;
223 2596 :
224 2596 : layer
225 : };
226 :
227 2596 : Ok(layer)
228 2596 : }
229 :
230 : /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
231 : ///
232 : /// Returns true if anything was frozen.
233 2404 : pub(super) async fn try_freeze_in_memory_layer(
234 2404 : &mut self,
235 2404 : lsn: Lsn,
236 2404 : last_freeze_at: &AtomicLsn,
237 2404 : write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
238 2404 : metrics: &TimelineMetrics,
239 2404 : ) -> bool {
240 2404 : let Lsn(last_record_lsn) = lsn;
241 2404 : let end_lsn = Lsn(last_record_lsn + 1);
242 :
243 2404 : let froze = if let Some(open_layer) = &self.layer_map.open_layer {
244 2348 : let open_layer_rc = Arc::clone(open_layer);
245 2348 : open_layer.freeze(end_lsn).await;
246 :
247 : // Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
248 : // TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
249 : // reference to the timeline metrics. Other methods use a metrics borrow as well.
250 2348 : metrics.inc_frozen_layer(open_layer);
251 2348 :
252 2348 : // The layer is no longer open, update the layer map to reflect this.
253 2348 : // We will replace it with on-disk historics below.
254 2348 : self.layer_map.frozen_layers.push_back(open_layer_rc);
255 2348 : self.layer_map.open_layer = None;
256 2348 : self.layer_map.next_open_layer_at = Some(end_lsn);
257 2348 :
258 2348 : true
259 : } else {
260 56 : false
261 : };
262 :
263 : // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
264 : // accounts for regions in the LSN range where we might have ingested no data due to sharding.
265 2404 : last_freeze_at.store(end_lsn);
266 2404 :
267 2404 : // the writer state must no longer have a reference to the frozen layer
268 2404 : let taken = write_lock.take();
269 2404 : assert_eq!(
270 2404 : froze,
271 2404 : taken.is_some(),
272 0 : "should only had frozen a layer when TimelineWriterState existed"
273 : );
274 :
275 2404 : froze
276 2404 : }
277 :
278 : /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
279 1140 : pub(crate) fn track_new_image_layers(
280 1140 : &mut self,
281 1140 : image_layers: &[ResidentLayer],
282 1140 : metrics: &TimelineMetrics,
283 1140 : ) {
284 1140 : let mut updates = self.layer_map.batch_update();
285 1628 : for layer in image_layers {
286 488 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
287 488 :
288 488 : // record these here instead of Layer::finish_creating because otherwise partial
289 488 : // failure with create_image_layers would balloon up the physical size gauge. downside
290 488 : // is that all layers need to be created before metrics are updated.
291 488 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
292 488 : }
293 1140 : updates.flush();
294 1140 : }
295 :
296 : /// Flush a frozen layer and add the written delta layer to the layer map.
297 2348 : pub(crate) fn finish_flush_l0_layer(
298 2348 : &mut self,
299 2348 : delta_layer: Option<&ResidentLayer>,
300 2348 : frozen_layer_for_check: &Arc<InMemoryLayer>,
301 2348 : metrics: &TimelineMetrics,
302 2348 : ) {
303 2348 : let inmem = self
304 2348 : .layer_map
305 2348 : .frozen_layers
306 2348 : .pop_front()
307 2348 : .expect("there must be a inmem layer to flush");
308 2348 : metrics.dec_frozen_layer(&inmem);
309 2348 :
310 2348 : // Only one task may call this function at a time (for this
311 2348 : // timeline). If two tasks tried to flush the same frozen
312 2348 : // layer to disk at the same time, that would not work.
313 2348 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
314 :
315 2348 : if let Some(l) = delta_layer {
316 1936 : let mut updates = self.layer_map.batch_update();
317 1936 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
318 1936 : metrics.record_new_file_metrics(l.layer_desc().file_size);
319 1936 : updates.flush();
320 1936 : }
321 2348 : }
322 :
323 : /// Called when compaction is completed.
324 56 : pub(crate) fn finish_compact_l0(
325 56 : &mut self,
326 56 : compact_from: &[Layer],
327 56 : compact_to: &[ResidentLayer],
328 56 : metrics: &TimelineMetrics,
329 56 : ) {
330 56 : let mut updates = self.layer_map.batch_update();
331 672 : for l in compact_to {
332 616 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
333 616 : metrics.record_new_file_metrics(l.layer_desc().file_size);
334 616 : }
335 860 : for l in compact_from {
336 804 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
337 804 : }
338 56 : updates.flush();
339 56 : }
340 :
341 : /// Called when a GC-compaction is completed.
342 96 : pub(crate) fn finish_gc_compaction(
343 96 : &mut self,
344 96 : compact_from: &[Layer],
345 96 : compact_to: &[ResidentLayer],
346 96 : metrics: &TimelineMetrics,
347 96 : ) {
348 96 : // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
349 96 :
350 96 : // Match the old layers with the new layers
351 96 : let mut add_layers = HashMap::new();
352 96 : let mut rewrite_layers = HashMap::new();
353 96 : let mut drop_layers = HashMap::new();
354 304 : for layer in compact_from {
355 208 : drop_layers.insert(layer.layer_desc().key(), layer.clone());
356 208 : }
357 204 : for layer in compact_to {
358 108 : if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
359 0 : rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
360 108 : } else {
361 108 : add_layers.insert(layer.layer_desc().key(), layer.clone());
362 108 : }
363 : }
364 96 : let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
365 96 : let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
366 96 : let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
367 96 :
368 96 : self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
369 96 : }
370 :
371 : /// Called post-compaction when some previous generation image layers were trimmed.
372 0 : pub fn rewrite_layers(
373 0 : &mut self,
374 0 : rewrite_layers: &[(Layer, ResidentLayer)],
375 0 : drop_layers: &[Layer],
376 0 : metrics: &TimelineMetrics,
377 0 : ) {
378 0 : self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
379 0 : }
380 :
381 96 : fn rewrite_layers_inner(
382 96 : &mut self,
383 96 : rewrite_layers: &[(Layer, ResidentLayer)],
384 96 : drop_layers: &[Layer],
385 96 : add_layers: &[ResidentLayer],
386 96 : metrics: &TimelineMetrics,
387 96 : ) {
388 96 : let mut updates = self.layer_map.batch_update();
389 96 : for (old_layer, new_layer) in rewrite_layers {
390 0 : debug_assert_eq!(
391 0 : old_layer.layer_desc().key_range,
392 0 : new_layer.layer_desc().key_range
393 : );
394 0 : debug_assert_eq!(
395 0 : old_layer.layer_desc().lsn_range,
396 0 : new_layer.layer_desc().lsn_range
397 : );
398 :
399 : // Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
400 : // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
401 : // always marking rewritten layers as visible.
402 0 : new_layer.as_ref().set_visibility(old_layer.visibility());
403 0 :
404 0 : // Safety: we may never rewrite the same file in-place. Callers are responsible
405 0 : // for ensuring that they only rewrite layers after something changes the path,
406 0 : // such as an increment in the generation number.
407 0 : assert_ne!(old_layer.local_path(), new_layer.local_path());
408 :
409 0 : Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
410 0 :
411 0 : Self::insert_historic_layer(
412 0 : new_layer.as_ref().clone(),
413 0 : &mut updates,
414 0 : &mut self.layer_fmgr,
415 0 : );
416 0 :
417 0 : metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
418 : }
419 304 : for l in drop_layers {
420 208 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
421 208 : }
422 204 : for l in add_layers {
423 108 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
424 108 : metrics.record_new_file_metrics(l.layer_desc().file_size);
425 108 : }
426 96 : updates.flush();
427 96 : }
428 :
429 : /// Called when garbage collect has selected the layers to be removed.
430 16 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
431 16 : let mut updates = self.layer_map.batch_update();
432 36 : for doomed_layer in gc_layers {
433 20 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
434 20 : }
435 16 : updates.flush()
436 16 : }
437 :
/// Test-only helper that registers `layer` directly, without recording any file metrics.
#[cfg(test)]
pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
    let Self {
        layer_map,
        layer_fmgr,
    } = self;

    let mut batch = layer_map.batch_update();
    Self::insert_historic_layer(layer.as_ref().clone(), &mut batch, layer_fmgr);
    batch.flush()
}
444 :
445 : /// Helper function to insert a layer into the layer map and file manager.
446 3484 : fn insert_historic_layer(
447 3484 : layer: Layer,
448 3484 : updates: &mut BatchedUpdates<'_>,
449 3484 : mapping: &mut LayerFileManager<Layer>,
450 3484 : ) {
451 3484 : updates.insert_historic(layer.layer_desc().clone());
452 3484 : mapping.insert(layer);
453 3484 : }
454 :
455 : /// Removes the layer from local FS (if present) and from memory.
456 : /// Remote storage is not affected by this operation.
457 1032 : fn delete_historic_layer(
458 1032 : // we cannot remove layers otherwise, since gc and compaction will race
459 1032 : layer: &Layer,
460 1032 : updates: &mut BatchedUpdates<'_>,
461 1032 : mapping: &mut LayerFileManager<Layer>,
462 1032 : ) {
463 1032 : let desc = layer.layer_desc();
464 1032 :
465 1032 : // TODO Removing from the bottom of the layer map is expensive.
466 1032 : // Maybe instead discard all layer map historic versions that
467 1032 : // won't be needed for page reconstruction for this timeline,
468 1032 : // and mark what we can't delete yet as deleted from the layer
469 1032 : // map index without actually rebuilding the index.
470 1032 : updates.remove_historic(desc);
471 1032 : mapping.remove(layer);
472 1032 : layer.delete_on_drop();
473 1032 : }
474 : }
475 :
476 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
477 :
478 : impl<T> Default for LayerFileManager<T> {
479 896 : fn default() -> Self {
480 896 : Self(HashMap::default())
481 896 : }
482 : }
483 :
484 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
485 3484 : pub(crate) fn insert(&mut self, layer: T) {
486 3484 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
487 3484 : if present.is_some() && cfg!(debug_assertions) {
488 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
489 3484 : }
490 3484 : }
491 :
492 1032 : pub(crate) fn remove(&mut self, layer: &T) {
493 1032 : let present = self.0.remove(&layer.layer_desc().key());
494 1032 : if present.is_none() && cfg!(debug_assertions) {
495 0 : panic!(
496 0 : "removing layer that is not present in layer mapping: {:?}",
497 0 : layer.layer_desc()
498 0 : )
499 1032 : }
500 1032 : }
501 : }
|