use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, bail, ensure};
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use utils::id::TimelineId;
use utils::lsn::{AtomicLsn, Lsn};

use super::{LayerFringe, ReadableLayer, TimelineWriterState};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::metrics::TimelineMetrics;
use crate::tenant::layer_map::{BatchedUpdates, LayerMap, SearchResult};
use crate::tenant::storage_layer::{
    AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
    PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
};

/// Warn if the lock was held for longer than this threshold.
/// It's very generous and we should bring this value down over time.
const LAYER_MANAGER_LOCK_WARN_THRESHOLD: Duration = Duration::from_secs(5);
const LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD: Duration = Duration::from_secs(30);

/// Describes the operation that is holding the layer manager lock
#[derive(Debug, Clone, Copy, strum_macros::Display)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum LayerManagerLockHolder {
    GetLayerMapInfo,
    GenerateHeatmap,
    GetPage,
    Init,
    LoadLayerMap,
    GetLayerForWrite,
    TryFreezeLayer,
    FlushFrozenLayer,
    FlushLoop,
    Compaction,
    GarbageCollection,
    Shutdown,
    ImportPgData,
    DetachAncestor,
    Eviction,
    ComputeImageConsistentLsn,
    #[cfg(test)]
    Testing,
}

/// Wrapper for the layer manager that tracks the amount of time during which
/// it was held under read or write lock
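///
/// A minimal usage sketch (the `timeline.layers` owning field is an
/// assumption for illustration, not defined in this file):
///
/// ```ignore
/// // Acquire the read lock, recording who holds it.
/// let guard = timeline.layers.read(LayerManagerLockHolder::GetPage).await;
/// let layer_map = guard.layer_map()?;
/// // Dropping the guard warns if the lock was held past the threshold.
/// drop(guard);
/// ```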
#[derive(Default)]
pub(crate) struct LockedLayerManager {
    locked: tokio::sync::RwLock<LayerManager>,
}

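/// Read guard for the layer manager. Remembers who acquired the lock and
/// when, so that dropping it can warn if the lock was held for too long.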
pub(crate) struct LayerManagerReadGuard<'a> {
    guard: ManuallyDrop<tokio::sync::RwLockReadGuard<'a, LayerManager>>,
    acquired_at: std::time::Instant,
    holder: LayerManagerLockHolder,
}

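/// Write guard for the layer manager; like [`LayerManagerReadGuard`], it
/// warns on drop if the lock was held for longer than the threshold.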
pub(crate) struct LayerManagerWriteGuard<'a> {
    guard: ManuallyDrop<tokio::sync::RwLockWriteGuard<'a, LayerManager>>,
    acquired_at: std::time::Instant,
    holder: LayerManagerLockHolder,
}

impl Drop for LayerManagerReadGuard<'_> {
    fn drop(&mut self) {
        // Drop the lock first, before potentially warning if it was held for too long.
        // SAFETY: the guard is dropped exactly once, here in the Drop implementation,
        // and is never accessed again afterwards.
        unsafe { ManuallyDrop::drop(&mut self.guard) };

        let held_for = self.acquired_at.elapsed();
        if held_for >= LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD {
            tracing::warn!(
                holder=%self.holder,
                "Layer manager read lock held for {}s",
                held_for.as_secs_f64(),
            );
        }
    }
}

impl Drop for LayerManagerWriteGuard<'_> {
    fn drop(&mut self) {
        // Drop the lock first, before potentially warning if it was held for too long.
        // SAFETY: the guard is dropped exactly once, here in the Drop implementation,
        // and is never accessed again afterwards.
        unsafe { ManuallyDrop::drop(&mut self.guard) };

        let held_for = self.acquired_at.elapsed();
        if held_for >= LAYER_MANAGER_LOCK_WARN_THRESHOLD {
            tracing::warn!(
                holder=%self.holder,
                "Layer manager write lock held for {}s",
                held_for.as_secs_f64(),
            );
        }
    }
}

impl Deref for LayerManagerReadGuard<'_> {
    type Target = LayerManager;

    fn deref(&self) -> &Self::Target {
        self.guard.deref()
    }
}

impl Deref for LayerManagerWriteGuard<'_> {
    type Target = LayerManager;

    fn deref(&self) -> &Self::Target {
        self.guard.deref()
    }
}

impl DerefMut for LayerManagerWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.guard.deref_mut()
    }
}

impl LockedLayerManager {
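    /// Lock the layer manager for reading, tagging the guard with `holder` so
    /// that an overly long hold can be attributed in the drop-time warning.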
    pub(crate) async fn read(&self, holder: LayerManagerLockHolder) -> LayerManagerReadGuard {
        let guard = ManuallyDrop::new(self.locked.read().await);
        LayerManagerReadGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        }
    }

    pub(crate) fn try_read(
        &self,
        holder: LayerManagerLockHolder,
    ) -> Result<LayerManagerReadGuard, tokio::sync::TryLockError> {
        let guard = ManuallyDrop::new(self.locked.try_read()?);

        Ok(LayerManagerReadGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        })
    }

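    /// Lock the layer manager for writing; on drop, the guard warns if the
    /// lock was held for longer than [`LAYER_MANAGER_LOCK_WARN_THRESHOLD`].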
    pub(crate) async fn write(&self, holder: LayerManagerLockHolder) -> LayerManagerWriteGuard {
        let guard = ManuallyDrop::new(self.locked.write().await);
        LayerManagerWriteGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        }
    }

    pub(crate) fn try_write(
        &self,
        holder: LayerManagerLockHolder,
    ) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> {
        let guard = ManuallyDrop::new(self.locked.try_write()?);

        Ok(LayerManagerWriteGuard {
            guard,
            acquired_at: std::time::Instant::now(),
            holder,
        })
    }
}

/// Provides semantic APIs to manipulate the layer map.
pub(crate) enum LayerManager {
    /// Open as in "not shut down": in-memory layers exist and the layer map
    /// can still be manipulated.
    Open(OpenLayerManager),
    /// Shut-down layer manager: there are no more in-memory layers, and the
    /// persistent layers are read-only.
    Closed {
        layers: HashMap<PersistentLayerKey, Layer>,
    },
}

impl Default for LayerManager {
    fn default() -> Self {
        LayerManager::Open(OpenLayerManager::default())
    }
}

impl LayerManager {
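    /// Upgrade a weak layer reference coming out of a layer map search into a
    /// strong [`ReadableLayer`]; expects that no shutdown happens concurrently
    /// (see the `expect()` below).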
    fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
        match weak {
            ReadableLayerWeak::PersistentLayer(desc) => {
                ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
            }
            ReadableLayerWeak::InMemoryLayer(desc) => {
                let inmem = self
                    .layer_map()
                    .expect("no concurrent shutdown")
                    .in_memory_layer(&desc);
                ReadableLayer::InMemoryLayer(inmem)
            }
        }
    }

    pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
        // The assumption for the `expect()` is that all code maintains the following invariant:
        // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
        self.try_get_from_key(key)
            .with_context(|| format!("get layer from key: {key}"))
            .expect("not found")
            .clone()
    }

    pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
        self.layers().get(key)
    }

    pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
        self.get_from_key(&desc.key())
    }

    /// Get an immutable reference to the layer map.
    ///
    /// Users can only obtain an immutable layer map; modifications must go
    /// through the semantic APIs below. This design moves us a step closer to
    /// an immutable storage state.
    pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
            Closed { .. } => Err(Shutdown),
        }
    }

    pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
        use LayerManager::*;

        match self {
            Open(open) => Ok(open),
            Closed { .. } => Err(Shutdown),
        }
    }

    /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
    /// order to allow shutdown to complete.
    ///
    /// If the in-memory layers needed to be flushed, that must have happened earlier.
    pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager {
                layer_map,
                layer_fmgr: LayerFileManager(hashmap),
            }) => {
                // NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
                let open = layer_map.open_layer.take();
                let frozen = layer_map.frozen_layers.len();
                let taken_writer_state = writer_state.take();
                tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
                let layers = std::mem::take(hashmap);
                *self = Closed { layers };
                assert_eq!(open.is_some(), taken_writer_state.is_some());
            }
            Closed { .. } => {
                tracing::debug!("ignoring multiple shutdowns on layer manager")
            }
        }
    }

    /// Sum up the historic layer sizes
    pub(crate) fn layer_size_sum(&self) -> u64 {
        self.layers()
            .values()
            .map(|l| l.layer_desc().file_size)
            .sum()
    }

    pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
        self.layers().values().filter(|l| l.is_likely_resident())
    }

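    /// Iterate over the layers whose visibility hint is
    /// [`LayerVisibilityHint::Visible`].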
    pub(crate) fn visible_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
        self.layers()
            .values()
            .filter(|l| l.visibility() == LayerVisibilityHint::Visible)
    }

    pub(crate) fn contains(&self, layer: &Layer) -> bool {
        self.contains_key(&layer.layer_desc().key())
    }

    pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
        self.layers().contains_key(key)
    }

    pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
        self.layers().keys().cloned().collect_vec()
    }

    /// Update the [`LayerFringe`] of a read request
    ///
    /// Take a key space at a given LSN and query the layer map below each range
    /// of the key space to find the next layers to visit.
    pub(crate) fn update_search_fringe(
        &self,
        keyspace: &KeySpace,
        cont_lsn: Lsn,
        fringe: &mut LayerFringe,
    ) -> Result<(), Shutdown> {
        let map = self.layer_map()?;

        for range in keyspace.ranges.iter() {
            let results = map.range_search(range.clone(), cont_lsn);
            results
                .found
                .into_iter()
                .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
                    (
                        self.upgrade(layer),
                        keyspace_accum.to_keyspace(),
                        lsn_floor..cont_lsn,
                    )
                })
                .for_each(|(layer, keyspace, lsn_range)| fringe.update(layer, keyspace, lsn_range));
        }

        Ok(())
    }

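    /// Access the mapping from persistent layer key to layer, regardless of
    /// whether the manager is open or already shut down.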
    fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
            Closed { layers } => layers,
        }
    }
}

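/// The mutable core of the layer manager: the layer map plus the mapping from
/// persistent layer keys to layer handles. Exists only while the timeline has
/// not been shut down.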
#[derive(Default)]
pub(crate) struct OpenLayerManager {
    layer_map: LayerMap,
    layer_fmgr: LayerFileManager<Layer>,
}

impl std::fmt::Debug for OpenLayerManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenLayerManager")
            .field("layer_count", &self.layer_fmgr.0.len())
            .finish()
    }
}

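/// Error returned by the accessors above once the layer manager has been shut
/// down and its in-memory state dropped.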
#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shutdown")]
pub(crate) struct Shutdown;

impl OpenLayerManager {
    /// Called from `load_layer_map`. Initialize the layer manager with:
    /// 1. all on-disk layers
    /// 2. the next open layer (at the disk_consistent_lsn)
    pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
        let mut updates = self.layer_map.batch_update();
        for layer in layers {
            Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

375 232 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
376 232 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
377 232 : }
378 :
379 : /// Open a new writable layer to append data if there is no open layer, otherwise return the
380 : /// current open layer, called within `get_layer_for_write`.
381 : #[allow(clippy::too_many_arguments)]
382 660 : pub(crate) async fn get_layer_for_write(
383 660 : &mut self,
384 660 : lsn: Lsn,
385 660 : conf: &'static PageServerConf,
386 660 : timeline_id: TimelineId,
387 660 : tenant_shard_id: TenantShardId,
388 660 : gate: &utils::sync::gate::Gate,
389 660 : cancel: &CancellationToken,
390 660 : ctx: &RequestContext,
391 660 : ) -> anyhow::Result<Arc<InMemoryLayer>> {
392 660 : ensure!(lsn.is_aligned());
393 :
394 : // Do we have a layer open for writing already?
395 660 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
396 0 : if open_layer.get_lsn_range().start > lsn {
397 0 : bail!(
398 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
399 0 : open_layer.get_lsn_range().start,
400 : lsn
401 : );
402 0 : }
403 :
404 0 : Arc::clone(open_layer)
405 : } else {
406 : // No writeable layer yet. Create one.
407 660 : let start_lsn = self
408 660 : .layer_map
409 660 : .next_open_layer_at
410 660 : .context("No next open layer found")?;
411 :
412 660 : trace!(
413 0 : "creating in-memory layer at {}/{} for record at {}",
414 : timeline_id, start_lsn, lsn
415 : );
416 :
417 660 : let new_layer = InMemoryLayer::create(
418 660 : conf,
419 660 : timeline_id,
420 660 : tenant_shard_id,
421 660 : start_lsn,
422 660 : gate,
423 660 : cancel,
424 660 : ctx,
425 660 : )
426 660 : .await?;
427 660 : let layer = Arc::new(new_layer);
428 :
429 660 : self.layer_map.open_layer = Some(layer.clone());
430 660 : self.layer_map.next_open_layer_at = None;
431 :
432 660 : layer
433 : };
434 :
435 660 : Ok(layer)
436 660 : }
437 :
    /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
    ///
    /// Returns true if anything was frozen.
    pub(super) async fn try_freeze_in_memory_layer(
        &mut self,
        lsn: Lsn,
        last_freeze_at: &AtomicLsn,
        write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
        metrics: &TimelineMetrics,
    ) -> bool {
        let Lsn(last_record_lsn) = lsn;
        let end_lsn = Lsn(last_record_lsn + 1);

        let froze = if let Some(open_layer) = &self.layer_map.open_layer {
            let open_layer_rc = Arc::clone(open_layer);
            open_layer.freeze(end_lsn).await;

            // Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
            // TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
            // reference to the timeline metrics. Other methods use a metrics borrow as well.
            metrics.inc_frozen_layer(open_layer);

            // The layer is no longer open, update the layer map to reflect this.
            // We will replace it with on-disk historics below.
            self.layer_map.frozen_layers.push_back(open_layer_rc);
            self.layer_map.open_layer = None;
            self.layer_map.next_open_layer_at = Some(end_lsn);

            true
        } else {
            false
        };

        // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
        // accounts for regions in the LSN range where we might have ingested no data due to sharding.
        last_freeze_at.store(end_lsn);

        // The writer state must no longer hold a reference to the frozen layer.
        let taken = write_lock.take();
        assert_eq!(
            froze,
            taken.is_some(),
            "should only have frozen a layer when TimelineWriterState existed"
        );

        froze
    }

    /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
    pub(crate) fn track_new_image_layers(
        &mut self,
        image_layers: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for layer in image_layers {
            Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);

            // Record these here instead of in Layer::finish_creating, because otherwise a
            // partial failure of create_image_layers would balloon up the physical size
            // gauge. The downside is that all layers need to be created before the metrics
            // are updated.
            metrics.record_new_file_metrics(layer.layer_desc().file_size);
        }
        updates.flush();
    }

    /// Flush a frozen layer and add the written delta layer to the layer map.
    pub(crate) fn finish_flush_l0_layer(
        &mut self,
        delta_layer: Option<&ResidentLayer>,
        frozen_layer_for_check: &Arc<InMemoryLayer>,
        metrics: &TimelineMetrics,
    ) {
        let inmem = self
            .layer_map
            .frozen_layers
            .pop_front()
            .expect("there must be an inmem layer to flush");
        metrics.dec_frozen_layer(&inmem);

        // Only one task may call this function at a time (for this
        // timeline). If two tasks tried to flush the same frozen
        // layer to disk at the same time, that would not work.
        assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));

        if let Some(l) = delta_layer {
            let mut updates = self.layer_map.batch_update();
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
            updates.flush();
        }
    }

    /// Called when compaction is completed.
    pub(crate) fn finish_compact_l0(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for l in compact_to {
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
        }
        for l in compact_from {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when a GC-compaction is completed.
    pub(crate) fn finish_gc_compaction(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.

        // Match the old layers with the new layers
        let mut add_layers = HashMap::new();
        let mut rewrite_layers = HashMap::new();
        let mut drop_layers = HashMap::new();
        for layer in compact_from {
            drop_layers.insert(layer.layer_desc().key(), layer.clone());
        }
        for layer in compact_to {
            if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
                rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
            } else {
                add_layers.insert(layer.layer_desc().key(), layer.clone());
            }
        }
        let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
        let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
        let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();

        self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
    }

    /// Called post-compaction when some previous generation image layers were trimmed.
    pub fn rewrite_layers(
        &mut self,
        rewrite_layers: &[(Layer, ResidentLayer)],
        drop_layers: &[Layer],
        metrics: &TimelineMetrics,
    ) {
        self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
    }

    fn rewrite_layers_inner(
        &mut self,
        rewrite_layers: &[(Layer, ResidentLayer)],
        drop_layers: &[Layer],
        add_layers: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for (old_layer, new_layer) in rewrite_layers {
            debug_assert_eq!(
                old_layer.layer_desc().key_range,
                new_layer.layer_desc().key_range
            );
            debug_assert_eq!(
                old_layer.layer_desc().lsn_range,
                new_layer.layer_desc().lsn_range
            );

            // Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
            // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
            // always marking rewritten layers as visible.
            new_layer.as_ref().set_visibility(old_layer.visibility());

            // Safety: we may never rewrite the same file in-place. Callers are responsible
            // for ensuring that they only rewrite layers after something changes the path,
            // such as an increment in the generation number.
            assert_ne!(old_layer.local_path(), new_layer.local_path());

            Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);

            Self::insert_historic_layer(
                new_layer.as_ref().clone(),
                &mut updates,
                &mut self.layer_fmgr,
            );

            metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
        }
        for l in drop_layers {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        for l in add_layers {
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
        }
        updates.flush();
    }

    /// Called when garbage collect has selected the layers to be removed.
    pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
        let mut updates = self.layer_map.batch_update();
        for doomed_layer in gc_layers {
            Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush()
    }

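    /// Test helper: insert a persistent layer directly, bypassing the regular
    /// flush/compaction entry points.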
    #[cfg(test)]
    pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
        let mut updates = self.layer_map.batch_update();
        Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
        updates.flush()
    }

    /// Helper function to insert a layer into the layer map and file manager.
    fn insert_historic_layer(
        layer: Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        updates.insert_historic(layer.layer_desc().clone());
        mapping.insert(layer);
    }

    /// Removes the layer from local FS (if present) and from memory.
    /// Remote storage is not affected by this operation.
    fn delete_historic_layer(
        // we cannot remove layers otherwise, since gc and compaction will race
        layer: &Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        let desc = layer.layer_desc();

        // TODO Removing from the bottom of the layer map is expensive.
        // Maybe instead discard all layer map historic versions that
        // won't be needed for page reconstruction for this timeline,
        // and mark what we can't delete yet as deleted from the layer
        // map index without actually rebuilding the index.
        updates.remove_historic(desc);
        mapping.remove(layer);
        layer.delete_on_drop();
    }

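    /// Test helper: register an in-memory layer (open or frozen) directly with
    /// the layer map.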
    #[cfg(test)]
    pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc<InMemoryLayer>) {
        use pageserver_api::models::InMemoryLayerInfo;

        match layer.info() {
            InMemoryLayerInfo::Open { .. } => {
                assert!(self.layer_map.open_layer.is_none());
                self.layer_map.open_layer = Some(layer);
            }
            InMemoryLayerInfo::Frozen { lsn_start, .. } => {
                if let Some(last) = self.layer_map.frozen_layers.back() {
                    assert!(last.get_lsn_range().end <= lsn_start);
                }

                self.layer_map.frozen_layers.push_back(layer);
            }
        }
    }
}

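/// Mapping from [`PersistentLayerKey`] to a layer handle; kept in sync with
/// the [`LayerMap`] by the insert/delete helpers above.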
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);

impl<T> Default for LayerFileManager<T> {
    fn default() -> Self {
        Self(HashMap::default())
    }
}

impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
    pub(crate) fn insert(&mut self, layer: T) {
        let present = self.0.insert(layer.layer_desc().key(), layer.clone());
        if present.is_some() && cfg!(debug_assertions) {
            panic!("overwriting a layer: {:?}", layer.layer_desc())
        }
    }

    pub(crate) fn remove(&mut self, layer: &T) {
        let present = self.0.remove(&layer.layer_desc().key());
        if present.is_none() && cfg!(debug_assertions) {
            panic!(
                "removing layer that is not present in layer mapping: {:?}",
                layer.layer_desc()
            )
        }
    }
}