Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use pageserver_api::shard::TenantShardId;
3 : use std::{collections::HashMap, sync::Arc};
4 : use tracing::trace;
5 : use utils::{
6 : id::TimelineId,
7 : lsn::{AtomicLsn, Lsn},
8 : };
9 :
10 : use crate::{
11 : config::PageServerConf,
12 : metrics::TimelineMetrics,
13 : tenant::{
14 : layer_map::{BatchedUpdates, LayerMap},
15 : storage_layer::{
16 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
17 : ResidentLayer,
18 : },
19 : },
20 : };
21 :
22 : /// Provides semantic APIs to manipulate the layer map.
23 : #[derive(Default)]
24 : pub(crate) struct LayerManager {
25 : layer_map: LayerMap,
26 : layer_fmgr: LayerFileManager<Layer>,
27 : }
28 :
29 : impl LayerManager {
30 274206 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
31 274206 : self.layer_fmgr.get_from_desc(desc)
32 274206 : }
33 :
34 : /// Get an immutable reference to the layer map.
35 : ///
36 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
37 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
38 818167 : pub(crate) fn layer_map(&self) -> &LayerMap {
39 818167 : &self.layer_map
40 818167 : }
41 :
42 : /// Called from `load_layer_map`. Initialize the layer manager with:
43 : /// 1. all on-disk layers
44 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
45 6 : pub(crate) fn initialize_local_layers(
46 6 : &mut self,
47 6 : on_disk_layers: Vec<Layer>,
48 6 : next_open_layer_at: Lsn,
49 6 : ) {
50 6 : let mut updates = self.layer_map.batch_update();
51 22 : for layer in on_disk_layers {
52 16 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
53 16 : }
54 6 : updates.flush();
55 6 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
56 6 : }
57 :
58 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
59 328 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
60 328 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
61 328 : }
62 :
63 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
64 : /// called within `get_layer_for_write`.
65 4780028 : pub(crate) async fn get_layer_for_write(
66 4780028 : &mut self,
67 4780028 : lsn: Lsn,
68 4780028 : last_record_lsn: Lsn,
69 4780028 : conf: &'static PageServerConf,
70 4780028 : timeline_id: TimelineId,
71 4780028 : tenant_shard_id: TenantShardId,
72 4780028 : ) -> Result<Arc<InMemoryLayer>> {
73 4780028 : ensure!(lsn.is_aligned());
74 :
75 4780028 : ensure!(
76 4780028 : lsn > last_record_lsn,
77 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
78 : lsn,
79 : last_record_lsn,
80 : );
81 :
82 : // Do we have a layer open for writing already?
83 4780028 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
84 4778978 : if open_layer.get_lsn_range().start > lsn {
85 0 : bail!(
86 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
87 0 : open_layer.get_lsn_range().start,
88 0 : lsn
89 0 : );
90 4778978 : }
91 4778978 :
92 4778978 : Arc::clone(open_layer)
93 : } else {
94 : // No writeable layer yet. Create one.
95 1050 : let start_lsn = self
96 1050 : .layer_map
97 1050 : .next_open_layer_at
98 1050 : .context("No next open layer found")?;
99 :
100 1050 : trace!(
101 0 : "creating in-memory layer at {}/{} for record at {}",
102 : timeline_id,
103 : start_lsn,
104 : lsn
105 : );
106 :
107 1050 : let new_layer =
108 1050 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
109 1050 : let layer = Arc::new(new_layer);
110 1050 :
111 1050 : self.layer_map.open_layer = Some(layer.clone());
112 1050 : self.layer_map.next_open_layer_at = None;
113 1050 :
114 1050 : layer
115 : };
116 :
117 4780028 : Ok(layer)
118 4780028 : }
119 :
120 : /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
121 934 : pub(crate) async fn try_freeze_in_memory_layer(
122 934 : &mut self,
123 934 : lsn: Lsn,
124 934 : last_freeze_at: &AtomicLsn,
125 934 : ) {
126 934 : let Lsn(last_record_lsn) = lsn;
127 934 : let end_lsn = Lsn(last_record_lsn + 1);
128 :
129 934 : if let Some(open_layer) = &self.layer_map.open_layer {
130 928 : let open_layer_rc = Arc::clone(open_layer);
131 928 : // Does this layer need freezing?
132 928 : open_layer.freeze(end_lsn).await;
133 :
134 : // The layer is no longer open, update the layer map to reflect this.
135 : // We will replace it with on-disk historics below.
136 928 : self.layer_map.frozen_layers.push_back(open_layer_rc);
137 928 : self.layer_map.open_layer = None;
138 928 : self.layer_map.next_open_layer_at = Some(end_lsn);
139 6 : }
140 :
141 : // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
142 : // accounts for regions in the LSN range where we might have ingested no data due to sharding.
143 934 : last_freeze_at.store(end_lsn);
144 934 : }
145 :
146 : /// Add image layers to the layer map, called from `create_image_layers`.
147 762 : pub(crate) fn track_new_image_layers(
148 762 : &mut self,
149 762 : image_layers: &[ResidentLayer],
150 762 : metrics: &TimelineMetrics,
151 762 : ) {
152 762 : let mut updates = self.layer_map.batch_update();
153 878 : for layer in image_layers {
154 116 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
155 116 :
156 116 : // record these here instead of Layer::finish_creating because otherwise partial
157 116 : // failure with create_image_layers would balloon up the physical size gauge. downside
158 116 : // is that all layers need to be created before metrics are updated.
159 116 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
160 116 : }
161 762 : updates.flush();
162 762 : }
163 :
164 : /// Flush a frozen layer and add the written delta layer to the layer map.
165 928 : pub(crate) fn finish_flush_l0_layer(
166 928 : &mut self,
167 928 : delta_layer: Option<&ResidentLayer>,
168 928 : frozen_layer_for_check: &Arc<InMemoryLayer>,
169 928 : metrics: &TimelineMetrics,
170 928 : ) {
171 928 : let inmem = self
172 928 : .layer_map
173 928 : .frozen_layers
174 928 : .pop_front()
175 928 : .expect("there must be a inmem layer to flush");
176 928 :
177 928 : // Only one task may call this function at a time (for this
178 928 : // timeline). If two tasks tried to flush the same frozen
179 928 : // layer to disk at the same time, that would not work.
180 928 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
181 :
182 928 : if let Some(l) = delta_layer {
183 826 : let mut updates = self.layer_map.batch_update();
184 826 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
185 826 : metrics.record_new_file_metrics(l.layer_desc().file_size);
186 826 : updates.flush();
187 826 : }
188 928 : }
189 :
190 : /// Called when compaction is completed.
191 24 : pub(crate) fn finish_compact_l0(
192 24 : &mut self,
193 24 : compact_from: &[Layer],
194 24 : compact_to: &[ResidentLayer],
195 24 : metrics: &TimelineMetrics,
196 24 : ) {
197 24 : let mut updates = self.layer_map.batch_update();
198 248 : for l in compact_to {
199 224 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
200 224 : metrics.record_new_file_metrics(l.layer_desc().file_size);
201 224 : }
202 286 : for l in compact_from {
203 262 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
204 262 : }
205 24 : updates.flush();
206 24 : }
207 :
208 : /// Called when compaction is completed.
209 0 : pub(crate) fn rewrite_layers(
210 0 : &mut self,
211 0 : rewrite_layers: &[(Layer, ResidentLayer)],
212 0 : drop_layers: &[Layer],
213 0 : _metrics: &TimelineMetrics,
214 0 : ) {
215 0 : let mut updates = self.layer_map.batch_update();
216 0 :
217 0 : // TODO: implement rewrites (currently this code path only used for drops)
218 0 : assert!(rewrite_layers.is_empty());
219 :
220 0 : for l in drop_layers {
221 0 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
222 0 : }
223 0 : updates.flush();
224 0 : }
225 :
226 : /// Called when garbage collect has selected the layers to be removed.
227 6 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
228 6 : let mut updates = self.layer_map.batch_update();
229 14 : for doomed_layer in gc_layers {
230 8 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
231 8 : }
232 6 : updates.flush()
233 6 : }
234 :
235 : /// Helper function to insert a layer into the layer map and file manager.
236 1182 : fn insert_historic_layer(
237 1182 : layer: Layer,
238 1182 : updates: &mut BatchedUpdates<'_>,
239 1182 : mapping: &mut LayerFileManager<Layer>,
240 1182 : ) {
241 1182 : updates.insert_historic(layer.layer_desc().clone());
242 1182 : mapping.insert(layer);
243 1182 : }
244 :
245 : /// Removes the layer from local FS (if present) and from memory.
246 : /// Remote storage is not affected by this operation.
247 270 : fn delete_historic_layer(
248 270 : // we cannot remove layers otherwise, since gc and compaction will race
249 270 : layer: &Layer,
250 270 : updates: &mut BatchedUpdates<'_>,
251 270 : mapping: &mut LayerFileManager<Layer>,
252 270 : ) {
253 270 : let desc = layer.layer_desc();
254 270 :
255 270 : // TODO Removing from the bottom of the layer map is expensive.
256 270 : // Maybe instead discard all layer map historic versions that
257 270 : // won't be needed for page reconstruction for this timeline,
258 270 : // and mark what we can't delete yet as deleted from the layer
259 270 : // map index without actually rebuilding the index.
260 270 : updates.remove_historic(desc);
261 270 : mapping.remove(layer);
262 270 : layer.delete_on_drop();
263 270 : }
264 :
265 20 : pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
266 20 : // for small layer maps, we most likely have all resident, but for larger more are likely
267 20 : // to be evicted assuming lots of layers correlated with longer lifespan.
268 20 :
269 24 : self.layer_map().iter_historic_layers().filter_map(|desc| {
270 24 : self.layer_fmgr
271 24 : .0
272 24 : .get(&desc.key())
273 24 : .filter(|l| l.is_likely_resident())
274 24 : .cloned()
275 24 : })
276 20 : }
277 :
278 224 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
279 224 : self.layer_fmgr.contains(layer)
280 224 : }
281 : }
282 :
283 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
284 :
285 : impl<T> Default for LayerFileManager<T> {
286 334 : fn default() -> Self {
287 334 : Self(HashMap::default())
288 334 : }
289 : }
290 :
291 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
292 274206 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
293 274206 : // The assumption for the `expect()` is that all code maintains the following invariant:
294 274206 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
295 274206 : self.0
296 274206 : .get(&desc.key())
297 274206 : .with_context(|| format!("get layer from desc: {}", desc.layer_name()))
298 274206 : .expect("not found")
299 274206 : .clone()
300 274206 : }
301 :
302 1182 : pub(crate) fn insert(&mut self, layer: T) {
303 1182 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
304 1182 : if present.is_some() && cfg!(debug_assertions) {
305 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
306 1182 : }
307 1182 : }
308 :
309 224 : pub(crate) fn contains(&self, layer: &T) -> bool {
310 224 : self.0.contains_key(&layer.layer_desc().key())
311 224 : }
312 :
313 270 : pub(crate) fn remove(&mut self, layer: &T) {
314 270 : let present = self.0.remove(&layer.layer_desc().key());
315 270 : if present.is_none() && cfg!(debug_assertions) {
316 0 : panic!(
317 0 : "removing layer that is not present in layer mapping: {:?}",
318 0 : layer.layer_desc()
319 0 : )
320 270 : }
321 270 : }
322 : }
|