use anyhow::{bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
use tracing::trace;
use utils::{
    id::TimelineId,
    lsn::{AtomicLsn, Lsn},
};

use crate::{
    config::PageServerConf,
    context::RequestContext,
    metrics::TimelineMetrics,
    tenant::{
        layer_map::{BatchedUpdates, LayerMap},
        storage_layer::{
            AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
            ResidentLayer,
        },
    },
};

/// Provides semantic APIs to manipulate the layer map.
#[derive(Default)]
pub(crate) struct LayerManager {
    layer_map: LayerMap,
    layer_fmgr: LayerFileManager<Layer>,
}

impl LayerManager {
    pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
        self.layer_fmgr.get_from_desc(desc)
    }

    /// Get an immutable reference to the layer map.
    ///
    /// Callers can only obtain an immutable reference to the layer map; all modifications
    /// must go through the semantic APIs below. This design moves us a step closer to an
    /// immutable storage state.
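    ///
    /// A minimal usage sketch (illustrative only; `layers` and `gc_layers` are assumed
    /// bindings, not names from this file):
    ///
    /// ```ignore
    /// // Read-only queries go through the accessor...
    /// let historic = layers.layer_map().iter_historic_layers().count();
    /// // ...while mutations use the semantic APIs, e.g.:
    /// // layers.finish_gc_timeline(&gc_layers);
    /// ```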
    pub(crate) fn layer_map(&self) -> &LayerMap {
        &self.layer_map
    }

    /// Called from `load_layer_map`. Initializes the layer manager with:
    /// 1. all on-disk layers
    /// 2. the next open layer (at the `disk_consistent_lsn`)
    pub(crate) fn initialize_local_layers(
        &mut self,
        on_disk_layers: Vec<Layer>,
        next_open_layer_at: Lsn,
    ) {
        let mut updates = self.layer_map.batch_update();
        for layer in on_disk_layers {
            Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
    pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
    }

    /// Open a new writable layer to append data to, if there is no open layer yet;
    /// otherwise return the current open layer. Called within `get_layer_for_write`.
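    ///
    /// A sketch of a call site (the variable names are illustrative, not from this file):
    ///
    /// ```ignore
    /// let layer = layers
    ///     .get_layer_for_write(lsn, last_record_lsn, conf, timeline_id, tenant_shard_id, ctx)
    ///     .await?;
    /// // `layer` is an `Arc<InMemoryLayer>` that accepts records at `lsn` and above.
    /// ```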
    pub(crate) async fn get_layer_for_write(
        &mut self,
        lsn: Lsn,
        last_record_lsn: Lsn,
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        ctx: &RequestContext,
    ) -> Result<Arc<InMemoryLayer>> {
        ensure!(lsn.is_aligned());

        ensure!(
            lsn > last_record_lsn,
            "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
            lsn,
            last_record_lsn,
        );

        // Do we have a layer open for writing already?
        let layer = if let Some(open_layer) = &self.layer_map.open_layer {
            if open_layer.get_lsn_range().start > lsn {
                bail!(
                    "unexpected open layer in the future: open layer starts at {}, write lsn {}",
                    open_layer.get_lsn_range().start,
                    lsn
                );
            }

            Arc::clone(open_layer)
        } else {
            // No writeable layer yet. Create one.
            let start_lsn = self
                .layer_map
                .next_open_layer_at
                .context("No next open layer found")?;

            trace!(
                "creating in-memory layer at {}/{} for record at {}",
                timeline_id,
                start_lsn,
                lsn
            );

            let new_layer =
                InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
            let layer = Arc::new(new_layer);

            self.layer_map.open_layer = Some(layer.clone());
            self.layer_map.next_open_layer_at = None;

            layer
        };

        Ok(layer)
    }

    /// Called from `freeze_inmem_layer`. Freezes the currently open layer, if any, and
    /// advances `last_freeze_at` to just past `lsn`.
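    ///
    /// A sketch of the LSN arithmetic (illustrative values): freezing at `Lsn(0x1000)`
    /// seals the open layer with the exclusive end `Lsn(0x1001)`, so the frozen layer
    /// covers every record up to and including the last record LSN.
    ///
    /// ```ignore
    /// layers.try_freeze_in_memory_layer(Lsn(0x1000), &last_freeze_at).await;
    /// assert_eq!(last_freeze_at.load(), Lsn(0x1001));
    /// ```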
    pub(crate) async fn try_freeze_in_memory_layer(
        &mut self,
        lsn: Lsn,
        last_freeze_at: &AtomicLsn,
    ) {
        let Lsn(last_record_lsn) = lsn;
        let end_lsn = Lsn(last_record_lsn + 1);

        if let Some(open_layer) = &self.layer_map.open_layer {
            let open_layer_rc = Arc::clone(open_layer);
            // Does this layer need freezing?
            open_layer.freeze(end_lsn).await;

            // The layer is no longer open, update the layer map to reflect this.
            // We will replace it with on-disk historics below.
            self.layer_map.frozen_layers.push_back(open_layer_rc);
            self.layer_map.open_layer = None;
            self.layer_map.next_open_layer_at = Some(end_lsn);
        }

        // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1:
        // this accounts for regions in the LSN range where we might have ingested no data
        // due to sharding.
        last_freeze_at.store(end_lsn);
    }

    /// Add image layers to the layer map, called from `create_image_layers`.
    pub(crate) fn track_new_image_layers(
        &mut self,
        image_layers: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for layer in image_layers {
            Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);

            // Record the metrics here instead of in `Layer::finish_creating`, because
            // otherwise a partial failure in `create_image_layers` would balloon the
            // physical size gauge. The downside is that all layers need to be created
            // before the metrics are updated.
            metrics.record_new_file_metrics(layer.layer_desc().file_size);
        }
        updates.flush();
    }

    /// Flush a frozen layer and add the written delta layer to the layer map.
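    ///
    /// A hypothetical flush-loop sketch (`write_frozen_to_disk` is an assumed helper,
    /// not part of this file):
    ///
    /// ```ignore
    /// let frozen = layers.layer_map().frozen_layers.front().cloned().unwrap();
    /// let delta: Option<ResidentLayer> = write_frozen_to_disk(&frozen).await?;
    /// layers.finish_flush_l0_layer(delta.as_ref(), &frozen, &metrics);
    /// ```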
    pub(crate) fn finish_flush_l0_layer(
        &mut self,
        delta_layer: Option<&ResidentLayer>,
        frozen_layer_for_check: &Arc<InMemoryLayer>,
        metrics: &TimelineMetrics,
    ) {
        let inmem = self
            .layer_map
            .frozen_layers
            .pop_front()
            .expect("there must be an inmem layer to flush");

        // Only one task may call this function at a time (for this
        // timeline). If two tasks tried to flush the same frozen
        // layer to disk at the same time, that would not work.
        assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));

        if let Some(l) = delta_layer {
            let mut updates = self.layer_map.batch_update();
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
            updates.flush();
        }
    }

    /// Called when compaction is completed.
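    ///
    /// The newly written layers are inserted and the compacted-away layers removed within
    /// a single `batch_update`, flushed once at the end. A usage sketch (illustrative
    /// names):
    ///
    /// ```ignore
    /// layers.finish_compact_l0(&level0_deltas, &new_delta_layers, &metrics);
    /// ```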
    pub(crate) fn finish_compact_l0(
        &mut self,
        compact_from: &[Layer],
        compact_to: &[ResidentLayer],
        metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();
        for l in compact_to {
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            metrics.record_new_file_metrics(l.layer_desc().file_size);
        }
        for l in compact_from {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when compaction rewrites or drops existing layers in place. Note that only
    /// the drop path is implemented so far.
    pub(crate) fn rewrite_layers(
        &mut self,
        rewrite_layers: &[(Layer, ResidentLayer)],
        drop_layers: &[Layer],
        _metrics: &TimelineMetrics,
    ) {
        let mut updates = self.layer_map.batch_update();

        // TODO: implement rewrites (currently this code path is only used for drops)
        assert!(rewrite_layers.is_empty());

        for l in drop_layers {
            Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }

    /// Called when garbage collection has selected the layers to be removed.
    pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
        let mut updates = self.layer_map.batch_update();
        for doomed_layer in gc_layers {
            Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
        }
        updates.flush()
    }

    /// Helper function to insert a layer into the layer map and file manager.
    fn insert_historic_layer(
        layer: Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        updates.insert_historic(layer.layer_desc().clone());
        mapping.insert(layer);
    }

    /// Removes the layer from the local FS (if present) and from memory.
    /// Remote storage is not affected by this operation.
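    ///
    /// Deletion of the on-disk file is deferred: `delete_on_drop` marks the layer, and
    /// the file is removed once the last reference to it is dropped. A sketch:
    ///
    /// ```ignore
    /// Self::delete_historic_layer(&layer, &mut updates, &mut self.layer_fmgr);
    /// // The local file (if resident) is unlinked when the last `Layer` clone is dropped.
    /// ```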
    fn delete_historic_layer(
        // we cannot remove layers otherwise, since gc and compaction will race
        layer: &Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        let desc = layer.layer_desc();

        // TODO: Removing from the bottom of the layer map is expensive.
        // Maybe instead discard all layer map historic versions that
        // won't be needed for page reconstruction for this timeline,
        // and mark what we can't delete yet as deleted from the layer
        // map index without actually rebuilding the index.
        updates.remove_historic(desc);
        mapping.remove(layer);
        layer.delete_on_drop();
    }

    pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
        // For small layer maps, most layers are likely resident; for larger ones, more
        // layers are likely to have been evicted, assuming that a large number of layers
        // correlates with a longer lifespan.
        self.layer_map().iter_historic_layers().filter_map(|desc| {
            self.layer_fmgr
                .0
                .get(&desc.key())
                .filter(|l| l.is_likely_resident())
                .cloned()
        })
    }

    pub(crate) fn contains(&self, layer: &Layer) -> bool {
        self.layer_fmgr.contains(layer)
    }
}

pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);

impl<T> Default for LayerFileManager<T> {
    fn default() -> Self {
        Self(HashMap::default())
    }
}

impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
    fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
        // The assumption for the `expect()` is that all code maintains the following
        // invariant: a layer's descriptor is present in the LayerMap => the
        // LayerFileManager contains a layer for the descriptor.
        self.0
            .get(&desc.key())
            .with_context(|| format!("get layer from desc: {}", desc.layer_name()))
            .expect("not found")
            .clone()
    }

    pub(crate) fn insert(&mut self, layer: T) {
        let present = self.0.insert(layer.layer_desc().key(), layer.clone());
        if present.is_some() && cfg!(debug_assertions) {
            panic!("overwriting a layer: {:?}", layer.layer_desc())
        }
    }

    pub(crate) fn contains(&self, layer: &T) -> bool {
        self.0.contains_key(&layer.layer_desc().key())
    }

    pub(crate) fn remove(&mut self, layer: &T) {
        let present = self.0.remove(&layer.layer_desc().key());
        if present.is_none() && cfg!(debug_assertions) {
            panic!(
                "removing layer that is not present in layer mapping: {:?}",
                layer.layer_desc()
            )
        }
    }
}