use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, bail, ensure};
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use utils::id::TimelineId;
use utils::lsn::{AtomicLsn, Lsn};

use super::{LayerFringe, ReadableLayer, TimelineWriterState};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::metrics::TimelineMetrics;
use crate::tenant::layer_map::{BatchedUpdates, LayerMap, SearchResult};
use crate::tenant::storage_layer::{
    AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
    PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
};

/// Warn if the lock was held for longer than this threshold.
/// It's very generous and we should bring this value down over time.
const LAYER_MANAGER_LOCK_WARN_THRESHOLD: Duration = Duration::from_secs(5);
const LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD: Duration = Duration::from_secs(30);

/// Describes the operation that is holding the layer manager lock.
#[derive(Debug, Clone, Copy, strum_macros::Display)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum LayerManagerLockHolder {
    GetLayerMapInfo,
    GenerateHeatmap,
    GetPage,
    Init,
    LoadLayerMap,
    GetLayerForWrite,
    TryFreezeLayer,
    FlushFrozenLayer,
    FlushLoop,
    Compaction,
    GarbageCollection,
    Shutdown,
    ImportPgData,
    DetachAncestor,
    Eviction,
    #[cfg(test)]
    Testing,
}
/// Wrapper for the layer manager that tracks how long it was held under a
/// read or write lock, and warns if a threshold was exceeded.
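///
/// A minimal usage sketch (illustrative only; `layers` is assumed to be a
/// `LockedLayerManager` field on the timeline):
///
/// ```ignore
/// let guard = layers.read(LayerManagerLockHolder::GetPage).await;
/// let size = guard.layer_size_sum();
/// drop(guard); // warns if held past LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD
/// ```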
#[derive(Default)]
pub(crate) struct LockedLayerManager {
    locked: tokio::sync::RwLock<LayerManager>,
}

pub(crate) struct LayerManagerReadGuard<'a> {
    guard: ManuallyDrop<tokio::sync::RwLockReadGuard<'a, LayerManager>>,
    acquired_at: std::time::Instant,
    holder: LayerManagerLockHolder,
}

pub(crate) struct LayerManagerWriteGuard<'a> {
    guard: ManuallyDrop<tokio::sync::RwLockWriteGuard<'a, LayerManager>>,
    acquired_at: std::time::Instant,
    holder: LayerManagerLockHolder,
}

impl Drop for LayerManagerReadGuard<'_> {
    fn drop(&mut self) {
        // Drop the lock first, before potentially warning if it was held for too long.
        // SAFETY: ManuallyDrop in Drop implementation
        unsafe { ManuallyDrop::drop(&mut self.guard) };

        let held_for = self.acquired_at.elapsed();
        if held_for >= LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD {
            tracing::warn!(
                holder=%self.holder,
                "Layer manager read lock held for {}s",
                held_for.as_secs_f64(),
            );
        }
    }
}

impl Drop for LayerManagerWriteGuard<'_> {
    fn drop(&mut self) {
        // Drop the lock first, before potentially warning if it was held for too long.
        // SAFETY: ManuallyDrop in Drop implementation
        unsafe { ManuallyDrop::drop(&mut self.guard) };

        let held_for = self.acquired_at.elapsed();
        if held_for >= LAYER_MANAGER_LOCK_WARN_THRESHOLD {
            tracing::warn!(
                holder=%self.holder,
                "Layer manager write lock held for {}s",
                held_for.as_secs_f64(),
            );
        }
    }
}

impl Deref for LayerManagerReadGuard<'_> {
    type Target = LayerManager;

    fn deref(&self) -> &Self::Target {
        self.guard.deref()
    }
}

impl Deref for LayerManagerWriteGuard<'_> {
    type Target = LayerManager;

    fn deref(&self) -> &Self::Target {
        self.guard.deref()
    }
}

impl DerefMut for LayerManagerWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.guard.deref_mut()
    }
}
impl LockedLayerManager {
    pub(crate) async fn read(&self, holder: LayerManagerLockHolder) -> LayerManagerReadGuard {
        let guard = ManuallyDrop::new(self.locked.read().await);
        LayerManagerReadGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        }
    }

    pub(crate) fn try_read(
        &self,
        holder: LayerManagerLockHolder,
    ) -> Result<LayerManagerReadGuard, tokio::sync::TryLockError> {
        let guard = ManuallyDrop::new(self.locked.try_read()?);

        Ok(LayerManagerReadGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        })
    }

    pub(crate) async fn write(&self, holder: LayerManagerLockHolder) -> LayerManagerWriteGuard {
        let guard = ManuallyDrop::new(self.locked.write().await);
        LayerManagerWriteGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        }
    }

    pub(crate) fn try_write(
        &self,
        holder: LayerManagerLockHolder,
    ) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> {
        let guard = ManuallyDrop::new(self.locked.try_write()?);

        Ok(LayerManagerWriteGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        })
    }
}

/// Provides semantic APIs to manipulate the layer map.
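///
/// State-transition sketch (illustrative only): once [`LayerManager::shutdown`]
/// runs, accessors that need the open state return `Err(Shutdown)`.
///
/// ```ignore
/// let mut manager = LayerManager::default(); // starts Open
/// assert!(manager.layer_map().is_ok());
/// manager.shutdown(&mut None);
/// assert!(manager.open_mut().is_err()); // now Closed
/// ```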
pub(crate) enum LayerManager {
    /// Open, i.e. not shut down: in-memory layers may still exist, and the layers
    /// can be manipulated.
    Open(OpenLayerManager),
    /// Shut down: there are no more in-memory layers, and persistent layers are
    /// read-only.
    Closed {
        layers: HashMap<PersistentLayerKey, Layer>,
    },
}

impl Default for LayerManager {
    fn default() -> Self {
        LayerManager::Open(OpenLayerManager::default())
    }
}

impl LayerManager {
    fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
        match weak {
            ReadableLayerWeak::PersistentLayer(desc) => {
                ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
            }
            ReadableLayerWeak::InMemoryLayer(desc) => {
                let inmem = self
                    .layer_map()
                    .expect("no concurrent shutdown")
                    .in_memory_layer(&desc);
                ReadableLayer::InMemoryLayer(inmem)
            }
        }
    }

    pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
        // The assumption for the `expect()` is that all code maintains the following invariant:
        // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
        self.try_get_from_key(key)
            .with_context(|| format!("get layer from key: {key}"))
            .expect("not found")
            .clone()
    }

    pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
        self.layers().get(key)
    }

    pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
        self.get_from_key(&desc.key())
    }

    /// Get an immutable reference to the layer map.
    ///
    /// We expect users only to be able to get an immutable layer map. If users want to make
    /// modifications, they should use the semantic APIs below. This design moves us a step
    /// closer to an immutable storage state.
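    ///
    /// For example (illustrative; `manager`, `key_range`, and `lsn` are assumed
    /// bindings), a read path only peeks at the map:
    ///
    /// ```ignore
    /// let map = manager.layer_map()?; // Err(Shutdown) once shut down
    /// let results = map.range_search(key_range, lsn);
    /// ```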
    pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
            Closed { .. } => Err(Shutdown),
        }
    }

    pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
        use LayerManager::*;

        match self {
            Open(open) => Ok(open),
            Closed { .. } => Err(Shutdown),
        }
    }

    /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
    /// order to allow shutdown to complete.
    ///
    /// If the in-memory layers needed to be flushed, that must have happened earlier.
    pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager {
                layer_map,
                layer_fmgr: LayerFileManager(hashmap),
            }) => {
                // NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
                let open = layer_map.open_layer.take();
                let frozen = layer_map.frozen_layers.len();
                let taken_writer_state = writer_state.take();
                tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
                let layers = std::mem::take(hashmap);
                *self = Closed { layers };
                assert_eq!(open.is_some(), taken_writer_state.is_some());
            }
            Closed { .. } => {
                tracing::debug!("ignoring multiple shutdowns on layer manager")
            }
        }
    }

    /// Sum up the historic layer sizes.
    pub(crate) fn layer_size_sum(&self) -> u64 {
        self.layers()
            .values()
            .map(|l| l.layer_desc().file_size)
            .sum()
    }

    pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
        self.layers().values().filter(|l| l.is_likely_resident())
    }

    pub(crate) fn visible_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
        self.layers()
            .values()
            .filter(|l| l.visibility() == LayerVisibilityHint::Visible)
    }

    pub(crate) fn contains(&self, layer: &Layer) -> bool {
        self.contains_key(&layer.layer_desc().key())
    }

    pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
        self.layers().contains_key(key)
    }

    pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
        self.layers().keys().cloned().collect_vec()
    }

    /// Update the [`LayerFringe`] of a read request.
    ///
    /// Take a key space at a given LSN and query the layer map below each range
    /// of the key space to find the next layers to visit.
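    ///
    /// A sketch of the intended read loop (illustrative only; `layers` is assumed
    /// to be the timeline's `LockedLayerManager`, and `LayerFringe::new()` its
    /// assumed constructor):
    ///
    /// ```ignore
    /// let mut fringe = LayerFringe::new();
    /// let guard = layers.read(LayerManagerLockHolder::GetPage).await;
    /// guard.update_search_fringe(&keyspace, cont_lsn, &mut fringe)?;
    /// // ...then pop layers from the fringe, read them, and repeat as needed.
    /// ```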
    pub(crate) fn update_search_fringe(
        &self,
        keyspace: &KeySpace,
        cont_lsn: Lsn,
        fringe: &mut LayerFringe,
    ) -> Result<(), Shutdown> {
        let map = self.layer_map()?;

        for range in keyspace.ranges.iter() {
            let results = map.range_search(range.clone(), cont_lsn);
            results
                .found
                .into_iter()
                .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
                    (
                        self.upgrade(layer),
                        keyspace_accum.to_keyspace(),
                        lsn_floor..cont_lsn,
                    )
                })
                .for_each(|(layer, keyspace, lsn_range)| fringe.update(layer, keyspace, lsn_range));
        }

        Ok(())
    }

    fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
            Closed { layers } => layers,
        }
    }
}

#[derive(Default)]
pub(crate) struct OpenLayerManager {
    layer_map: LayerMap,
    layer_fmgr: LayerFileManager<Layer>,
}

impl std::fmt::Debug for OpenLayerManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenLayerManager")
            .field("layer_count", &self.layer_fmgr.0.len())
            .finish()
    }
}

#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shutdown")]
pub(crate) struct Shutdown;

impl OpenLayerManager {
    /// Called from `load_layer_map`. Initialize the layer manager with:
    /// 1. all on-disk layers
    /// 2. the next open layer (at the disk_consistent_lsn LSN)
    pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
        let mut updates = self.layer_map.batch_update();
        for layer in layers {
            Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
    pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Open a new writable layer to append data if there is no open layer; otherwise return the
    /// current open layer. Called within `get_layer_for_write`.
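    ///
    /// Illustrative call (all bindings are assumed to exist on the caller's side;
    /// `guard` is a write guard obtained from the `LockedLayerManager`):
    ///
    /// ```ignore
    /// let open_layer = guard
    ///     .open_mut()?
    ///     .get_layer_for_write(lsn, conf, timeline_id, tenant_shard_id, &gate, &cancel, &ctx)
    ///     .await?;
    /// ```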
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn get_layer_for_write(
        &mut self,
        lsn: Lsn,
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        gate: &utils::sync::gate::Gate,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> anyhow::Result<Arc<InMemoryLayer>> {
        ensure!(lsn.is_aligned());

        // Do we have a layer open for writing already?
        let layer = if let Some(open_layer) = &self.layer_map.open_layer {
            if open_layer.get_lsn_range().start > lsn {
                bail!(
                    "unexpected open layer in the future: open layer starts at {}, write lsn {}",
                    open_layer.get_lsn_range().start,
                    lsn
                );
            }

            Arc::clone(open_layer)
        } else {
            // No writeable layer yet. Create one.
            let start_lsn = self
                .layer_map
                .next_open_layer_at
                .context("No next open layer found")?;

            trace!(
                "creating in-memory layer at {}/{} for record at {}",
                timeline_id, start_lsn, lsn
            );

            let new_layer = InMemoryLayer::create(
                conf,
                timeline_id,
                tenant_shard_id,
                start_lsn,
                gate,
                cancel,
                ctx,
            )
            .await?;
            let layer = Arc::new(new_layer);

            self.layer_map.open_layer = Some(layer.clone());
            self.layer_map.next_open_layer_at = None;

            layer
        };

        Ok(layer)
    }

    /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
    ///
    /// Returns true if anything was frozen.
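    ///
    /// A rough caller's-eye sketch (illustrative only; `layers`, `lsn`,
    /// `last_freeze_at`, `write_lock`, and `metrics` are assumed bindings):
    ///
    /// ```ignore
    /// let mut guard = layers.write(LayerManagerLockHolder::TryFreezeLayer).await;
    /// let froze = guard
    ///     .open_mut()?
    ///     .try_freeze_in_memory_layer(lsn, &last_freeze_at, &mut write_lock, &metrics)
    ///     .await;
    /// ```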
    pub(super) async fn try_freeze_in_memory_layer(
        &mut self,
        lsn: Lsn,
        last_freeze_at: &AtomicLsn,
        write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
        metrics: &TimelineMetrics,
    ) -> bool {
        let Lsn(last_record_lsn) = lsn;
        let end_lsn = Lsn(last_record_lsn + 1);

        let froze = if let Some(open_layer) = &self.layer_map.open_layer {
            let open_layer_rc = Arc::clone(open_layer);
            open_layer.freeze(end_lsn).await;

            // Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
            // TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
            // reference to the timeline metrics. Other methods use a metrics borrow as well.
            metrics.inc_frozen_layer(open_layer);

            // The layer is no longer open, update the layer map to reflect this.
            // We will replace it with on-disk historics below.
            self.layer_map.frozen_layers.push_back(open_layer_rc);
            self.layer_map.open_layer = None;
            self.layer_map.next_open_layer_at = Some(end_lsn);

            true
        } else {
            false
        };

        // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
        // accounts for regions in the LSN range where we might have ingested no data due to sharding.
        last_freeze_at.store(end_lsn);

        // The writer state must no longer hold a reference to the frozen layer.
        let taken = write_lock.take();
        assert_eq!(
            froze,
            taken.is_some(),
            "should only have frozen a layer when TimelineWriterState existed"
        );

        froze
    }

    /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
    pub(crate) fn track_new_image_layers(
        &mut self,
        image_layers: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for layer in image_layers {
            Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);

            // Record these here instead of Layer::finish_creating because otherwise partial
            // failure with create_image_layers would balloon up the physical size gauge. The
            // downside is that all layers need to be created before metrics are updated.
            metrics.record_new_file_metrics(layer.layer_desc().file_size);
        }
        updates.flush();
    }

    /// Flush a frozen layer and add the written delta layer to the layer map.
    pub(crate) fn finish_flush_l0_layer(
        &mut self,
        delta_layer: Option<&ResidentLayer>,
        frozen_layer_for_check: &Arc<InMemoryLayer>,
        metrics: &TimelineMetrics,
    ) {
        let inmem = self
            .layer_map
            .frozen_layers
            .pop_front()
            .expect("there must be an inmem layer to flush");
        metrics.dec_frozen_layer(&inmem);

        // Only one task may call this function at a time (for this
        // timeline). If two tasks tried to flush the same frozen
        // layer to disk at the same time, that would not work.
        assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));

        if let Some(l) = delta_layer {
            let mut updates = self.layer_map.batch_update();
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
            updates.flush();
        }
    }

    /// Called when compaction is completed.
    pub(crate) fn finish_compact_l0(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for l in compact_to {
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
        }
        for l in compact_from {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when a GC-compaction is completed.
    pub(crate) fn finish_gc_compaction(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.

        // Match the old layers with the new layers
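        //
        // Illustrative example (not from the source): with compact_from = [A, B]
        // and compact_to = [A2, C], where A2 has the same PersistentLayerKey as A,
        // the loops below classify A -> A2 as a rewrite, B as a drop, and C as an add.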
        let mut add_layers = HashMap::new();
        let mut rewrite_layers = HashMap::new();
        let mut drop_layers = HashMap::new();
        for layer in compact_from {
            drop_layers.insert(layer.layer_desc().key(), layer.clone());
        }
        for layer in compact_to {
            if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
                rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
            } else {
                add_layers.insert(layer.layer_desc().key(), layer.clone());
            }
        }
        let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
        let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
        let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();

        self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
    }

    /// Called post-compaction when some previous generation image layers were trimmed.
    pub fn rewrite_layers(
        &mut self,
        rewrite_layers: &[(Layer, ResidentLayer)],
        drop_layers: &[Layer],
        metrics: &TimelineMetrics,
    ) {
        self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
    }

    fn rewrite_layers_inner(
        &mut self,
        rewrite_layers: &[(Layer, ResidentLayer)],
        drop_layers: &[Layer],
        add_layers: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for (old_layer, new_layer) in rewrite_layers {
            debug_assert_eq!(
                old_layer.layer_desc().key_range,
                new_layer.layer_desc().key_range
            );
            debug_assert_eq!(
                old_layer.layer_desc().lsn_range,
                new_layer.layer_desc().lsn_range
            );

            // Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
            // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
            // always marking rewritten layers as visible.
            new_layer.as_ref().set_visibility(old_layer.visibility());

            // Safety: we may never rewrite the same file in-place. Callers are responsible
            // for ensuring that they only rewrite layers after something changes the path,
            // such as an increment in the generation number.
            assert_ne!(old_layer.local_path(), new_layer.local_path());

            Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);

            Self::insert_historic_layer(
                new_layer.as_ref().clone(),
                &mut updates,
                &mut self.layer_fmgr,
            );

            metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
        }
        for l in drop_layers {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        for l in add_layers {
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
        }
        updates.flush();
    }

    /// Called when garbage collect has selected the layers to be removed.
    pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
        let mut updates = self.layer_map.batch_update();
        for doomed_layer in gc_layers {
            Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush()
    }

    #[cfg(test)]
    pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
        let mut updates = self.layer_map.batch_update();
        Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
        updates.flush()
    }

    /// Helper function to insert a layer into the layer map and file manager.
    fn insert_historic_layer(
        layer: Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        updates.insert_historic(layer.layer_desc().clone());
        mapping.insert(layer);
    }

    /// Removes the layer from local FS (if present) and from memory.
    /// Remote storage is not affected by this operation.
    fn delete_historic_layer(
        // we cannot remove layers otherwise, since gc and compaction will race
        layer: &Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        let desc = layer.layer_desc();

        // TODO Removing from the bottom of the layer map is expensive.
        // Maybe instead discard all layer map historic versions that
        // won't be needed for page reconstruction for this timeline,
        // and mark what we can't delete yet as deleted from the layer
        // map index without actually rebuilding the index.
        updates.remove_historic(desc);
        mapping.remove(layer);
        layer.delete_on_drop();
    }

    #[cfg(test)]
    pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc<InMemoryLayer>) {
        use pageserver_api::models::InMemoryLayerInfo;

        match layer.info() {
            InMemoryLayerInfo::Open { .. } => {
                assert!(self.layer_map.open_layer.is_none());
                self.layer_map.open_layer = Some(layer);
            }
            InMemoryLayerInfo::Frozen { lsn_start, .. } => {
                if let Some(last) = self.layer_map.frozen_layers.back() {
                    assert!(last.get_lsn_range().end <= lsn_start);
                }

                self.layer_map.frozen_layers.push_back(layer);
            }
        }
    }
}

pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);

impl<T> Default for LayerFileManager<T> {
    fn default() -> Self {
        Self(HashMap::default())
    }
}

impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
    pub(crate) fn insert(&mut self, layer: T) {
        let present = self.0.insert(layer.layer_desc().key(), layer.clone());
        if present.is_some() && cfg!(debug_assertions) {
            panic!("overwriting a layer: {:?}", layer.layer_desc())
        }
    }

    pub(crate) fn remove(&mut self, layer: &T) {
        let present = self.0.remove(&layer.layer_desc().key());
        if present.is_none() && cfg!(debug_assertions) {
            panic!(
                "removing layer that is not present in layer mapping: {:?}",
                layer.layer_desc()
            )
        }
    }
}