use anyhow::{bail, ensure, Context};
use itertools::Itertools;
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
use tracing::trace;
use utils::{
    id::TimelineId,
    lsn::{AtomicLsn, Lsn},
};

use crate::{
    config::PageServerConf,
    context::RequestContext,
    metrics::TimelineMetrics,
    tenant::{
        layer_map::{BatchedUpdates, LayerMap},
        storage_layer::{
            AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
            ResidentLayer,
        },
    },
};

use super::TimelineWriterState;

/// Provides semantic APIs to manipulate the layer map.
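///
/// A minimal sketch of how the two states look from a caller's point of view
/// (the guard variable and surrounding locking are assumptions, not from this file):
///
/// ```ignore
/// match &*layer_manager_guard {
///     LayerManager::Open(open) => {
///         // In-memory layers may still be created, frozen, and flushed.
///     }
///     LayerManager::Closed { .. } => {
///         // Only the read-only persistent layers remain.
///     }
/// }
/// ```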
pub(crate) enum LayerManager {
    /// Open, i.e., not yet shut down: the layer manager still has in-memory layers, and the
    /// layers can be manipulated.
    Open(OpenLayerManager),
    /// Shut-down layer manager: there are no more in-memory layers, and the persistent layers
    /// are read-only.
    Closed {
        layers: HashMap<PersistentLayerKey, Layer>,
    },
}

impl Default for LayerManager {
    fn default() -> Self {
        LayerManager::Open(OpenLayerManager::default())
    }
}

impl LayerManager {
    pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
        // The assumption for the `expect()` is that all code maintains the following invariant:
        // a layer's descriptor is present in the LayerMap => the LayerFileManager contains a
        // layer for the descriptor.
        self.layers()
            .get(key)
            .with_context(|| format!("get layer from key: {key}"))
            .expect("not found")
            .clone()
    }

    pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
        self.get_from_key(&desc.key())
    }

    /// Get an immutable reference to the layer map.
    ///
    /// Users are expected to only obtain an immutable layer map; modifications must go through
    /// the semantic APIs below. This design moves us a step closer to an immutable storage state.
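    ///
    /// A usage sketch (the caller and error propagation are assumptions, not from this file):
    ///
    /// ```ignore
    /// let layer_map = manager.layer_map()?; // Err(Shutdown) once the manager is closed
    /// let has_open_layer = layer_map.open_layer.is_some();
    /// ```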
    pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
            Closed { .. } => Err(Shutdown),
        }
    }

    pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
        use LayerManager::*;

        match self {
            Open(open) => Ok(open),
            Closed { .. } => Err(Shutdown),
        }
    }

    /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
    /// order to allow shutdown to complete.
    ///
    /// If the in-memory layers needed to be flushed, that must have happened earlier.
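    ///
    /// A hypothetical shutdown-path sketch (the locks around the manager and the writer state
    /// are assumptions, not from this file):
    ///
    /// ```ignore
    /// let mut guard = timeline.layers.write().await; // assumed RwLock<LayerManager>
    /// let mut write_guard = timeline.write_lock.lock().await; // assumed Mutex<Option<TimelineWriterState>>
    /// guard.shutdown(&mut *write_guard);
    /// ```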
    pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager {
                layer_map,
                layer_fmgr: LayerFileManager(hashmap),
            }) => {
                let open = layer_map.open_layer.take();
                let frozen = layer_map.frozen_layers.len();
                let taken_writer_state = writer_state.take();
                tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
                let layers = std::mem::take(hashmap);
                *self = Closed { layers };
                assert_eq!(open.is_some(), taken_writer_state.is_some());
            }
            Closed { .. } => {
                tracing::debug!("ignoring multiple shutdowns on layer manager")
            }
        }
    }

    /// Sum up the historic layer sizes.
    pub(crate) fn layer_size_sum(&self) -> u64 {
        self.layers()
            .values()
            .map(|l| l.layer_desc().file_size)
            .sum()
    }

    pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
        self.layers().values().filter(|l| l.is_likely_resident())
    }

    pub(crate) fn contains(&self, layer: &Layer) -> bool {
        self.contains_key(&layer.layer_desc().key())
    }

    pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
        self.layers().contains_key(key)
    }

    pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
        self.layers().keys().cloned().collect_vec()
    }

    fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
        use LayerManager::*;
        match self {
            Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
            Closed { layers } => layers,
        }
    }
}

#[derive(Default)]
pub(crate) struct OpenLayerManager {
    layer_map: LayerMap,
    layer_fmgr: LayerFileManager<Layer>,
}

impl std::fmt::Debug for OpenLayerManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenLayerManager")
            .field("layer_count", &self.layer_fmgr.0.len())
            .finish()
    }
}

#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shutdown")]
pub(crate) struct Shutdown;

impl OpenLayerManager {
    /// Called from `load_layer_map`. Initialize the layer manager with:
    /// 1. all on-disk layers
    /// 2. the next open layer, starting at the disk_consistent_lsn LSN
    pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
        let mut updates = self.layer_map.batch_update();
        for layer in layers {
            Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
    pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Open a new writable layer to append data to, if there is no open layer yet; otherwise
    /// return the current open layer. Called within `get_layer_for_write`.
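    ///
    /// A hypothetical call-site sketch (argument sources and the surrounding locking are
    /// assumptions, not from this file):
    ///
    /// ```ignore
    /// let layer: Arc<InMemoryLayer> = guard
    ///     .open_mut()?
    ///     .get_layer_for_write(lsn, conf, timeline_id, tenant_shard_id, gate.enter()?, ctx)
    ///     .await?;
    /// ```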
    pub(crate) async fn get_layer_for_write(
        &mut self,
        lsn: Lsn,
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        gate_guard: utils::sync::gate::GateGuard,
        ctx: &RequestContext,
    ) -> anyhow::Result<Arc<InMemoryLayer>> {
        ensure!(lsn.is_aligned());

        // Do we have a layer open for writing already?
        let layer = if let Some(open_layer) = &self.layer_map.open_layer {
            if open_layer.get_lsn_range().start > lsn {
                bail!(
                    "unexpected open layer in the future: open layer starts at {}, write lsn {}",
                    open_layer.get_lsn_range().start,
                    lsn
                );
            }

            Arc::clone(open_layer)
        } else {
            // No writeable layer yet. Create one.
            let start_lsn = self
                .layer_map
                .next_open_layer_at
                .context("No next open layer found")?;

            trace!(
                "creating in-memory layer at {}/{} for record at {}",
                timeline_id,
                start_lsn,
                lsn
            );

            let new_layer = InMemoryLayer::create(
                conf,
                timeline_id,
                tenant_shard_id,
                start_lsn,
                gate_guard,
                ctx,
            )
            .await?;
            let layer = Arc::new(new_layer);

            self.layer_map.open_layer = Some(layer.clone());
            self.layer_map.next_open_layer_at = None;

            layer
        };

        Ok(layer)
    }

    /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
    ///
    /// Returns true if anything was frozen.
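    ///
    /// A usage sketch (the surrounding locks are assumptions, not from this file):
    ///
    /// ```ignore
    /// let mut write_guard = timeline.write_lock.lock().await; // assumed Mutex<Option<TimelineWriterState>>
    /// let froze = guard
    ///     .open_mut()?
    ///     .try_freeze_in_memory_layer(last_record_lsn, &timeline.last_freeze_at, &mut write_guard)
    ///     .await;
    /// ```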
    pub(super) async fn try_freeze_in_memory_layer(
        &mut self,
        lsn: Lsn,
        last_freeze_at: &AtomicLsn,
        write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
    ) -> bool {
        let Lsn(last_record_lsn) = lsn;
        let end_lsn = Lsn(last_record_lsn + 1);

        let froze = if let Some(open_layer) = &self.layer_map.open_layer {
            let open_layer_rc = Arc::clone(open_layer);
            open_layer.freeze(end_lsn).await;

            // The layer is no longer open, update the layer map to reflect this.
            // We will replace it with on-disk historics below.
            self.layer_map.frozen_layers.push_back(open_layer_rc);
            self.layer_map.open_layer = None;
            self.layer_map.next_open_layer_at = Some(end_lsn);

            true
        } else {
            false
        };

        // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
        // accounts for regions in the LSN range where we might have ingested no data due to sharding.
        last_freeze_at.store(end_lsn);

        // The writer state must no longer hold a reference to the frozen layer.
        let taken = write_lock.take();
        assert_eq!(
            froze,
            taken.is_some(),
            "should only have frozen a layer when TimelineWriterState existed"
        );

        froze
    }

    /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
    pub(crate) fn track_new_image_layers(
        &mut self,
        image_layers: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for layer in image_layers {
            Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);

            // Record these here instead of in Layer::finish_creating, because otherwise a
            // partial failure in create_image_layers would balloon up the physical size gauge.
            // The downside is that all layers need to be created before metrics are updated.
            metrics.record_new_file_metrics(layer.layer_desc().file_size);
        }
        updates.flush();
    }

    /// Flush a frozen layer and add the written delta layer to the layer map.
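    ///
    /// A sketch of the flush sequence this participates in (variable names are assumptions,
    /// not from this file):
    ///
    /// ```ignore
    /// // 1. Freeze the open layer (see try_freeze_in_memory_layer above).
    /// // 2. Write the frozen layer's contents out as a delta layer on disk.
    /// // 3. Swap it into the layer map:
    /// guard.open_mut()?.finish_flush_l0_layer(Some(&delta_layer), &frozen_layer, &metrics);
    /// ```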
    pub(crate) fn finish_flush_l0_layer(
        &mut self,
        delta_layer: Option<&ResidentLayer>,
        frozen_layer_for_check: &Arc<InMemoryLayer>,
        metrics: &TimelineMetrics,
    ) {
        let inmem = self
            .layer_map
            .frozen_layers
            .pop_front()
            .expect("there must be an inmem layer to flush");

        // Only one task may call this function at a time (for this
        // timeline). If two tasks tried to flush the same frozen
        // layer to disk at the same time, that would not work.
        assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));

        if let Some(l) = delta_layer {
            let mut updates = self.layer_map.batch_update();
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
            updates.flush();
        }
    }

    /// Called when compaction is completed.
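    ///
    /// A usage sketch (variable names are assumptions, not from this file):
    ///
    /// ```ignore
    /// // Atomically replace the input L0 layers with the compacted output layers.
    /// guard
    ///     .open_mut()?
    ///     .finish_compact_l0(&layers_to_remove, &new_resident_layers, &metrics);
    /// ```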
    pub(crate) fn finish_compact_l0(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for l in compact_to {
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
        }
        for l in compact_from {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when a GC-compaction is completed.
    pub(crate) fn finish_gc_compaction(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        // We can simply reuse the compact-L0 logic. Use a different function name to indicate
        // a different type of layer map modification.
        self.finish_compact_l0(compact_from, compact_to, metrics)
    }

    /// Called post-compaction when some previous generation image layers were trimmed.
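    ///
    /// The key invariant (asserted below) is that a rewritten layer keeps the same key and LSN
    /// ranges but lives at a new path. A hypothetical call-site sketch (names are assumptions):
    ///
    /// ```ignore
    /// guard
    ///     .open_mut()?
    ///     .rewrite_layers(&[(old_layer, new_resident_layer)], &layers_to_drop, &metrics);
    /// ```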
    pub(crate) fn rewrite_layers(
        &mut self,
        rewrite_layers: &[(Layer, ResidentLayer)],
        drop_layers: &[Layer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for (old_layer, new_layer) in rewrite_layers {
            debug_assert_eq!(
                old_layer.layer_desc().key_range,
                new_layer.layer_desc().key_range
            );
            debug_assert_eq!(
                old_layer.layer_desc().lsn_range,
                new_layer.layer_desc().lsn_range
            );

            // Transfer the visibility hint from the old to the new layer, since the new layer
            // covers the same key space. This is not guaranteed to be accurate (as the new
            // layer may cover a different subset of the key range), but it is a sensible
            // default, and prevents always marking rewritten layers as visible.
            new_layer.as_ref().set_visibility(old_layer.visibility());

            // Safety: we may never rewrite the same file in-place. Callers are responsible
            // for ensuring that they only rewrite layers after something changes the path,
            // such as an increment in the generation number.
            assert_ne!(old_layer.local_path(), new_layer.local_path());

            Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);

            Self::insert_historic_layer(
                new_layer.as_ref().clone(),
                &mut updates,
                &mut self.layer_fmgr,
            );

            metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
        }
        for l in drop_layers {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when garbage collection has selected the layers to be removed.
    pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
        let mut updates = self.layer_map.batch_update();
        for doomed_layer in gc_layers {
            Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush()
    }

    #[cfg(test)]
    pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
        let mut updates = self.layer_map.batch_update();
        Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
        updates.flush()
    }

    /// Helper function to insert a layer into the layer map and file manager.
    fn insert_historic_layer(
        layer: Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        updates.insert_historic(layer.layer_desc().clone());
        mapping.insert(layer);
    }

    /// Removes the layer from the local FS (if present) and from memory.
    /// Remote storage is not affected by this operation.
    fn delete_historic_layer(
        // We cannot remove layers otherwise, since gc and compaction will race.
        layer: &Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        let desc = layer.layer_desc();

        // TODO: Removing from the bottom of the layer map is expensive.
        // Maybe instead discard all layer map historic versions that
        // won't be needed for page reconstruction for this timeline,
        // and mark what we can't delete yet as deleted from the layer
        // map index without actually rebuilding the index.
        updates.remove_historic(desc);
        mapping.remove(layer);
        layer.delete_on_drop();
    }
}

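/// Maps each persistent layer's key to its layer handle.
///
/// Kept in sync with the `LayerMap` by the insert/delete helpers above, maintaining the
/// invariant that a descriptor present in the `LayerMap` has a corresponding entry here.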
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);

impl<T> Default for LayerFileManager<T> {
    fn default() -> Self {
        Self(HashMap::default())
    }
}

impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
    pub(crate) fn insert(&mut self, layer: T) {
        let present = self.0.insert(layer.layer_desc().key(), layer.clone());
        if present.is_some() && cfg!(debug_assertions) {
            panic!("overwriting a layer: {:?}", layer.layer_desc())
        }
    }

    pub(crate) fn remove(&mut self, layer: &T) {
        let present = self.0.remove(&layer.layer_desc().key());
        if present.is_none() && cfg!(debug_assertions) {
            panic!(
                "removing layer that is not present in layer mapping: {:?}",
                layer.layer_desc()
            )
        }
    }
}