Line data Source code
1 : use std::collections::HashMap;
2 : use std::sync::Arc;
3 :
4 : use anyhow::{Context, bail, ensure};
5 : use itertools::Itertools;
6 : use pageserver_api::shard::TenantShardId;
7 : use tokio_util::sync::CancellationToken;
8 : use tracing::trace;
9 : use utils::id::TimelineId;
10 : use utils::lsn::{AtomicLsn, Lsn};
11 :
12 : use super::{ReadableLayer, TimelineWriterState};
13 : use crate::config::PageServerConf;
14 : use crate::context::RequestContext;
15 : use crate::metrics::TimelineMetrics;
16 : use crate::tenant::layer_map::{BatchedUpdates, LayerMap};
17 : use crate::tenant::storage_layer::{
18 : AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
19 : PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
20 : };
21 :
22 : /// Provides semantic APIs to manipulate the layer map.
23 : pub(crate) enum LayerManager {
24 : /// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
25 : /// the layers.
26 : Open(OpenLayerManager),
27 : /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
28 : /// read-only.
29 : Closed {
30 : layers: HashMap<PersistentLayerKey, Layer>,
31 : },
32 : }
33 :
34 : impl Default for LayerManager {
35 904 : fn default() -> Self {
36 904 : LayerManager::Open(OpenLayerManager::default())
37 904 : }
38 : }
39 :
40 : impl LayerManager {
41 1694185 : pub(crate) fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
42 1694185 : match weak {
43 479950 : ReadableLayerWeak::PersistentLayer(desc) => {
44 479950 : ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
45 : }
46 1214235 : ReadableLayerWeak::InMemoryLayer(desc) => {
47 1214235 : let inmem = self
48 1214235 : .layer_map()
49 1214235 : .expect("no concurrent shutdown")
50 1214235 : .in_memory_layer(&desc);
51 1214235 : ReadableLayer::InMemoryLayer(inmem)
52 : }
53 : }
54 1694185 : }
55 :
56 481850 : pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
57 481850 : // The assumption for the `expect()` is that all code maintains the following invariant:
58 481850 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
59 481850 : self.try_get_from_key(key)
60 481850 : .with_context(|| format!("get layer from key: {key}"))
61 481850 : .expect("not found")
62 481850 : .clone()
63 481850 : }
64 :
65 481942 : pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
66 481942 : self.layers().get(key)
67 481942 : }
68 :
69 481806 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
70 481806 : self.get_from_key(&desc.key())
71 481806 : }
72 :
73 : /// Get an immutable reference to the layer map.
74 : ///
75 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
76 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
77 3368243 : pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
78 : use LayerManager::*;
79 3368243 : match self {
80 3368243 : Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
81 0 : Closed { .. } => Err(Shutdown),
82 : }
83 3368243 : }
84 :
85 9909 : pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
86 : use LayerManager::*;
87 :
88 9909 : match self {
89 9909 : Open(open) => Ok(open),
90 0 : Closed { .. } => Err(Shutdown),
91 : }
92 9909 : }
93 :
94 : /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
95 : /// order to allow shutdown to complete.
96 : ///
97 : /// If there was a want to flush in-memory layers, it must have happened earlier.
98 20 : pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
99 : use LayerManager::*;
100 20 : match self {
101 : Open(OpenLayerManager {
102 20 : layer_map,
103 20 : layer_fmgr: LayerFileManager(hashmap),
104 20 : }) => {
105 20 : // NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
106 20 : let open = layer_map.open_layer.take();
107 20 : let frozen = layer_map.frozen_layers.len();
108 20 : let taken_writer_state = writer_state.take();
109 20 : tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
110 20 : let layers = std::mem::take(hashmap);
111 20 : *self = Closed { layers };
112 20 : assert_eq!(open.is_some(), taken_writer_state.is_some());
113 : }
114 : Closed { .. } => {
115 0 : tracing::debug!("ignoring multiple shutdowns on layer manager")
116 : }
117 : }
118 20 : }
119 :
120 : /// Sum up the historic layer sizes
121 0 : pub(crate) fn layer_size_sum(&self) -> u64 {
122 0 : self.layers()
123 0 : .values()
124 0 : .map(|l| l.layer_desc().file_size)
125 0 : .sum()
126 0 : }
127 :
128 80 : pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
129 227 : self.layers().values().filter(|l| l.is_likely_resident())
130 80 : }
131 :
132 0 : pub(crate) fn visible_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
133 0 : self.layers()
134 0 : .values()
135 0 : .filter(|l| l.visibility() == LayerVisibilityHint::Visible)
136 0 : }
137 :
138 616 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
139 616 : self.contains_key(&layer.layer_desc().key())
140 616 : }
141 :
142 820 : pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
143 820 : self.layers().contains_key(key)
144 820 : }
145 :
146 0 : pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
147 0 : self.layers().keys().cloned().collect_vec()
148 0 : }
149 :
150 482842 : fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
151 : use LayerManager::*;
152 482842 : match self {
153 482842 : Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
154 0 : Closed { layers } => layers,
155 : }
156 482842 : }
157 : }
158 :
159 : #[derive(Default)]
160 : pub(crate) struct OpenLayerManager {
161 : layer_map: LayerMap,
162 : layer_fmgr: LayerFileManager<Layer>,
163 : }
164 :
165 : impl std::fmt::Debug for OpenLayerManager {
166 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 0 : f.debug_struct("OpenLayerManager")
168 0 : .field("layer_count", &self.layer_fmgr.0.len())
169 0 : .finish()
170 0 : }
171 : }
172 :
/// Error returned by accessors that need an open layer manager (`layer_map`, `open_mut`) when
/// the manager is already in the `Closed` state.
#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shutdown")]
pub(crate) struct Shutdown;
176 :
177 : impl OpenLayerManager {
178 : /// Called from `load_layer_map`. Initialize the layer manager with:
179 : /// 1. all on-disk layers
180 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
181 12 : pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
182 12 : let mut updates = self.layer_map.batch_update();
183 44 : for layer in layers {
184 32 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
185 32 : }
186 12 : updates.flush();
187 12 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
188 12 : }
189 :
190 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
191 892 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
192 892 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
193 892 : }
194 :
195 : /// Open a new writable layer to append data if there is no open layer, otherwise return the
196 : /// current open layer, called within `get_layer_for_write`.
197 : #[allow(clippy::too_many_arguments)]
198 2604 : pub(crate) async fn get_layer_for_write(
199 2604 : &mut self,
200 2604 : lsn: Lsn,
201 2604 : conf: &'static PageServerConf,
202 2604 : timeline_id: TimelineId,
203 2604 : tenant_shard_id: TenantShardId,
204 2604 : gate: &utils::sync::gate::Gate,
205 2604 : cancel: &CancellationToken,
206 2604 : ctx: &RequestContext,
207 2604 : ) -> anyhow::Result<Arc<InMemoryLayer>> {
208 2604 : ensure!(lsn.is_aligned());
209 :
210 : // Do we have a layer open for writing already?
211 2604 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
212 0 : if open_layer.get_lsn_range().start > lsn {
213 0 : bail!(
214 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
215 0 : open_layer.get_lsn_range().start,
216 0 : lsn
217 0 : );
218 0 : }
219 0 :
220 0 : Arc::clone(open_layer)
221 : } else {
222 : // No writeable layer yet. Create one.
223 2604 : let start_lsn = self
224 2604 : .layer_map
225 2604 : .next_open_layer_at
226 2604 : .context("No next open layer found")?;
227 :
228 2604 : trace!(
229 0 : "creating in-memory layer at {}/{} for record at {}",
230 : timeline_id, start_lsn, lsn
231 : );
232 :
233 2604 : let new_layer = InMemoryLayer::create(
234 2604 : conf,
235 2604 : timeline_id,
236 2604 : tenant_shard_id,
237 2604 : start_lsn,
238 2604 : gate,
239 2604 : cancel,
240 2604 : ctx,
241 2604 : )
242 2604 : .await?;
243 2604 : let layer = Arc::new(new_layer);
244 2604 :
245 2604 : self.layer_map.open_layer = Some(layer.clone());
246 2604 : self.layer_map.next_open_layer_at = None;
247 2604 :
248 2604 : layer
249 : };
250 :
251 2604 : Ok(layer)
252 2604 : }
253 :
254 : /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
255 : ///
256 : /// Returns true if anything was frozen.
257 2412 : pub(super) async fn try_freeze_in_memory_layer(
258 2412 : &mut self,
259 2412 : lsn: Lsn,
260 2412 : last_freeze_at: &AtomicLsn,
261 2412 : write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
262 2412 : metrics: &TimelineMetrics,
263 2412 : ) -> bool {
264 2412 : let Lsn(last_record_lsn) = lsn;
265 2412 : let end_lsn = Lsn(last_record_lsn + 1);
266 :
267 2412 : let froze = if let Some(open_layer) = &self.layer_map.open_layer {
268 2356 : let open_layer_rc = Arc::clone(open_layer);
269 2356 : open_layer.freeze(end_lsn).await;
270 :
271 : // Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
272 : // TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
273 : // reference to the timeline metrics. Other methods use a metrics borrow as well.
274 2356 : metrics.inc_frozen_layer(open_layer);
275 2356 :
276 2356 : // The layer is no longer open, update the layer map to reflect this.
277 2356 : // We will replace it with on-disk historics below.
278 2356 : self.layer_map.frozen_layers.push_back(open_layer_rc);
279 2356 : self.layer_map.open_layer = None;
280 2356 : self.layer_map.next_open_layer_at = Some(end_lsn);
281 2356 :
282 2356 : true
283 : } else {
284 56 : false
285 : };
286 :
287 : // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
288 : // accounts for regions in the LSN range where we might have ingested no data due to sharding.
289 2412 : last_freeze_at.store(end_lsn);
290 2412 :
291 2412 : // the writer state must no longer have a reference to the frozen layer
292 2412 : let taken = write_lock.take();
293 2412 : assert_eq!(
294 2412 : froze,
295 2412 : taken.is_some(),
296 0 : "should only had frozen a layer when TimelineWriterState existed"
297 : );
298 :
299 2412 : froze
300 2412 : }
301 :
302 : /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
303 1145 : pub(crate) fn track_new_image_layers(
304 1145 : &mut self,
305 1145 : image_layers: &[ResidentLayer],
306 1145 : metrics: &TimelineMetrics,
307 1145 : ) {
308 1145 : let mut updates = self.layer_map.batch_update();
309 1641 : for layer in image_layers {
310 496 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
311 496 :
312 496 : // record these here instead of Layer::finish_creating because otherwise partial
313 496 : // failure with create_image_layers would balloon up the physical size gauge. downside
314 496 : // is that all layers need to be created before metrics are updated.
315 496 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
316 496 : }
317 1145 : updates.flush();
318 1145 : }
319 :
320 : /// Flush a frozen layer and add the written delta layer to the layer map.
321 2356 : pub(crate) fn finish_flush_l0_layer(
322 2356 : &mut self,
323 2356 : delta_layer: Option<&ResidentLayer>,
324 2356 : frozen_layer_for_check: &Arc<InMemoryLayer>,
325 2356 : metrics: &TimelineMetrics,
326 2356 : ) {
327 2356 : let inmem = self
328 2356 : .layer_map
329 2356 : .frozen_layers
330 2356 : .pop_front()
331 2356 : .expect("there must be a inmem layer to flush");
332 2356 : metrics.dec_frozen_layer(&inmem);
333 2356 :
334 2356 : // Only one task may call this function at a time (for this
335 2356 : // timeline). If two tasks tried to flush the same frozen
336 2356 : // layer to disk at the same time, that would not work.
337 2356 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
338 :
339 2356 : if let Some(l) = delta_layer {
340 1936 : let mut updates = self.layer_map.batch_update();
341 1936 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
342 1936 : metrics.record_new_file_metrics(l.layer_desc().file_size);
343 1936 : updates.flush();
344 1936 : }
345 2356 : }
346 :
347 : /// Called when compaction is completed.
348 56 : pub(crate) fn finish_compact_l0(
349 56 : &mut self,
350 56 : compact_from: &[Layer],
351 56 : compact_to: &[ResidentLayer],
352 56 : metrics: &TimelineMetrics,
353 56 : ) {
354 56 : let mut updates = self.layer_map.batch_update();
355 672 : for l in compact_to {
356 616 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
357 616 : metrics.record_new_file_metrics(l.layer_desc().file_size);
358 616 : }
359 860 : for l in compact_from {
360 804 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
361 804 : }
362 56 : updates.flush();
363 56 : }
364 :
365 : /// Called when a GC-compaction is completed.
366 96 : pub(crate) fn finish_gc_compaction(
367 96 : &mut self,
368 96 : compact_from: &[Layer],
369 96 : compact_to: &[ResidentLayer],
370 96 : metrics: &TimelineMetrics,
371 96 : ) {
372 96 : // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
373 96 :
374 96 : // Match the old layers with the new layers
375 96 : let mut add_layers = HashMap::new();
376 96 : let mut rewrite_layers = HashMap::new();
377 96 : let mut drop_layers = HashMap::new();
378 304 : for layer in compact_from {
379 208 : drop_layers.insert(layer.layer_desc().key(), layer.clone());
380 208 : }
381 204 : for layer in compact_to {
382 108 : if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
383 0 : rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
384 108 : } else {
385 108 : add_layers.insert(layer.layer_desc().key(), layer.clone());
386 108 : }
387 : }
388 96 : let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
389 96 : let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
390 96 : let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
391 96 :
392 96 : self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
393 96 : }
394 :
395 : /// Called post-compaction when some previous generation image layers were trimmed.
396 0 : pub fn rewrite_layers(
397 0 : &mut self,
398 0 : rewrite_layers: &[(Layer, ResidentLayer)],
399 0 : drop_layers: &[Layer],
400 0 : metrics: &TimelineMetrics,
401 0 : ) {
402 0 : self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
403 0 : }
404 :
405 96 : fn rewrite_layers_inner(
406 96 : &mut self,
407 96 : rewrite_layers: &[(Layer, ResidentLayer)],
408 96 : drop_layers: &[Layer],
409 96 : add_layers: &[ResidentLayer],
410 96 : metrics: &TimelineMetrics,
411 96 : ) {
412 96 : let mut updates = self.layer_map.batch_update();
413 96 : for (old_layer, new_layer) in rewrite_layers {
414 0 : debug_assert_eq!(
415 0 : old_layer.layer_desc().key_range,
416 0 : new_layer.layer_desc().key_range
417 : );
418 0 : debug_assert_eq!(
419 0 : old_layer.layer_desc().lsn_range,
420 0 : new_layer.layer_desc().lsn_range
421 : );
422 :
423 : // Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
424 : // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
425 : // always marking rewritten layers as visible.
426 0 : new_layer.as_ref().set_visibility(old_layer.visibility());
427 0 :
428 0 : // Safety: we may never rewrite the same file in-place. Callers are responsible
429 0 : // for ensuring that they only rewrite layers after something changes the path,
430 0 : // such as an increment in the generation number.
431 0 : assert_ne!(old_layer.local_path(), new_layer.local_path());
432 :
433 0 : Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
434 0 :
435 0 : Self::insert_historic_layer(
436 0 : new_layer.as_ref().clone(),
437 0 : &mut updates,
438 0 : &mut self.layer_fmgr,
439 0 : );
440 0 :
441 0 : metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
442 : }
443 304 : for l in drop_layers {
444 208 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
445 208 : }
446 204 : for l in add_layers {
447 108 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
448 108 : metrics.record_new_file_metrics(l.layer_desc().file_size);
449 108 : }
450 96 : updates.flush();
451 96 : }
452 :
453 : /// Called when garbage collect has selected the layers to be removed.
454 16 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
455 16 : let mut updates = self.layer_map.batch_update();
456 36 : for doomed_layer in gc_layers {
457 20 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
458 20 : }
459 16 : updates.flush()
460 16 : }
461 :
/// Test-only helper: registers `layer` as a historic layer without recording any metrics.
#[cfg(test)]
pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
    let Self {
        layer_map,
        layer_fmgr,
    } = self;

    let mut batch = layer_map.batch_update();
    let historic = layer.as_ref().clone();
    Self::insert_historic_layer(historic, &mut batch, layer_fmgr);
    batch.flush();
}
468 :
469 : /// Helper function to insert a layer into the layer map and file manager.
470 3500 : fn insert_historic_layer(
471 3500 : layer: Layer,
472 3500 : updates: &mut BatchedUpdates<'_>,
473 3500 : mapping: &mut LayerFileManager<Layer>,
474 3500 : ) {
475 3500 : updates.insert_historic(layer.layer_desc().clone());
476 3500 : mapping.insert(layer);
477 3500 : }
478 :
479 : /// Removes the layer from local FS (if present) and from memory.
480 : /// Remote storage is not affected by this operation.
481 1032 : fn delete_historic_layer(
482 1032 : // we cannot remove layers otherwise, since gc and compaction will race
483 1032 : layer: &Layer,
484 1032 : updates: &mut BatchedUpdates<'_>,
485 1032 : mapping: &mut LayerFileManager<Layer>,
486 1032 : ) {
487 1032 : let desc = layer.layer_desc();
488 1032 :
489 1032 : // TODO Removing from the bottom of the layer map is expensive.
490 1032 : // Maybe instead discard all layer map historic versions that
491 1032 : // won't be needed for page reconstruction for this timeline,
492 1032 : // and mark what we can't delete yet as deleted from the layer
493 1032 : // map index without actually rebuilding the index.
494 1032 : updates.remove_historic(desc);
495 1032 : mapping.remove(layer);
496 1032 : layer.delete_on_drop();
497 1032 : }
498 :
/// Test-only helper: installs `layer` either as the open layer or at the back of the frozen
/// queue, depending on what `layer.info()` reports.
///
/// # Panics
///
/// Panics if an open layer is inserted while another one is open, or if a frozen layer starts
/// before the end of the last frozen layer already queued.
#[cfg(test)]
pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc<InMemoryLayer>) {
    use pageserver_api::models::InMemoryLayerInfo;

    match layer.info() {
        InMemoryLayerInfo::Frozen { lsn_start, .. } => {
            // Frozen layers must be queued in LSN order.
            if let Some(last) = self.layer_map.frozen_layers.back() {
                assert!(last.get_lsn_range().end <= lsn_start);
            }

            self.layer_map.frozen_layers.push_back(layer);
        }
        InMemoryLayerInfo::Open { .. } => {
            assert!(self.layer_map.open_layer.is_none());
            self.layer_map.open_layer = Some(layer);
        }
    }
}
517 : }
518 :
519 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
520 :
521 : impl<T> Default for LayerFileManager<T> {
522 904 : fn default() -> Self {
523 904 : Self(HashMap::default())
524 904 : }
525 : }
526 :
527 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
528 3500 : pub(crate) fn insert(&mut self, layer: T) {
529 3500 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
530 3500 : if present.is_some() && cfg!(debug_assertions) {
531 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
532 3500 : }
533 3500 : }
534 :
535 1032 : pub(crate) fn remove(&mut self, layer: &T) {
536 1032 : let present = self.0.remove(&layer.layer_desc().key());
537 1032 : if present.is_none() && cfg!(debug_assertions) {
538 0 : panic!(
539 0 : "removing layer that is not present in layer mapping: {:?}",
540 0 : layer.layer_desc()
541 0 : )
542 1032 : }
543 1032 : }
544 : }
|