Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use itertools::Itertools;
3 : use pageserver_api::shard::TenantShardId;
4 : use std::{collections::HashMap, sync::Arc};
5 : use tracing::trace;
6 : use utils::{
7 : id::TimelineId,
8 : lsn::{AtomicLsn, Lsn},
9 : };
10 :
11 : use crate::{
12 : config::PageServerConf,
13 : context::RequestContext,
14 : metrics::TimelineMetrics,
15 : tenant::{
16 : layer_map::{BatchedUpdates, LayerMap},
17 : storage_layer::{
18 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
19 : ResidentLayer,
20 : },
21 : },
22 : };
23 :
24 : use super::TimelineWriterState;
25 :
/// Provides semantic APIs to manipulate the layer map.
///
/// Keeps two structures side by side: the [`LayerMap`] (which also holds the open layer, the
/// frozen layers and `next_open_layer_at`) and a [`LayerFileManager`] that maps each
/// [`PersistentLayerKey`] to its [`Layer`]. The helpers in this type update both together.
#[derive(Default)]
pub(crate) struct LayerManager {
    /// The layer map itself; handed out read-only through [`LayerManager::layer_map`].
    layer_map: LayerMap,
    /// Lookup from a persistent layer's key to the [`Layer`] object tracked for it.
    layer_fmgr: LayerFileManager<Layer>,
}
32 :
33 : impl LayerManager {
34 277061 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
35 277061 : self.layer_fmgr.get_from_desc(desc)
36 277061 : }
37 :
38 : /// Get an immutable reference to the layer map.
39 : ///
40 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
41 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
42 821505 : pub(crate) fn layer_map(&self) -> &LayerMap {
43 821505 : &self.layer_map
44 821505 : }
45 :
46 : /// Called from `load_layer_map`. Initialize the layer manager with:
47 : /// 1. all on-disk layers
48 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
49 6 : pub(crate) fn initialize_local_layers(
50 6 : &mut self,
51 6 : on_disk_layers: Vec<Layer>,
52 6 : next_open_layer_at: Lsn,
53 6 : ) {
54 6 : let mut updates = self.layer_map.batch_update();
55 22 : for layer in on_disk_layers {
56 16 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
57 16 : }
58 6 : updates.flush();
59 6 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
60 6 : }
61 :
62 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
63 377 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
64 377 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
65 377 : }
66 :
67 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
68 : /// called within `get_layer_for_write`.
69 1243 : pub(crate) async fn get_layer_for_write(
70 1243 : &mut self,
71 1243 : lsn: Lsn,
72 1243 : last_record_lsn: Lsn,
73 1243 : conf: &'static PageServerConf,
74 1243 : timeline_id: TimelineId,
75 1243 : tenant_shard_id: TenantShardId,
76 1243 : ctx: &RequestContext,
77 1243 : ) -> Result<Arc<InMemoryLayer>> {
78 1243 : ensure!(lsn.is_aligned());
79 :
80 1243 : ensure!(
81 1243 : lsn > last_record_lsn,
82 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
83 : lsn,
84 : last_record_lsn,
85 : );
86 :
87 : // Do we have a layer open for writing already?
88 1243 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
89 0 : if open_layer.get_lsn_range().start > lsn {
90 0 : bail!(
91 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
92 0 : open_layer.get_lsn_range().start,
93 0 : lsn
94 0 : );
95 0 : }
96 0 :
97 0 : Arc::clone(open_layer)
98 : } else {
99 : // No writeable layer yet. Create one.
100 1243 : let start_lsn = self
101 1243 : .layer_map
102 1243 : .next_open_layer_at
103 1243 : .context("No next open layer found")?;
104 :
105 1243 : trace!(
106 0 : "creating in-memory layer at {}/{} for record at {}",
107 : timeline_id,
108 : start_lsn,
109 : lsn
110 : );
111 :
112 1243 : let new_layer =
113 1243 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
114 1243 : let layer = Arc::new(new_layer);
115 1243 :
116 1243 : self.layer_map.open_layer = Some(layer.clone());
117 1243 : self.layer_map.next_open_layer_at = None;
118 1243 :
119 1243 : layer
120 : };
121 :
122 1243 : Ok(layer)
123 1243 : }
124 :
125 : /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
126 : ///
127 : /// Returns true if anything was frozen.
128 1141 : pub(super) async fn try_freeze_in_memory_layer(
129 1141 : &mut self,
130 1141 : lsn: Lsn,
131 1141 : last_freeze_at: &AtomicLsn,
132 1141 : write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
133 1141 : ) -> bool {
134 1141 : let Lsn(last_record_lsn) = lsn;
135 1141 : let end_lsn = Lsn(last_record_lsn + 1);
136 :
137 1141 : let froze = if let Some(open_layer) = &self.layer_map.open_layer {
138 1113 : let open_layer_rc = Arc::clone(open_layer);
139 1113 : open_layer.freeze(end_lsn).await;
140 :
141 : // The layer is no longer open, update the layer map to reflect this.
142 : // We will replace it with on-disk historics below.
143 1113 : self.layer_map.frozen_layers.push_back(open_layer_rc);
144 1113 : self.layer_map.open_layer = None;
145 1113 : self.layer_map.next_open_layer_at = Some(end_lsn);
146 1113 :
147 1113 : true
148 : } else {
149 28 : false
150 : };
151 :
152 : // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
153 : // accounts for regions in the LSN range where we might have ingested no data due to sharding.
154 1141 : last_freeze_at.store(end_lsn);
155 1141 :
156 1141 : // the writer state must no longer have a reference to the frozen layer
157 1141 : let taken = write_lock.take();
158 1141 : assert_eq!(
159 1141 : froze,
160 1141 : taken.is_some(),
161 0 : "should only had frozen a layer when TimelineWriterState existed"
162 : );
163 :
164 1141 : froze
165 1141 : }
166 :
167 : /// Add image layers to the layer map, called from `create_image_layers`.
168 509 : pub(crate) fn track_new_image_layers(
169 509 : &mut self,
170 509 : image_layers: &[ResidentLayer],
171 509 : metrics: &TimelineMetrics,
172 509 : ) {
173 509 : let mut updates = self.layer_map.batch_update();
174 694 : for layer in image_layers {
175 185 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
176 185 :
177 185 : // record these here instead of Layer::finish_creating because otherwise partial
178 185 : // failure with create_image_layers would balloon up the physical size gauge. downside
179 185 : // is that all layers need to be created before metrics are updated.
180 185 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
181 185 : }
182 509 : updates.flush();
183 509 : }
184 :
185 : /// Flush a frozen layer and add the written delta layer to the layer map.
186 1113 : pub(crate) fn finish_flush_l0_layer(
187 1113 : &mut self,
188 1113 : delta_layer: Option<&ResidentLayer>,
189 1113 : frozen_layer_for_check: &Arc<InMemoryLayer>,
190 1113 : metrics: &TimelineMetrics,
191 1113 : ) {
192 1113 : let inmem = self
193 1113 : .layer_map
194 1113 : .frozen_layers
195 1113 : .pop_front()
196 1113 : .expect("there must be a inmem layer to flush");
197 1113 :
198 1113 : // Only one task may call this function at a time (for this
199 1113 : // timeline). If two tasks tried to flush the same frozen
200 1113 : // layer to disk at the same time, that would not work.
201 1113 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
202 :
203 1113 : if let Some(l) = delta_layer {
204 968 : let mut updates = self.layer_map.batch_update();
205 968 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
206 968 : metrics.record_new_file_metrics(l.layer_desc().file_size);
207 968 : updates.flush();
208 968 : }
209 1113 : }
210 :
211 : /// Called when compaction is completed.
212 32 : pub(crate) fn finish_compact_l0(
213 32 : &mut self,
214 32 : compact_from: &[Layer],
215 32 : compact_to: &[ResidentLayer],
216 32 : metrics: &TimelineMetrics,
217 32 : ) {
218 32 : let mut updates = self.layer_map.batch_update();
219 348 : for l in compact_to {
220 316 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
221 316 : metrics.record_new_file_metrics(l.layer_desc().file_size);
222 316 : }
223 450 : for l in compact_from {
224 418 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
225 418 : }
226 32 : updates.flush();
227 32 : }
228 :
229 : /// Called when a GC-compaction is completed.
230 4 : pub(crate) fn finish_gc_compaction(
231 4 : &mut self,
232 4 : compact_from: &[Layer],
233 4 : compact_to: &[ResidentLayer],
234 4 : metrics: &TimelineMetrics,
235 4 : ) {
236 4 : // We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
237 4 : self.finish_compact_l0(compact_from, compact_to, metrics)
238 4 : }
239 :
240 : /// Called when compaction is completed.
241 0 : pub(crate) fn rewrite_layers(
242 0 : &mut self,
243 0 : rewrite_layers: &[(Layer, ResidentLayer)],
244 0 : drop_layers: &[Layer],
245 0 : metrics: &TimelineMetrics,
246 0 : ) {
247 0 : let mut updates = self.layer_map.batch_update();
248 0 : for (old_layer, new_layer) in rewrite_layers {
249 0 : debug_assert_eq!(
250 0 : old_layer.layer_desc().key_range,
251 0 : new_layer.layer_desc().key_range
252 : );
253 0 : debug_assert_eq!(
254 0 : old_layer.layer_desc().lsn_range,
255 0 : new_layer.layer_desc().lsn_range
256 : );
257 :
258 : // Safety: we may never rewrite the same file in-place. Callers are responsible
259 : // for ensuring that they only rewrite layers after something changes the path,
260 : // such as an increment in the generation number.
261 0 : assert_ne!(old_layer.local_path(), new_layer.local_path());
262 :
263 0 : Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
264 0 :
265 0 : Self::insert_historic_layer(
266 0 : new_layer.as_ref().clone(),
267 0 : &mut updates,
268 0 : &mut self.layer_fmgr,
269 0 : );
270 0 :
271 0 : metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
272 : }
273 0 : for l in drop_layers {
274 0 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
275 0 : }
276 0 : updates.flush();
277 0 : }
278 :
279 : /// Called when garbage collect has selected the layers to be removed.
280 12 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
281 12 : let mut updates = self.layer_map.batch_update();
282 28 : for doomed_layer in gc_layers {
283 16 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
284 16 : }
285 12 : updates.flush()
286 12 : }
287 :
/// Test-only helper that inserts a single resident layer into the layer map and the file
/// manager. Unlike the production paths it records no metrics.
#[cfg(test)]
pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
    let Self {
        layer_map,
        layer_fmgr,
    } = self;

    let mut batch = layer_map.batch_update();
    Self::insert_historic_layer(layer.as_ref().clone(), &mut batch, layer_fmgr);
    batch.flush()
}
294 :
295 : /// Helper function to insert a layer into the layer map and file manager.
296 1557 : fn insert_historic_layer(
297 1557 : layer: Layer,
298 1557 : updates: &mut BatchedUpdates<'_>,
299 1557 : mapping: &mut LayerFileManager<Layer>,
300 1557 : ) {
301 1557 : updates.insert_historic(layer.layer_desc().clone());
302 1557 : mapping.insert(layer);
303 1557 : }
304 :
305 : /// Removes the layer from local FS (if present) and from memory.
306 : /// Remote storage is not affected by this operation.
307 434 : fn delete_historic_layer(
308 434 : // we cannot remove layers otherwise, since gc and compaction will race
309 434 : layer: &Layer,
310 434 : updates: &mut BatchedUpdates<'_>,
311 434 : mapping: &mut LayerFileManager<Layer>,
312 434 : ) {
313 434 : let desc = layer.layer_desc();
314 434 :
315 434 : // TODO Removing from the bottom of the layer map is expensive.
316 434 : // Maybe instead discard all layer map historic versions that
317 434 : // won't be needed for page reconstruction for this timeline,
318 434 : // and mark what we can't delete yet as deleted from the layer
319 434 : // map index without actually rebuilding the index.
320 434 : updates.remove_historic(desc);
321 434 : mapping.remove(layer);
322 434 : layer.delete_on_drop();
323 434 : }
324 :
325 20 : pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
326 20 : // for small layer maps, we most likely have all resident, but for larger more are likely
327 20 : // to be evicted assuming lots of layers correlated with longer lifespan.
328 20 :
329 24 : self.layer_map().iter_historic_layers().filter_map(|desc| {
330 24 : self.layer_fmgr
331 24 : .0
332 24 : .get(&desc.key())
333 24 : .filter(|l| l.is_likely_resident())
334 24 : .cloned()
335 24 : })
336 20 : }
337 :
338 308 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
339 308 : self.layer_fmgr.contains(layer)
340 308 : }
341 :
342 0 : pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
343 0 : self.layer_fmgr.0.keys().cloned().collect_vec()
344 0 : }
345 : }
346 :
/// Map from a layer's [`PersistentLayerKey`] to the layer object `T` tracked for it.
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
348 :
349 : impl<T> Default for LayerFileManager<T> {
350 383 : fn default() -> Self {
351 383 : Self(HashMap::default())
352 383 : }
353 : }
354 :
355 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
356 277061 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
357 277061 : // The assumption for the `expect()` is that all code maintains the following invariant:
358 277061 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
359 277061 : self.0
360 277061 : .get(&desc.key())
361 277061 : .with_context(|| format!("get layer from desc: {}", desc.layer_name()))
362 277061 : .expect("not found")
363 277061 : .clone()
364 277061 : }
365 :
366 1557 : pub(crate) fn insert(&mut self, layer: T) {
367 1557 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
368 1557 : if present.is_some() && cfg!(debug_assertions) {
369 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
370 1557 : }
371 1557 : }
372 :
373 308 : pub(crate) fn contains(&self, layer: &T) -> bool {
374 308 : self.0.contains_key(&layer.layer_desc().key())
375 308 : }
376 :
377 434 : pub(crate) fn remove(&mut self, layer: &T) {
378 434 : let present = self.0.remove(&layer.layer_desc().key());
379 434 : if present.is_none() && cfg!(debug_assertions) {
380 0 : panic!(
381 0 : "removing layer that is not present in layer mapping: {:?}",
382 0 : layer.layer_desc()
383 0 : )
384 434 : }
385 434 : }
386 : }
|