Line data Source code
1 : use anyhow::{bail, ensure, Context};
2 : use itertools::Itertools;
3 : use pageserver_api::shard::TenantShardId;
4 : use std::{collections::HashMap, sync::Arc};
5 : use tracing::trace;
6 : use utils::{
7 : id::TimelineId,
8 : lsn::{AtomicLsn, Lsn},
9 : };
10 :
11 : use crate::{
12 : config::PageServerConf,
13 : context::RequestContext,
14 : metrics::TimelineMetrics,
15 : tenant::{
16 : layer_map::{BatchedUpdates, LayerMap},
17 : storage_layer::{
18 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
19 : ResidentLayer,
20 : },
21 : },
22 : };
23 :
24 : use super::TimelineWriterState;
25 :
26 : /// Provides semantic APIs to manipulate the layer map.
27 : pub(crate) enum LayerManager {
28 : /// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
29 : /// the layers.
30 : Open(OpenLayerManager),
31 : /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
32 : /// read-only.
33 : Closed {
34 : layers: HashMap<PersistentLayerKey, Layer>,
35 : },
36 : }
37 :
38 : impl Default for LayerManager {
39 418 : fn default() -> Self {
40 418 : LayerManager::Open(OpenLayerManager::default())
41 418 : }
42 : }
43 :
44 : impl LayerManager {
45 240656 : pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
46 240656 : // The assumption for the `expect()` is that all code maintains the following invariant:
47 240656 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
48 240656 : self.try_get_from_key(key)
49 240656 : .with_context(|| format!("get layer from key: {key}"))
50 240656 : .expect("not found")
51 240656 : .clone()
52 240656 : }
53 :
54 240656 : pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
55 240656 : self.layers().get(key)
56 240656 : }
57 :
58 240636 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
59 240636 : self.get_from_key(&desc.key())
60 240636 : }
61 :
62 : /// Get an immutable reference to the layer map.
63 : ///
64 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
65 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
66 1076922 : pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
67 : use LayerManager::*;
68 1076922 : match self {
69 1076922 : Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
70 0 : Closed { .. } => Err(Shutdown),
71 : }
72 1076922 : }
73 :
74 4916 : pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
75 : use LayerManager::*;
76 :
77 4916 : match self {
78 4916 : Open(open) => Ok(open),
79 0 : Closed { .. } => Err(Shutdown),
80 : }
81 4916 : }
82 :
83 : /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
84 : /// order to allow shutdown to complete.
85 : ///
86 : /// If there was a want to flush in-memory layers, it must have happened earlier.
87 10 : pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
88 : use LayerManager::*;
89 10 : match self {
90 : Open(OpenLayerManager {
91 10 : layer_map,
92 10 : layer_fmgr: LayerFileManager(hashmap),
93 10 : }) => {
94 10 : let open = layer_map.open_layer.take();
95 10 : let frozen = layer_map.frozen_layers.len();
96 10 : let taken_writer_state = writer_state.take();
97 10 : tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
98 10 : let layers = std::mem::take(hashmap);
99 10 : *self = Closed { layers };
100 10 : assert_eq!(open.is_some(), taken_writer_state.is_some());
101 : }
102 : Closed { .. } => {
103 0 : tracing::debug!("ignoring multiple shutdowns on layer manager")
104 : }
105 : }
106 10 : }
107 :
108 : /// Sum up the historic layer sizes
109 0 : pub(crate) fn layer_size_sum(&self) -> u64 {
110 0 : self.layers()
111 0 : .values()
112 0 : .map(|l| l.layer_desc().file_size)
113 0 : .sum()
114 0 : }
115 :
116 22 : pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
117 33 : self.layers().values().filter(|l| l.is_likely_resident())
118 22 : }
119 :
120 308 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
121 308 : self.contains_key(&layer.layer_desc().key())
122 308 : }
123 :
124 394 : pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
125 394 : self.layers().contains_key(key)
126 394 : }
127 :
128 0 : pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
129 0 : self.layers().keys().cloned().collect_vec()
130 0 : }
131 :
132 241072 : fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
133 : use LayerManager::*;
134 241072 : match self {
135 241072 : Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
136 0 : Closed { layers } => layers,
137 : }
138 241072 : }
139 : }
140 :
141 : #[derive(Default)]
142 : pub(crate) struct OpenLayerManager {
143 : layer_map: LayerMap,
144 : layer_fmgr: LayerFileManager<Layer>,
145 : }
146 :
147 : impl std::fmt::Debug for OpenLayerManager {
148 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 0 : f.debug_struct("OpenLayerManager")
150 0 : .field("layer_count", &self.layer_fmgr.0.len())
151 0 : .finish()
152 0 : }
153 : }
154 :
155 0 : #[derive(Debug, thiserror::Error)]
156 : #[error("layer manager has been shutdown")]
157 : pub(crate) struct Shutdown;
158 :
159 : impl OpenLayerManager {
160 : /// Called from `load_layer_map`. Initialize the layer manager with:
161 : /// 1. all on-disk layers
162 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
163 6 : pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
164 6 : let mut updates = self.layer_map.batch_update();
165 22 : for layer in layers {
166 16 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
167 16 : }
168 6 : updates.flush();
169 6 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
170 6 : }
171 :
172 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
173 412 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
174 412 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
175 412 : }
176 :
177 : /// Open a new writable layer to append data if there is no open layer, otherwise return the
178 : /// current open layer, called within `get_layer_for_write`.
179 1268 : pub(crate) async fn get_layer_for_write(
180 1268 : &mut self,
181 1268 : lsn: Lsn,
182 1268 : conf: &'static PageServerConf,
183 1268 : timeline_id: TimelineId,
184 1268 : tenant_shard_id: TenantShardId,
185 1268 : gate_guard: utils::sync::gate::GateGuard,
186 1268 : ctx: &RequestContext,
187 1268 : ) -> anyhow::Result<Arc<InMemoryLayer>> {
188 1268 : ensure!(lsn.is_aligned());
189 :
190 : // Do we have a layer open for writing already?
191 1268 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
192 0 : if open_layer.get_lsn_range().start > lsn {
193 0 : bail!(
194 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
195 0 : open_layer.get_lsn_range().start,
196 0 : lsn
197 0 : );
198 0 : }
199 0 :
200 0 : Arc::clone(open_layer)
201 : } else {
202 : // No writeable layer yet. Create one.
203 1268 : let start_lsn = self
204 1268 : .layer_map
205 1268 : .next_open_layer_at
206 1268 : .context("No next open layer found")?;
207 :
208 1268 : trace!(
209 0 : "creating in-memory layer at {}/{} for record at {}",
210 : timeline_id,
211 : start_lsn,
212 : lsn
213 : );
214 :
215 1268 : let new_layer = InMemoryLayer::create(
216 1268 : conf,
217 1268 : timeline_id,
218 1268 : tenant_shard_id,
219 1268 : start_lsn,
220 1268 : gate_guard,
221 1268 : ctx,
222 1268 : )
223 719 : .await?;
224 1268 : let layer = Arc::new(new_layer);
225 1268 :
226 1268 : self.layer_map.open_layer = Some(layer.clone());
227 1268 : self.layer_map.next_open_layer_at = None;
228 1268 :
229 1268 : layer
230 : };
231 :
232 1268 : Ok(layer)
233 1268 : }
234 :
235 : /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
236 : ///
237 : /// Returns true if anything was frozen.
238 1172 : pub(super) async fn try_freeze_in_memory_layer(
239 1172 : &mut self,
240 1172 : lsn: Lsn,
241 1172 : last_freeze_at: &AtomicLsn,
242 1172 : write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
243 1172 : ) -> bool {
244 1172 : let Lsn(last_record_lsn) = lsn;
245 1172 : let end_lsn = Lsn(last_record_lsn + 1);
246 :
247 1172 : let froze = if let Some(open_layer) = &self.layer_map.open_layer {
248 1144 : let open_layer_rc = Arc::clone(open_layer);
249 1144 : open_layer.freeze(end_lsn).await;
250 :
251 : // The layer is no longer open, update the layer map to reflect this.
252 : // We will replace it with on-disk historics below.
253 1144 : self.layer_map.frozen_layers.push_back(open_layer_rc);
254 1144 : self.layer_map.open_layer = None;
255 1144 : self.layer_map.next_open_layer_at = Some(end_lsn);
256 1144 :
257 1144 : true
258 : } else {
259 28 : false
260 : };
261 :
262 : // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
263 : // accounts for regions in the LSN range where we might have ingested no data due to sharding.
264 1172 : last_freeze_at.store(end_lsn);
265 1172 :
266 1172 : // the writer state must no longer have a reference to the frozen layer
267 1172 : let taken = write_lock.take();
268 1172 : assert_eq!(
269 1172 : froze,
270 1172 : taken.is_some(),
271 0 : "should only had frozen a layer when TimelineWriterState existed"
272 : );
273 :
274 1172 : froze
275 1172 : }
276 :
277 : /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
278 716 : pub(crate) fn track_new_image_layers(
279 716 : &mut self,
280 716 : image_layers: &[ResidentLayer],
281 716 : metrics: &TimelineMetrics,
282 716 : ) {
283 716 : let mut updates = self.layer_map.batch_update();
284 930 : for layer in image_layers {
285 214 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
286 214 :
287 214 : // record these here instead of Layer::finish_creating because otherwise partial
288 214 : // failure with create_image_layers would balloon up the physical size gauge. downside
289 214 : // is that all layers need to be created before metrics are updated.
290 214 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
291 214 : }
292 716 : updates.flush();
293 716 : }
294 :
295 : /// Flush a frozen layer and add the written delta layer to the layer map.
296 1144 : pub(crate) fn finish_flush_l0_layer(
297 1144 : &mut self,
298 1144 : delta_layer: Option<&ResidentLayer>,
299 1144 : frozen_layer_for_check: &Arc<InMemoryLayer>,
300 1144 : metrics: &TimelineMetrics,
301 1144 : ) {
302 1144 : let inmem = self
303 1144 : .layer_map
304 1144 : .frozen_layers
305 1144 : .pop_front()
306 1144 : .expect("there must be a inmem layer to flush");
307 1144 :
308 1144 : // Only one task may call this function at a time (for this
309 1144 : // timeline). If two tasks tried to flush the same frozen
310 1144 : // layer to disk at the same time, that would not work.
311 1144 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
312 :
313 1144 : if let Some(l) = delta_layer {
314 968 : let mut updates = self.layer_map.batch_update();
315 968 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
316 968 : metrics.record_new_file_metrics(l.layer_desc().file_size);
317 968 : updates.flush();
318 968 : }
319 1144 : }
320 :
321 : /// Called when compaction is completed.
322 64 : pub(crate) fn finish_compact_l0(
323 64 : &mut self,
324 64 : compact_from: &[Layer],
325 64 : compact_to: &[ResidentLayer],
326 64 : metrics: &TimelineMetrics,
327 64 : ) {
328 64 : let mut updates = self.layer_map.batch_update();
329 412 : for l in compact_to {
330 348 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
331 348 : metrics.record_new_file_metrics(l.layer_desc().file_size);
332 348 : }
333 540 : for l in compact_from {
334 476 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
335 476 : }
336 64 : updates.flush();
337 64 : }
338 :
339 : /// Called when a GC-compaction is completed.
340 36 : pub(crate) fn finish_gc_compaction(
341 36 : &mut self,
342 36 : compact_from: &[Layer],
343 36 : compact_to: &[ResidentLayer],
344 36 : metrics: &TimelineMetrics,
345 36 : ) {
346 36 : // We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
347 36 : self.finish_compact_l0(compact_from, compact_to, metrics)
348 36 : }
349 :
350 : /// Called post-compaction when some previous generation image layers were trimmed.
351 0 : pub(crate) fn rewrite_layers(
352 0 : &mut self,
353 0 : rewrite_layers: &[(Layer, ResidentLayer)],
354 0 : drop_layers: &[Layer],
355 0 : metrics: &TimelineMetrics,
356 0 : ) {
357 0 : let mut updates = self.layer_map.batch_update();
358 0 : for (old_layer, new_layer) in rewrite_layers {
359 0 : debug_assert_eq!(
360 0 : old_layer.layer_desc().key_range,
361 0 : new_layer.layer_desc().key_range
362 : );
363 0 : debug_assert_eq!(
364 0 : old_layer.layer_desc().lsn_range,
365 0 : new_layer.layer_desc().lsn_range
366 : );
367 :
368 : // Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
369 : // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
370 : // always marking rewritten layers as visible.
371 0 : new_layer.as_ref().set_visibility(old_layer.visibility());
372 0 :
373 0 : // Safety: we may never rewrite the same file in-place. Callers are responsible
374 0 : // for ensuring that they only rewrite layers after something changes the path,
375 0 : // such as an increment in the generation number.
376 0 : assert_ne!(old_layer.local_path(), new_layer.local_path());
377 :
378 0 : Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
379 0 :
380 0 : Self::insert_historic_layer(
381 0 : new_layer.as_ref().clone(),
382 0 : &mut updates,
383 0 : &mut self.layer_fmgr,
384 0 : );
385 0 :
386 0 : metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
387 : }
388 0 : for l in drop_layers {
389 0 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
390 0 : }
391 0 : updates.flush();
392 0 : }
393 :
394 : /// Called when garbage collect has selected the layers to be removed.
395 8 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
396 8 : let mut updates = self.layer_map.batch_update();
397 18 : for doomed_layer in gc_layers {
398 10 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
399 10 : }
400 8 : updates.flush()
401 8 : }
402 :
403 : #[cfg(test)]
404 126 : pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
405 126 : let mut updates = self.layer_map.batch_update();
406 126 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
407 126 : updates.flush()
408 126 : }
409 :
410 : /// Helper function to insert a layer into the layer map and file manager.
411 1672 : fn insert_historic_layer(
412 1672 : layer: Layer,
413 1672 : updates: &mut BatchedUpdates<'_>,
414 1672 : mapping: &mut LayerFileManager<Layer>,
415 1672 : ) {
416 1672 : updates.insert_historic(layer.layer_desc().clone());
417 1672 : mapping.insert(layer);
418 1672 : }
419 :
420 : /// Removes the layer from local FS (if present) and from memory.
421 : /// Remote storage is not affected by this operation.
422 486 : fn delete_historic_layer(
423 486 : // we cannot remove layers otherwise, since gc and compaction will race
424 486 : layer: &Layer,
425 486 : updates: &mut BatchedUpdates<'_>,
426 486 : mapping: &mut LayerFileManager<Layer>,
427 486 : ) {
428 486 : let desc = layer.layer_desc();
429 486 :
430 486 : // TODO Removing from the bottom of the layer map is expensive.
431 486 : // Maybe instead discard all layer map historic versions that
432 486 : // won't be needed for page reconstruction for this timeline,
433 486 : // and mark what we can't delete yet as deleted from the layer
434 486 : // map index without actually rebuilding the index.
435 486 : updates.remove_historic(desc);
436 486 : mapping.remove(layer);
437 486 : layer.delete_on_drop();
438 486 : }
439 : }
440 :
441 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
442 :
443 : impl<T> Default for LayerFileManager<T> {
444 418 : fn default() -> Self {
445 418 : Self(HashMap::default())
446 418 : }
447 : }
448 :
449 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
450 1672 : pub(crate) fn insert(&mut self, layer: T) {
451 1672 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
452 1672 : if present.is_some() && cfg!(debug_assertions) {
453 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
454 1672 : }
455 1672 : }
456 :
457 486 : pub(crate) fn remove(&mut self, layer: &T) {
458 486 : let present = self.0.remove(&layer.layer_desc().key());
459 486 : if present.is_none() && cfg!(debug_assertions) {
460 0 : panic!(
461 0 : "removing layer that is not present in layer mapping: {:?}",
462 0 : layer.layer_desc()
463 0 : )
464 486 : }
465 486 : }
466 : }
|