Line data Source code
1 : use std::collections::HashMap;
2 : use std::sync::Arc;
3 :
4 : use anyhow::{Context, bail, ensure};
5 : use itertools::Itertools;
6 : use pageserver_api::shard::TenantShardId;
7 : use tracing::trace;
8 : use utils::id::TimelineId;
9 : use utils::lsn::{AtomicLsn, Lsn};
10 :
11 : use super::TimelineWriterState;
12 : use crate::config::PageServerConf;
13 : use crate::context::RequestContext;
14 : use crate::metrics::TimelineMetrics;
15 : use crate::tenant::layer_map::{BatchedUpdates, LayerMap};
16 : use crate::tenant::storage_layer::{
17 : AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
18 : PersistentLayerKey, ResidentLayer,
19 : };
20 :
21 : /// Provides semantic APIs to manipulate the layer map.
22 : pub(crate) enum LayerManager {
23 : /// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
24 : /// the layers.
25 : Open(OpenLayerManager),
26 : /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
27 : /// read-only.
28 : Closed {
29 : layers: HashMap<PersistentLayerKey, Layer>,
30 : },
31 : }
32 :
33 : impl Default for LayerManager {
34 896 : fn default() -> Self {
35 896 : LayerManager::Open(OpenLayerManager::default())
36 896 : }
37 : }
38 :
39 : impl LayerManager {
40 481955 : pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
41 481955 : // The assumption for the `expect()` is that all code maintains the following invariant:
42 481955 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
43 481955 : self.try_get_from_key(key)
44 481955 : .with_context(|| format!("get layer from key: {key}"))
45 481955 : .expect("not found")
46 481955 : .clone()
47 481955 : }
48 :
49 482047 : pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
50 482047 : self.layers().get(key)
51 482047 : }
52 :
53 481911 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
54 481911 : self.get_from_key(&desc.key())
55 481911 : }
56 :
57 : /// Get an immutable reference to the layer map.
58 : ///
59 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
60 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
61 2153839 : pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
62 : use LayerManager::*;
63 2153839 : match self {
64 2153839 : Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
65 0 : Closed { .. } => Err(Shutdown),
66 : }
67 2153839 : }
68 :
69 9856 : pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
70 : use LayerManager::*;
71 :
72 9856 : match self {
73 9856 : Open(open) => Ok(open),
74 0 : Closed { .. } => Err(Shutdown),
75 : }
76 9856 : }
77 :
78 : /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
79 : /// order to allow shutdown to complete.
80 : ///
81 : /// If there was a want to flush in-memory layers, it must have happened earlier.
82 20 : pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
83 : use LayerManager::*;
84 20 : match self {
85 : Open(OpenLayerManager {
86 20 : layer_map,
87 20 : layer_fmgr: LayerFileManager(hashmap),
88 20 : }) => {
89 20 : // NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
90 20 : let open = layer_map.open_layer.take();
91 20 : let frozen = layer_map.frozen_layers.len();
92 20 : let taken_writer_state = writer_state.take();
93 20 : tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
94 20 : let layers = std::mem::take(hashmap);
95 20 : *self = Closed { layers };
96 20 : assert_eq!(open.is_some(), taken_writer_state.is_some());
97 : }
98 : Closed { .. } => {
99 0 : tracing::debug!("ignoring multiple shutdowns on layer manager")
100 : }
101 : }
102 20 : }
103 :
104 : /// Sum up the historic layer sizes
105 0 : pub(crate) fn layer_size_sum(&self) -> u64 {
106 0 : self.layers()
107 0 : .values()
108 0 : .map(|l| l.layer_desc().file_size)
109 0 : .sum()
110 0 : }
111 :
112 80 : pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
113 224 : self.layers().values().filter(|l| l.is_likely_resident())
114 80 : }
115 :
116 0 : pub(crate) fn visible_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
117 0 : self.layers()
118 0 : .values()
119 0 : .filter(|l| l.visibility() == LayerVisibilityHint::Visible)
120 0 : }
121 :
122 616 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
123 616 : self.contains_key(&layer.layer_desc().key())
124 616 : }
125 :
126 820 : pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
127 820 : self.layers().contains_key(key)
128 820 : }
129 :
130 0 : pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
131 0 : self.layers().keys().cloned().collect_vec()
132 0 : }
133 :
134 482947 : fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
135 : use LayerManager::*;
136 482947 : match self {
137 482947 : Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
138 0 : Closed { layers } => layers,
139 : }
140 482947 : }
141 : }
142 :
143 : #[derive(Default)]
144 : pub(crate) struct OpenLayerManager {
145 : layer_map: LayerMap,
146 : layer_fmgr: LayerFileManager<Layer>,
147 : }
148 :
149 : impl std::fmt::Debug for OpenLayerManager {
150 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 0 : f.debug_struct("OpenLayerManager")
152 0 : .field("layer_count", &self.layer_fmgr.0.len())
153 0 : .finish()
154 0 : }
155 : }
156 :
157 : #[derive(Debug, thiserror::Error)]
158 : #[error("layer manager has been shutdown")]
159 : pub(crate) struct Shutdown;
160 :
161 : impl OpenLayerManager {
162 : /// Called from `load_layer_map`. Initialize the layer manager with:
163 : /// 1. all on-disk layers
164 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
165 12 : pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
166 12 : let mut updates = self.layer_map.batch_update();
167 44 : for layer in layers {
168 32 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
169 32 : }
170 12 : updates.flush();
171 12 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
172 12 : }
173 :
174 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
175 884 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
176 884 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
177 884 : }
178 :
179 : /// Open a new writable layer to append data if there is no open layer, otherwise return the
180 : /// current open layer, called within `get_layer_for_write`.
181 2596 : pub(crate) async fn get_layer_for_write(
182 2596 : &mut self,
183 2596 : lsn: Lsn,
184 2596 : conf: &'static PageServerConf,
185 2596 : timeline_id: TimelineId,
186 2596 : tenant_shard_id: TenantShardId,
187 2596 : gate: &utils::sync::gate::Gate,
188 2596 : ctx: &RequestContext,
189 2596 : ) -> anyhow::Result<Arc<InMemoryLayer>> {
190 2596 : ensure!(lsn.is_aligned());
191 :
192 : // Do we have a layer open for writing already?
193 2596 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
194 0 : if open_layer.get_lsn_range().start > lsn {
195 0 : bail!(
196 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
197 0 : open_layer.get_lsn_range().start,
198 0 : lsn
199 0 : );
200 0 : }
201 0 :
202 0 : Arc::clone(open_layer)
203 : } else {
204 : // No writeable layer yet. Create one.
205 2596 : let start_lsn = self
206 2596 : .layer_map
207 2596 : .next_open_layer_at
208 2596 : .context("No next open layer found")?;
209 :
210 2596 : trace!(
211 0 : "creating in-memory layer at {}/{} for record at {}",
212 : timeline_id, start_lsn, lsn
213 : );
214 :
215 2596 : let new_layer =
216 2596 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, gate, ctx)
217 2596 : .await?;
218 2596 : let layer = Arc::new(new_layer);
219 2596 :
220 2596 : self.layer_map.open_layer = Some(layer.clone());
221 2596 : self.layer_map.next_open_layer_at = None;
222 2596 :
223 2596 : layer
224 : };
225 :
226 2596 : Ok(layer)
227 2596 : }
228 :
229 : /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
230 : ///
231 : /// Returns true if anything was frozen.
232 2404 : pub(super) async fn try_freeze_in_memory_layer(
233 2404 : &mut self,
234 2404 : lsn: Lsn,
235 2404 : last_freeze_at: &AtomicLsn,
236 2404 : write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
237 2404 : metrics: &TimelineMetrics,
238 2404 : ) -> bool {
239 2404 : let Lsn(last_record_lsn) = lsn;
240 2404 : let end_lsn = Lsn(last_record_lsn + 1);
241 :
242 2404 : let froze = if let Some(open_layer) = &self.layer_map.open_layer {
243 2348 : let open_layer_rc = Arc::clone(open_layer);
244 2348 : open_layer.freeze(end_lsn).await;
245 :
246 : // Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
247 : // TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
248 : // reference to the timeline metrics. Other methods use a metrics borrow as well.
249 2348 : metrics.inc_frozen_layer(open_layer);
250 2348 :
251 2348 : // The layer is no longer open, update the layer map to reflect this.
252 2348 : // We will replace it with on-disk historics below.
253 2348 : self.layer_map.frozen_layers.push_back(open_layer_rc);
254 2348 : self.layer_map.open_layer = None;
255 2348 : self.layer_map.next_open_layer_at = Some(end_lsn);
256 2348 :
257 2348 : true
258 : } else {
259 56 : false
260 : };
261 :
262 : // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
263 : // accounts for regions in the LSN range where we might have ingested no data due to sharding.
264 2404 : last_freeze_at.store(end_lsn);
265 2404 :
266 2404 : // the writer state must no longer have a reference to the frozen layer
267 2404 : let taken = write_lock.take();
268 2404 : assert_eq!(
269 2404 : froze,
270 2404 : taken.is_some(),
271 0 : "should only had frozen a layer when TimelineWriterState existed"
272 : );
273 :
274 2404 : froze
275 2404 : }
276 :
277 : /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
278 1140 : pub(crate) fn track_new_image_layers(
279 1140 : &mut self,
280 1140 : image_layers: &[ResidentLayer],
281 1140 : metrics: &TimelineMetrics,
282 1140 : ) {
283 1140 : let mut updates = self.layer_map.batch_update();
284 1628 : for layer in image_layers {
285 488 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
286 488 :
287 488 : // record these here instead of Layer::finish_creating because otherwise partial
288 488 : // failure with create_image_layers would balloon up the physical size gauge. downside
289 488 : // is that all layers need to be created before metrics are updated.
290 488 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
291 488 : }
292 1140 : updates.flush();
293 1140 : }
294 :
295 : /// Flush a frozen layer and add the written delta layer to the layer map.
296 2348 : pub(crate) fn finish_flush_l0_layer(
297 2348 : &mut self,
298 2348 : delta_layer: Option<&ResidentLayer>,
299 2348 : frozen_layer_for_check: &Arc<InMemoryLayer>,
300 2348 : metrics: &TimelineMetrics,
301 2348 : ) {
302 2348 : let inmem = self
303 2348 : .layer_map
304 2348 : .frozen_layers
305 2348 : .pop_front()
306 2348 : .expect("there must be a inmem layer to flush");
307 2348 : metrics.dec_frozen_layer(&inmem);
308 2348 :
309 2348 : // Only one task may call this function at a time (for this
310 2348 : // timeline). If two tasks tried to flush the same frozen
311 2348 : // layer to disk at the same time, that would not work.
312 2348 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
313 :
314 2348 : if let Some(l) = delta_layer {
315 1936 : let mut updates = self.layer_map.batch_update();
316 1936 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
317 1936 : metrics.record_new_file_metrics(l.layer_desc().file_size);
318 1936 : updates.flush();
319 1936 : }
320 2348 : }
321 :
322 : /// Called when compaction is completed.
323 56 : pub(crate) fn finish_compact_l0(
324 56 : &mut self,
325 56 : compact_from: &[Layer],
326 56 : compact_to: &[ResidentLayer],
327 56 : metrics: &TimelineMetrics,
328 56 : ) {
329 56 : let mut updates = self.layer_map.batch_update();
330 672 : for l in compact_to {
331 616 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
332 616 : metrics.record_new_file_metrics(l.layer_desc().file_size);
333 616 : }
334 860 : for l in compact_from {
335 804 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
336 804 : }
337 56 : updates.flush();
338 56 : }
339 :
340 : /// Called when a GC-compaction is completed.
341 96 : pub(crate) fn finish_gc_compaction(
342 96 : &mut self,
343 96 : compact_from: &[Layer],
344 96 : compact_to: &[ResidentLayer],
345 96 : metrics: &TimelineMetrics,
346 96 : ) {
347 96 : // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
348 96 :
349 96 : // Match the old layers with the new layers
350 96 : let mut add_layers = HashMap::new();
351 96 : let mut rewrite_layers = HashMap::new();
352 96 : let mut drop_layers = HashMap::new();
353 304 : for layer in compact_from {
354 208 : drop_layers.insert(layer.layer_desc().key(), layer.clone());
355 208 : }
356 204 : for layer in compact_to {
357 108 : if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
358 0 : rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
359 108 : } else {
360 108 : add_layers.insert(layer.layer_desc().key(), layer.clone());
361 108 : }
362 : }
363 96 : let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
364 96 : let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
365 96 : let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
366 96 :
367 96 : self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
368 96 : }
369 :
370 : /// Called post-compaction when some previous generation image layers were trimmed.
371 0 : pub fn rewrite_layers(
372 0 : &mut self,
373 0 : rewrite_layers: &[(Layer, ResidentLayer)],
374 0 : drop_layers: &[Layer],
375 0 : metrics: &TimelineMetrics,
376 0 : ) {
377 0 : self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
378 0 : }
379 :
380 96 : fn rewrite_layers_inner(
381 96 : &mut self,
382 96 : rewrite_layers: &[(Layer, ResidentLayer)],
383 96 : drop_layers: &[Layer],
384 96 : add_layers: &[ResidentLayer],
385 96 : metrics: &TimelineMetrics,
386 96 : ) {
387 96 : let mut updates = self.layer_map.batch_update();
388 96 : for (old_layer, new_layer) in rewrite_layers {
389 0 : debug_assert_eq!(
390 0 : old_layer.layer_desc().key_range,
391 0 : new_layer.layer_desc().key_range
392 : );
393 0 : debug_assert_eq!(
394 0 : old_layer.layer_desc().lsn_range,
395 0 : new_layer.layer_desc().lsn_range
396 : );
397 :
398 : // Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
399 : // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
400 : // always marking rewritten layers as visible.
401 0 : new_layer.as_ref().set_visibility(old_layer.visibility());
402 0 :
403 0 : // Safety: we may never rewrite the same file in-place. Callers are responsible
404 0 : // for ensuring that they only rewrite layers after something changes the path,
405 0 : // such as an increment in the generation number.
406 0 : assert_ne!(old_layer.local_path(), new_layer.local_path());
407 :
408 0 : Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
409 0 :
410 0 : Self::insert_historic_layer(
411 0 : new_layer.as_ref().clone(),
412 0 : &mut updates,
413 0 : &mut self.layer_fmgr,
414 0 : );
415 0 :
416 0 : metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
417 : }
418 304 : for l in drop_layers {
419 208 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
420 208 : }
421 204 : for l in add_layers {
422 108 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
423 108 : metrics.record_new_file_metrics(l.layer_desc().file_size);
424 108 : }
425 96 : updates.flush();
426 96 : }
427 :
428 : /// Called when garbage collect has selected the layers to be removed.
429 16 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
430 16 : let mut updates = self.layer_map.batch_update();
431 36 : for doomed_layer in gc_layers {
432 20 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
433 20 : }
434 16 : updates.flush()
435 16 : }
436 :
437 : #[cfg(test)]
438 304 : pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
439 304 : let mut updates = self.layer_map.batch_update();
440 304 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
441 304 : updates.flush()
442 304 : }
443 :
444 : /// Helper function to insert a layer into the layer map and file manager.
445 3484 : fn insert_historic_layer(
446 3484 : layer: Layer,
447 3484 : updates: &mut BatchedUpdates<'_>,
448 3484 : mapping: &mut LayerFileManager<Layer>,
449 3484 : ) {
450 3484 : updates.insert_historic(layer.layer_desc().clone());
451 3484 : mapping.insert(layer);
452 3484 : }
453 :
454 : /// Removes the layer from local FS (if present) and from memory.
455 : /// Remote storage is not affected by this operation.
456 1032 : fn delete_historic_layer(
457 1032 : // we cannot remove layers otherwise, since gc and compaction will race
458 1032 : layer: &Layer,
459 1032 : updates: &mut BatchedUpdates<'_>,
460 1032 : mapping: &mut LayerFileManager<Layer>,
461 1032 : ) {
462 1032 : let desc = layer.layer_desc();
463 1032 :
464 1032 : // TODO Removing from the bottom of the layer map is expensive.
465 1032 : // Maybe instead discard all layer map historic versions that
466 1032 : // won't be needed for page reconstruction for this timeline,
467 1032 : // and mark what we can't delete yet as deleted from the layer
468 1032 : // map index without actually rebuilding the index.
469 1032 : updates.remove_historic(desc);
470 1032 : mapping.remove(layer);
471 1032 : layer.delete_on_drop();
472 1032 : }
473 : }
474 :
475 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
476 :
477 : impl<T> Default for LayerFileManager<T> {
478 896 : fn default() -> Self {
479 896 : Self(HashMap::default())
480 896 : }
481 : }
482 :
483 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
484 3484 : pub(crate) fn insert(&mut self, layer: T) {
485 3484 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
486 3484 : if present.is_some() && cfg!(debug_assertions) {
487 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
488 3484 : }
489 3484 : }
490 :
491 1032 : pub(crate) fn remove(&mut self, layer: &T) {
492 1032 : let present = self.0.remove(&layer.layer_desc().key());
493 1032 : if present.is_none() && cfg!(debug_assertions) {
494 0 : panic!(
495 0 : "removing layer that is not present in layer mapping: {:?}",
496 0 : layer.layer_desc()
497 0 : )
498 1032 : }
499 1032 : }
500 : }
|