use anyhow::{bail, ensure, Context, Result};
use itertools::Itertools;
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
use tracing::trace;
use utils::{
    id::TimelineId,
    lsn::{AtomicLsn, Lsn},
};

use crate::{
    config::PageServerConf,
    context::RequestContext,
    metrics::TimelineMetrics,
    tenant::{
        layer_map::{BatchedUpdates, LayerMap},
        storage_layer::{
            AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
            ResidentLayer,
        },
    },
};

use super::TimelineWriterState;

/// Provides semantic APIs to manipulate the layer map.
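///
/// # Example (illustrative)
///
/// A minimal sketch of the write path, assuming a `LayerManager` owned by a
/// timeline and accessed under its layers write lock; all variable names here
/// are hypothetical:
///
/// ```ignore
/// let mut layers = LayerManager::default();
/// // A fresh timeline starts with an empty layer map.
/// layers.initialize_empty(initdb_lsn);
/// // Ingest opens (or reuses) an in-memory layer for incoming WAL records;
/// // it is later frozen and flushed to disk as an L0 delta layer.
/// let open = layers
///     .get_layer_for_write(lsn, last_record_lsn, conf, timeline_id, shard_id, ctx)
///     .await?;
/// ```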
#[derive(Default)]
pub(crate) struct LayerManager {
    layer_map: LayerMap,
    layer_fmgr: LayerFileManager<Layer>,
}

impl LayerManager {
    pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
        self.layer_fmgr.get_from_desc(desc)
    }

    pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
        self.layer_fmgr.get_from_key(key)
    }

    /// Get an immutable reference to the layer map.
    ///
    /// Users can only obtain an immutable reference here; to make modifications,
    /// they should use the semantic APIs below. This design moves us a step
    /// closer to immutable storage state.
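    ///
    /// # Example (illustrative)
    ///
    /// A read-only sketch, e.g. counting the on-disk (historic) layers:
    ///
    /// ```ignore
    /// let historic_count = manager.layer_map().iter_historic_layers().count();
    /// ```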
    pub(crate) fn layer_map(&self) -> &LayerMap {
        &self.layer_map
    }

    /// Called from `load_layer_map`. Initialize the layer manager with:
    /// 1. all on-disk layers
    /// 2. the next open layer (at the `disk_consistent_lsn` LSN)
    pub(crate) fn initialize_local_layers(
        &mut self,
        on_disk_layers: Vec<Layer>,
        next_open_layer_at: Lsn,
    ) {
        let mut updates = self.layer_map.batch_update();
        for layer in on_disk_layers {
            Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
    pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Open a new writable layer to append data to if there is no open layer; otherwise
    /// return the current open layer. Called within `get_layer_for_write`.
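    ///
    /// # Example (illustrative)
    ///
    /// A sketch of the ingest path; `lsn` must be aligned and strictly greater
    /// than `last_record_lsn`, otherwise an error is returned. Variable names
    /// are hypothetical:
    ///
    /// ```ignore
    /// let open_layer = layers
    ///     .get_layer_for_write(lsn, last_record_lsn, conf, timeline_id, shard_id, ctx)
    ///     .await?;
    /// // The returned `Arc<InMemoryLayer>` is what the writer appends records to.
    /// ```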
    pub(crate) async fn get_layer_for_write(
        &mut self,
        lsn: Lsn,
        last_record_lsn: Lsn,
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        ctx: &RequestContext,
    ) -> Result<Arc<InMemoryLayer>> {
        ensure!(lsn.is_aligned());

        ensure!(
            lsn > last_record_lsn,
            "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
            lsn,
            last_record_lsn,
        );

        // Do we have a layer open for writing already?
        let layer = if let Some(open_layer) = &self.layer_map.open_layer {
            if open_layer.get_lsn_range().start > lsn {
                bail!(
                    "unexpected open layer in the future: open layer starts at {}, write lsn {}",
                    open_layer.get_lsn_range().start,
                    lsn
                );
            }

            Arc::clone(open_layer)
        } else {
            // No writeable layer yet. Create one.
            let start_lsn = self
                .layer_map
                .next_open_layer_at
                .context("No next open layer found")?;

            trace!(
                "creating in-memory layer at {}/{} for record at {}",
                timeline_id,
                start_lsn,
                lsn
            );

            let new_layer =
                InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
            let layer = Arc::new(new_layer);

            self.layer_map.open_layer = Some(layer.clone());
            self.layer_map.next_open_layer_at = None;

            layer
        };

        Ok(layer)
    }

    /// Tries to freeze an open layer and also manages clearing the `TimelineWriterState`.
    ///
    /// Returns true if anything was frozen.
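    ///
    /// # Example (illustrative)
    ///
    /// A sketch of a freeze at a flush boundary, assuming hypothetical
    /// `timeline` fields for the writer-state lock and `last_freeze_at`:
    ///
    /// ```ignore
    /// let mut write_guard = timeline.write_lock.lock().await;
    /// let froze = layers
    ///     .try_freeze_in_memory_layer(last_record_lsn, &timeline.last_freeze_at, &mut write_guard)
    ///     .await;
    /// if froze {
    ///     // A frozen layer is now queued; the flush loop will write it to disk.
    /// }
    /// ```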
    pub(super) async fn try_freeze_in_memory_layer(
        &mut self,
        lsn: Lsn,
        last_freeze_at: &AtomicLsn,
        write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
    ) -> bool {
        let Lsn(last_record_lsn) = lsn;
        let end_lsn = Lsn(last_record_lsn + 1);

        let froze = if let Some(open_layer) = &self.layer_map.open_layer {
            let open_layer_rc = Arc::clone(open_layer);
            open_layer.freeze(end_lsn).await;

            // The layer is no longer open, update the layer map to reflect this.
            // We will replace it with on-disk historics below.
            self.layer_map.frozen_layers.push_back(open_layer_rc);
            self.layer_map.open_layer = None;
            self.layer_map.next_open_layer_at = Some(end_lsn);

            true
        } else {
            false
        };

        // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
        // accounts for regions in the LSN range where we might have ingested no data due to sharding.
        last_freeze_at.store(end_lsn);

        // The writer state must no longer hold a reference to the frozen layer.
        let taken = write_lock.take();
        assert_eq!(
            froze,
            taken.is_some(),
            "should only have frozen a layer when TimelineWriterState existed"
        );

        froze
    }

    /// Add image layers to the layer map, called from `create_image_layers`.
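    ///
    /// # Example (illustrative)
    ///
    /// A sketch, assuming all image layers were fully created first (see the
    /// note on metrics in the body); variable names are hypothetical:
    ///
    /// ```ignore
    /// layers.track_new_image_layers(&image_layers, &timeline.metrics);
    /// ```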
    pub(crate) fn track_new_image_layers(
        &mut self,
        image_layers: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for layer in image_layers {
            Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);

            // Record these here instead of in Layer::finish_creating, because a partial
            // failure in create_image_layers would otherwise balloon the physical size gauge.
            // The downside is that all layers need to be created before metrics are updated.
            metrics.record_new_file_metrics(layer.layer_desc().file_size);
        }
        updates.flush();
    }

    /// Flush a frozen layer and add the written delta layer to the layer map.
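    ///
    /// # Example (illustrative)
    ///
    /// A sketch of the flush loop's final step, after the frozen in-memory
    /// layer has been written out; variable names are hypothetical:
    ///
    /// ```ignore
    /// // `frozen` is the Arc<InMemoryLayer> that was just written to disk.
    /// layers.finish_flush_l0_layer(Some(&delta_layer), &frozen, &timeline.metrics);
    /// // If the frozen layer produced no data, pass None for the delta layer.
    /// ```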
    pub(crate) fn finish_flush_l0_layer(
        &mut self,
        delta_layer: Option<&ResidentLayer>,
        frozen_layer_for_check: &Arc<InMemoryLayer>,
        metrics: &TimelineMetrics,
    ) {
        let inmem = self
            .layer_map
            .frozen_layers
            .pop_front()
            .expect("there must be an inmem layer to flush");

        // Only one task may call this function at a time (for this
        // timeline). If two tasks tried to flush the same frozen
        // layer to disk at the same time, that would not work.
        assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));

        if let Some(l) = delta_layer {
            let mut updates = self.layer_map.batch_update();
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
            updates.flush();
        }
    }

    /// Called when compaction is completed.
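    ///
    /// # Example (illustrative)
    ///
    /// A sketch of applying a compaction result: the input L0 layers are
    /// replaced by the newly written layers in one batched update
    /// (hypothetical variable names):
    ///
    /// ```ignore
    /// layers.finish_compact_l0(&level0_deltas, &new_layers, &timeline.metrics);
    /// ```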
    pub(crate) fn finish_compact_l0(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for l in compact_to {
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
        }
        for l in compact_from {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when a GC-compaction is completed.
    pub(crate) fn finish_gc_compaction(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        // We can simply reuse the compact-L0 logic. Use a different function name
        // to indicate a different type of layer map modification.
        self.finish_compact_l0(compact_from, compact_to, metrics)
    }

    /// Called when compaction rewrites some layers in place and/or drops layers entirely.
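    ///
    /// # Example (illustrative)
    ///
    /// A sketch of rewriting one layer and dropping another. Each rewritten
    /// pair must cover the same key/LSN range, and the new file must live at a
    /// different local path (hypothetical variable names):
    ///
    /// ```ignore
    /// layers.rewrite_layers(&[(old_layer, new_resident_layer)], &[doomed_layer], &metrics);
    /// ```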
    pub(crate) fn rewrite_layers(
        &mut self,
        rewrite_layers: &[(Layer, ResidentLayer)],
        drop_layers: &[Layer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for (old_layer, new_layer) in rewrite_layers {
            debug_assert_eq!(
                old_layer.layer_desc().key_range,
                new_layer.layer_desc().key_range
            );
            debug_assert_eq!(
                old_layer.layer_desc().lsn_range,
                new_layer.layer_desc().lsn_range
            );

            // Transfer the visibility hint from the old to the new layer, since the new layer
            // covers the same key space. This is not guaranteed to be accurate (as the new layer
            // may cover a different subset of the key range), but it is a sensible default, and
            // prevents always marking rewritten layers as visible.
            new_layer
                .as_ref()
                .access_stats()
                .set_visibility(old_layer.access_stats().visibility());

            // Safety: we may never rewrite the same file in-place. Callers are responsible
            // for ensuring that they only rewrite layers after something changes the path,
            // such as an increment in the generation number.
            assert_ne!(old_layer.local_path(), new_layer.local_path());

            Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);

            Self::insert_historic_layer(
                new_layer.as_ref().clone(),
                &mut updates,
                &mut self.layer_fmgr,
            );

            metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
        }
        for l in drop_layers {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when garbage collection has selected the layers to be removed.
    pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
        let mut updates = self.layer_map.batch_update();
        for doomed_layer in gc_layers {
            Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush()
    }

    #[cfg(test)]
    pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
        let mut updates = self.layer_map.batch_update();
        Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
        updates.flush()
    }

    /// Helper function to insert a layer into the layer map and file manager.
    fn insert_historic_layer(
        layer: Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        updates.insert_historic(layer.layer_desc().clone());
        mapping.insert(layer);
    }

    /// Removes the layer from the local FS (if present) and from memory.
    /// Remote storage is not affected by this operation.
    fn delete_historic_layer(
        // we cannot remove layers otherwise, since gc and compaction will race
        layer: &Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        let desc = layer.layer_desc();

        // TODO: removing from the bottom of the layer map is expensive.
        // Maybe instead discard all layer map historic versions that
        // won't be needed for page reconstruction for this timeline,
        // and mark what we can't delete yet as deleted from the layer
        // map index without actually rebuilding the index.
        updates.remove_historic(desc);
        mapping.remove(layer);
        layer.delete_on_drop();
    }

    /// Returns the historic layers that are likely to be resident locally.
    pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
        // For small layer maps, most layers are likely resident; for larger maps, more layers
        // are likely to have been evicted, assuming a large layer count correlates with a
        // longer lifespan.
        self.layer_map().iter_historic_layers().filter_map(|desc| {
            self.layer_fmgr
                .0
                .get(&desc.key())
                .filter(|l| l.is_likely_resident())
                .cloned()
        })
    }

    pub(crate) fn contains(&self, layer: &Layer) -> bool {
        self.layer_fmgr.contains(layer)
    }

    pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
        self.layer_fmgr.contains_key(key)
    }

    pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
        self.layer_fmgr.0.keys().cloned().collect_vec()
    }
}

pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);

impl<T> Default for LayerFileManager<T> {
    fn default() -> Self {
        Self(HashMap::default())
    }
}

impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
    fn get_from_key(&self, key: &PersistentLayerKey) -> T {
        // The assumption for the `expect()` is that all code maintains the following invariant:
        // a layer's descriptor is present in the LayerMap => the LayerFileManager contains a
        // layer for the descriptor.
        self.0
            .get(key)
            .with_context(|| format!("get layer from key: {}", key))
            .expect("not found")
            .clone()
    }

    fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
        self.get_from_key(&desc.key())
    }

    fn contains_key(&self, key: &PersistentLayerKey) -> bool {
        self.0.contains_key(key)
    }

    pub(crate) fn insert(&mut self, layer: T) {
        let present = self.0.insert(layer.layer_desc().key(), layer.clone());
        if present.is_some() && cfg!(debug_assertions) {
            panic!("overwriting a layer: {:?}", layer.layer_desc())
        }
    }

    pub(crate) fn contains(&self, layer: &T) -> bool {
        self.0.contains_key(&layer.layer_desc().key())
    }

    pub(crate) fn remove(&mut self, layer: &T) {
        let present = self.0.remove(&layer.layer_desc().key());
        if present.is_none() && cfg!(debug_assertions) {
            panic!(
                "removing layer that is not present in layer mapping: {:?}",
                layer.layer_desc()
            )
        }
    }
}