Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use itertools::Itertools;
3 : use pageserver_api::shard::TenantShardId;
4 : use std::{collections::HashMap, sync::Arc};
5 : use tracing::trace;
6 : use utils::{
7 : id::TimelineId,
8 : lsn::{AtomicLsn, Lsn},
9 : };
10 :
11 : use crate::{
12 : config::PageServerConf,
13 : context::RequestContext,
14 : metrics::TimelineMetrics,
15 : tenant::{
16 : layer_map::{BatchedUpdates, LayerMap},
17 : storage_layer::{
18 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
19 : ResidentLayer,
20 : },
21 : },
22 : };
23 :
24 : use super::TimelineWriterState;
25 :
26 : /// Provides semantic APIs to manipulate the layer map.
27 : #[derive(Default)]
28 : pub(crate) struct LayerManager {
29 : layer_map: LayerMap,
30 : layer_fmgr: LayerFileManager<Layer>,
31 : }
32 :
33 : impl LayerManager {
34 275811 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
35 275811 : self.layer_fmgr.get_from_desc(desc)
36 275811 : }
37 :
38 : /// Get an immutable reference to the layer map.
39 : ///
40 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
41 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
42 820240 : pub(crate) fn layer_map(&self) -> &LayerMap {
43 820240 : &self.layer_map
44 820240 : }
45 :
46 : /// Called from `load_layer_map`. Initialize the layer manager with:
47 : /// 1. all on-disk layers
48 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
49 6 : pub(crate) fn initialize_local_layers(
50 6 : &mut self,
51 6 : on_disk_layers: Vec<Layer>,
52 6 : next_open_layer_at: Lsn,
53 6 : ) {
54 6 : let mut updates = self.layer_map.batch_update();
55 22 : for layer in on_disk_layers {
56 16 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
57 16 : }
58 6 : updates.flush();
59 6 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
60 6 : }
61 :
62 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
63 373 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
64 373 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
65 373 : }
66 :
67 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
68 : /// called within `get_layer_for_write`.
69 1239 : pub(crate) async fn get_layer_for_write(
70 1239 : &mut self,
71 1239 : lsn: Lsn,
72 1239 : last_record_lsn: Lsn,
73 1239 : conf: &'static PageServerConf,
74 1239 : timeline_id: TimelineId,
75 1239 : tenant_shard_id: TenantShardId,
76 1239 : ctx: &RequestContext,
77 1239 : ) -> Result<Arc<InMemoryLayer>> {
78 1239 : ensure!(lsn.is_aligned());
79 :
80 1239 : ensure!(
81 1239 : lsn > last_record_lsn,
82 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
83 : lsn,
84 : last_record_lsn,
85 : );
86 :
87 : // Do we have a layer open for writing already?
88 1239 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
89 0 : if open_layer.get_lsn_range().start > lsn {
90 0 : bail!(
91 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
92 0 : open_layer.get_lsn_range().start,
93 0 : lsn
94 0 : );
95 0 : }
96 0 :
97 0 : Arc::clone(open_layer)
98 : } else {
99 : // No writeable layer yet. Create one.
100 1239 : let start_lsn = self
101 1239 : .layer_map
102 1239 : .next_open_layer_at
103 1239 : .context("No next open layer found")?;
104 :
105 1239 : trace!(
106 0 : "creating in-memory layer at {}/{} for record at {}",
107 : timeline_id,
108 : start_lsn,
109 : lsn
110 : );
111 :
112 1239 : let new_layer =
113 1239 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
114 1239 : let layer = Arc::new(new_layer);
115 1239 :
116 1239 : self.layer_map.open_layer = Some(layer.clone());
117 1239 : self.layer_map.next_open_layer_at = None;
118 1239 :
119 1239 : layer
120 : };
121 :
122 1239 : Ok(layer)
123 1239 : }
124 :
125 : /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
126 : ///
127 : /// Returns true if anything was frozen.
128 1137 : pub(super) async fn try_freeze_in_memory_layer(
129 1137 : &mut self,
130 1137 : lsn: Lsn,
131 1137 : last_freeze_at: &AtomicLsn,
132 1137 : write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
133 1137 : ) -> bool {
134 1137 : let Lsn(last_record_lsn) = lsn;
135 1137 : let end_lsn = Lsn(last_record_lsn + 1);
136 :
137 1137 : let froze = if let Some(open_layer) = &self.layer_map.open_layer {
138 1109 : let open_layer_rc = Arc::clone(open_layer);
139 1109 : open_layer.freeze(end_lsn).await;
140 :
141 : // The layer is no longer open, update the layer map to reflect this.
142 : // We will replace it with on-disk historics below.
143 1109 : self.layer_map.frozen_layers.push_back(open_layer_rc);
144 1109 : self.layer_map.open_layer = None;
145 1109 : self.layer_map.next_open_layer_at = Some(end_lsn);
146 1109 :
147 1109 : true
148 : } else {
149 28 : false
150 : };
151 :
152 : // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
153 : // accounts for regions in the LSN range where we might have ingested no data due to sharding.
154 1137 : last_freeze_at.store(end_lsn);
155 1137 :
156 1137 : // the writer state must no longer have a reference to the frozen layer
157 1137 : let taken = write_lock.take();
158 1137 : assert_eq!(
159 1137 : froze,
160 1137 : taken.is_some(),
161 0 : "should only had frozen a layer when TimelineWriterState existed"
162 : );
163 :
164 1137 : froze
165 1137 : }
166 :
167 : /// Add image layers to the layer map, called from `create_image_layers`.
168 504 : pub(crate) fn track_new_image_layers(
169 504 : &mut self,
170 504 : image_layers: &[ResidentLayer],
171 504 : metrics: &TimelineMetrics,
172 504 : ) {
173 504 : let mut updates = self.layer_map.batch_update();
174 684 : for layer in image_layers {
175 180 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
176 180 :
177 180 : // record these here instead of Layer::finish_creating because otherwise partial
178 180 : // failure with create_image_layers would balloon up the physical size gauge. downside
179 180 : // is that all layers need to be created before metrics are updated.
180 180 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
181 180 : }
182 504 : updates.flush();
183 504 : }
184 :
185 : /// Flush a frozen layer and add the written delta layer to the layer map.
186 1108 : pub(crate) fn finish_flush_l0_layer(
187 1108 : &mut self,
188 1108 : delta_layer: Option<&ResidentLayer>,
189 1108 : frozen_layer_for_check: &Arc<InMemoryLayer>,
190 1108 : metrics: &TimelineMetrics,
191 1108 : ) {
192 1108 : let inmem = self
193 1108 : .layer_map
194 1108 : .frozen_layers
195 1108 : .pop_front()
196 1108 : .expect("there must be a inmem layer to flush");
197 1108 :
198 1108 : // Only one task may call this function at a time (for this
199 1108 : // timeline). If two tasks tried to flush the same frozen
200 1108 : // layer to disk at the same time, that would not work.
201 1108 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
202 :
203 1108 : if let Some(l) = delta_layer {
204 968 : let mut updates = self.layer_map.batch_update();
205 968 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
206 968 : metrics.record_new_file_metrics(l.layer_desc().file_size);
207 968 : updates.flush();
208 968 : }
209 1108 : }
210 :
211 : /// Called when compaction is completed.
212 30 : pub(crate) fn finish_compact_l0(
213 30 : &mut self,
214 30 : compact_from: &[Layer],
215 30 : compact_to: &[ResidentLayer],
216 30 : metrics: &TimelineMetrics,
217 30 : ) {
218 30 : let mut updates = self.layer_map.batch_update();
219 342 : for l in compact_to {
220 312 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
221 312 : metrics.record_new_file_metrics(l.layer_desc().file_size);
222 312 : }
223 440 : for l in compact_from {
224 410 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
225 410 : }
226 30 : updates.flush();
227 30 : }
228 :
229 : /// Called when a GC-compaction is completed.
230 : #[cfg(test)]
231 2 : pub(crate) fn finish_gc_compaction(
232 2 : &mut self,
233 2 : compact_from: &[Layer],
234 2 : compact_to: &[ResidentLayer],
235 2 : metrics: &TimelineMetrics,
236 2 : ) {
237 2 : // We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
238 2 : self.finish_compact_l0(compact_from, compact_to, metrics)
239 2 : }
240 :
241 : /// Called when compaction is completed.
242 0 : pub(crate) fn rewrite_layers(
243 0 : &mut self,
244 0 : rewrite_layers: &[(Layer, ResidentLayer)],
245 0 : drop_layers: &[Layer],
246 0 : metrics: &TimelineMetrics,
247 0 : ) {
248 0 : let mut updates = self.layer_map.batch_update();
249 0 : for (old_layer, new_layer) in rewrite_layers {
250 0 : debug_assert_eq!(
251 0 : old_layer.layer_desc().key_range,
252 0 : new_layer.layer_desc().key_range
253 : );
254 0 : debug_assert_eq!(
255 0 : old_layer.layer_desc().lsn_range,
256 0 : new_layer.layer_desc().lsn_range
257 : );
258 :
259 : // Safety: we may never rewrite the same file in-place. Callers are responsible
260 : // for ensuring that they only rewrite layers after something changes the path,
261 : // such as an increment in the generation number.
262 0 : assert_ne!(old_layer.local_path(), new_layer.local_path());
263 :
264 0 : Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
265 0 :
266 0 : Self::insert_historic_layer(
267 0 : new_layer.as_ref().clone(),
268 0 : &mut updates,
269 0 : &mut self.layer_fmgr,
270 0 : );
271 0 :
272 0 : metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
273 : }
274 0 : for l in drop_layers {
275 0 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
276 0 : }
277 0 : updates.flush();
278 0 : }
279 :
280 : /// Called when garbage collect has selected the layers to be removed.
281 12 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
282 12 : let mut updates = self.layer_map.batch_update();
283 28 : for doomed_layer in gc_layers {
284 16 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
285 16 : }
286 12 : updates.flush()
287 12 : }
288 :
289 : #[cfg(test)]
290 64 : pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
291 64 : let mut updates = self.layer_map.batch_update();
292 64 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
293 64 : updates.flush()
294 64 : }
295 :
296 : /// Helper function to insert a layer into the layer map and file manager.
297 1540 : fn insert_historic_layer(
298 1540 : layer: Layer,
299 1540 : updates: &mut BatchedUpdates<'_>,
300 1540 : mapping: &mut LayerFileManager<Layer>,
301 1540 : ) {
302 1540 : updates.insert_historic(layer.layer_desc().clone());
303 1540 : mapping.insert(layer);
304 1540 : }
305 :
306 : /// Removes the layer from local FS (if present) and from memory.
307 : /// Remote storage is not affected by this operation.
308 426 : fn delete_historic_layer(
309 426 : // we cannot remove layers otherwise, since gc and compaction will race
310 426 : layer: &Layer,
311 426 : updates: &mut BatchedUpdates<'_>,
312 426 : mapping: &mut LayerFileManager<Layer>,
313 426 : ) {
314 426 : let desc = layer.layer_desc();
315 426 :
316 426 : // TODO Removing from the bottom of the layer map is expensive.
317 426 : // Maybe instead discard all layer map historic versions that
318 426 : // won't be needed for page reconstruction for this timeline,
319 426 : // and mark what we can't delete yet as deleted from the layer
320 426 : // map index without actually rebuilding the index.
321 426 : updates.remove_historic(desc);
322 426 : mapping.remove(layer);
323 426 : layer.delete_on_drop();
324 426 : }
325 :
326 20 : pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
327 20 : // for small layer maps, we most likely have all resident, but for larger more are likely
328 20 : // to be evicted assuming lots of layers correlated with longer lifespan.
329 20 :
330 24 : self.layer_map().iter_historic_layers().filter_map(|desc| {
331 24 : self.layer_fmgr
332 24 : .0
333 24 : .get(&desc.key())
334 24 : .filter(|l| l.is_likely_resident())
335 24 : .cloned()
336 24 : })
337 20 : }
338 :
339 308 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
340 308 : self.layer_fmgr.contains(layer)
341 308 : }
342 :
343 0 : pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
344 0 : self.layer_fmgr.0.keys().cloned().collect_vec()
345 0 : }
346 : }
347 :
348 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
349 :
350 : impl<T> Default for LayerFileManager<T> {
351 379 : fn default() -> Self {
352 379 : Self(HashMap::default())
353 379 : }
354 : }
355 :
356 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
357 275811 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
358 275811 : // The assumption for the `expect()` is that all code maintains the following invariant:
359 275811 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
360 275811 : self.0
361 275811 : .get(&desc.key())
362 275811 : .with_context(|| format!("get layer from desc: {}", desc.layer_name()))
363 275811 : .expect("not found")
364 275811 : .clone()
365 275811 : }
366 :
367 1540 : pub(crate) fn insert(&mut self, layer: T) {
368 1540 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
369 1540 : if present.is_some() && cfg!(debug_assertions) {
370 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
371 1540 : }
372 1540 : }
373 :
374 308 : pub(crate) fn contains(&self, layer: &T) -> bool {
375 308 : self.0.contains_key(&layer.layer_desc().key())
376 308 : }
377 :
378 426 : pub(crate) fn remove(&mut self, layer: &T) {
379 426 : let present = self.0.remove(&layer.layer_desc().key());
380 426 : if present.is_none() && cfg!(debug_assertions) {
381 0 : panic!(
382 0 : "removing layer that is not present in layer mapping: {:?}",
383 0 : layer.layer_desc()
384 0 : )
385 426 : }
386 426 : }
387 : }
|