Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use std::{collections::HashMap, sync::Arc};
3 : use tracing::trace;
4 : use utils::{
5 : id::{TenantId, TimelineId},
6 : lsn::{AtomicLsn, Lsn},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : metrics::TimelineMetrics,
12 : tenant::{
13 : layer_map::{BatchedUpdates, LayerMap},
14 : storage_layer::{
15 : AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer,
16 : PersistentLayerDesc, PersistentLayerKey,
17 : },
18 : timeline::compare_arced_layers,
19 : },
20 : };
21 :
22 : /// Provides semantic APIs to manipulate the layer map.
23 : pub(crate) struct LayerManager {
24 : layer_map: LayerMap,
25 : layer_fmgr: LayerFileManager,
26 : }
27 :
28 : /// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
29 : /// scheduling deletes in remote client.
30 : pub(crate) struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
31 :
32 : impl ApplyGcResultGuard<'_> {
33 7 : pub(crate) fn flush(self) {
34 7 : self.0.flush();
35 7 : }
36 : }
37 :
38 : impl LayerManager {
39 1394 : pub(crate) fn create() -> Self {
40 1394 : Self {
41 1394 : layer_map: LayerMap::default(),
42 1394 : layer_fmgr: LayerFileManager::new(),
43 1394 : }
44 1394 : }
45 :
46 45716425 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
47 45716425 : self.layer_fmgr.get_from_desc(desc)
48 45716425 : }
49 :
50 : /// Get an immutable reference to the layer map.
51 : ///
52 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
53 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
54 52824776 : pub(crate) fn layer_map(&self) -> &LayerMap {
55 52824776 : &self.layer_map
56 52824776 : }
57 :
58 : /// Replace layers in the layer file manager, used in evictions and layer downloads.
59 1217 : pub(crate) fn replace_and_verify(
60 1217 : &mut self,
61 1217 : expected: Arc<dyn PersistentLayer>,
62 1217 : new: Arc<dyn PersistentLayer>,
63 1217 : ) -> Result<()> {
64 1217 : self.layer_fmgr.replace_and_verify(expected, new)
65 1217 : }
66 :
67 : /// Called from `load_layer_map`. Initialize the layer manager with:
68 : /// 1. all on-disk layers
69 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
70 326 : pub(crate) fn initialize_local_layers(
71 326 : &mut self,
72 326 : on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
73 326 : next_open_layer_at: Lsn,
74 326 : ) {
75 326 : let mut updates = self.layer_map.batch_update();
76 6666 : for layer in on_disk_layers {
77 6340 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
78 6340 : }
79 326 : updates.flush();
80 326 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
81 326 : }
82 :
83 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
84 1042 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
85 1042 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
86 1042 : }
87 :
88 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
89 : /// called within `get_layer_for_write`.
90 82563776 : pub(crate) fn get_layer_for_write(
91 82563776 : &mut self,
92 82563776 : lsn: Lsn,
93 82563776 : last_record_lsn: Lsn,
94 82563776 : conf: &'static PageServerConf,
95 82563776 : timeline_id: TimelineId,
96 82563776 : tenant_id: TenantId,
97 82563776 : ) -> Result<Arc<InMemoryLayer>> {
98 82563776 : ensure!(lsn.is_aligned());
99 :
100 82563776 : ensure!(
101 82563776 : lsn > last_record_lsn,
102 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
103 : lsn,
104 : last_record_lsn,
105 : );
106 :
107 : // Do we have a layer open for writing already?
108 82563776 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
109 82556908 : if open_layer.get_lsn_range().start > lsn {
110 0 : bail!(
111 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
112 0 : open_layer.get_lsn_range().start,
113 0 : lsn
114 0 : );
115 82556908 : }
116 82556908 :
117 82556908 : Arc::clone(open_layer)
118 : } else {
119 : // No writeable layer yet. Create one.
120 6868 : let start_lsn = self
121 6868 : .layer_map
122 6868 : .next_open_layer_at
123 6868 : .context("No next open layer found")?;
124 :
125 6868 : trace!(
126 0 : "creating in-memory layer at {}/{} for record at {}",
127 0 : timeline_id,
128 0 : start_lsn,
129 0 : lsn
130 0 : );
131 :
132 6868 : let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
133 6868 : let layer = Arc::new(new_layer);
134 6868 :
135 6868 : self.layer_map.open_layer = Some(layer.clone());
136 6868 : self.layer_map.next_open_layer_at = None;
137 6868 :
138 6868 : layer
139 : };
140 :
141 82563776 : Ok(layer)
142 82563776 : }
143 :
144 : /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
145 6753 : pub(crate) async fn try_freeze_in_memory_layer(
146 6753 : &mut self,
147 6753 : Lsn(last_record_lsn): Lsn,
148 6753 : last_freeze_at: &AtomicLsn,
149 6753 : ) {
150 6753 : let end_lsn = Lsn(last_record_lsn + 1);
151 :
152 6753 : if let Some(open_layer) = &self.layer_map.open_layer {
153 6562 : let open_layer_rc = Arc::clone(open_layer);
154 6562 : // Does this layer need freezing?
155 6562 : open_layer.freeze(end_lsn).await;
156 :
157 : // The layer is no longer open, update the layer map to reflect this.
158 : // We will replace it with on-disk historics below.
159 6562 : self.layer_map.frozen_layers.push_back(open_layer_rc);
160 6562 : self.layer_map.open_layer = None;
161 6562 : self.layer_map.next_open_layer_at = Some(end_lsn);
162 6562 : last_freeze_at.store(end_lsn);
163 191 : }
164 6753 : }
165 :
166 : /// Add image layers to the layer map, called from `create_image_layers`.
167 1435 : pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
168 1435 : let mut updates = self.layer_map.batch_update();
169 2547 : for layer in image_layers {
170 1112 : Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
171 1112 : }
172 1435 : updates.flush();
173 1435 : }
174 :
175 : /// Flush a frozen layer and add the written delta layer to the layer map.
176 6513 : pub(crate) fn finish_flush_l0_layer(
177 6513 : &mut self,
178 6513 : delta_layer: Option<DeltaLayer>,
179 6513 : frozen_layer_for_check: &Arc<InMemoryLayer>,
180 6513 : ) {
181 6513 : let l = self.layer_map.frozen_layers.pop_front();
182 6513 : let mut updates = self.layer_map.batch_update();
183 6513 :
184 6513 : // Only one thread may call this function at a time (for this
185 6513 : // timeline). If two threads tried to flush the same frozen
186 6513 : // layer to disk at the same time, that would not work.
187 6513 : assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
188 :
189 6513 : if let Some(delta_layer) = delta_layer {
190 6476 : Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
191 6476 : }
192 6513 : updates.flush();
193 6513 : }
194 :
195 : /// Called when compaction is completed.
196 321 : pub(crate) fn finish_compact_l0(
197 321 : &mut self,
198 321 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
199 321 : compact_from: Vec<Arc<dyn PersistentLayer>>,
200 321 : compact_to: Vec<Arc<dyn PersistentLayer>>,
201 321 : metrics: &TimelineMetrics,
202 321 : ) -> Result<()> {
203 321 : let mut updates = self.layer_map.batch_update();
204 8523 : for l in compact_to {
205 8202 : Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
206 8202 : }
207 4717 : for l in compact_from {
208 : // NB: the layer file identified by descriptor `l` is guaranteed to be present
209 : // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
210 : // time, even though we dropped `Timeline::layers` inbetween.
211 4396 : Self::delete_historic_layer(
212 4396 : layer_removal_cs.clone(),
213 4396 : l,
214 4396 : &mut updates,
215 4396 : metrics,
216 4396 : &mut self.layer_fmgr,
217 4396 : )?;
218 : }
219 321 : updates.flush();
220 321 : Ok(())
221 321 : }
222 :
223 : /// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
224 12 : pub(crate) fn finish_gc_timeline(
225 12 : &mut self,
226 12 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
227 12 : gc_layers: Vec<Arc<dyn PersistentLayer>>,
228 12 : metrics: &TimelineMetrics,
229 12 : ) -> Result<ApplyGcResultGuard> {
230 12 : let mut updates = self.layer_map.batch_update();
231 745 : for doomed_layer in gc_layers {
232 733 : Self::delete_historic_layer(
233 733 : layer_removal_cs.clone(),
234 733 : doomed_layer,
235 733 : &mut updates,
236 733 : metrics,
237 733 : &mut self.layer_fmgr,
238 733 : )?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
239 : }
240 12 : Ok(ApplyGcResultGuard(updates))
241 12 : }
242 :
243 : /// Helper function to insert a layer into the layer map and file manager.
244 22130 : fn insert_historic_layer(
245 22130 : layer: Arc<dyn PersistentLayer>,
246 22130 : updates: &mut BatchedUpdates<'_>,
247 22130 : mapping: &mut LayerFileManager,
248 22130 : ) {
249 22130 : updates.insert_historic(layer.layer_desc().clone());
250 22130 : mapping.insert(layer);
251 22130 : }
252 :
253 : /// Removes the layer from local FS (if present) and from memory.
254 : /// Remote storage is not affected by this operation.
255 5129 : fn delete_historic_layer(
256 5129 : // we cannot remove layers otherwise, since gc and compaction will race
257 5129 : _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
258 5129 : layer: Arc<dyn PersistentLayer>,
259 5129 : updates: &mut BatchedUpdates<'_>,
260 5129 : metrics: &TimelineMetrics,
261 5129 : mapping: &mut LayerFileManager,
262 5129 : ) -> anyhow::Result<()> {
263 5129 : let desc = layer.layer_desc();
264 5129 : if !layer.is_remote_layer() {
265 5127 : layer.delete_resident_layer_file()?;
266 5127 : metrics.resident_physical_size_gauge.sub(desc.file_size);
267 2 : }
268 :
269 : // TODO Removing from the bottom of the layer map is expensive.
270 : // Maybe instead discard all layer map historic versions that
271 : // won't be needed for page reconstruction for this timeline,
272 : // and mark what we can't delete yet as deleted from the layer
273 : // map index without actually rebuilding the index.
274 5129 : updates.remove_historic(desc);
275 5129 : mapping.remove(layer);
276 5129 :
277 5129 : Ok(())
278 5129 : }
279 :
280 11041 : pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
281 11041 : self.layer_fmgr.contains(layer)
282 11041 : }
283 : }
284 :
285 : pub(crate) struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
286 : HashMap<PersistentLayerKey, Arc<T>>,
287 : );
288 :
289 : impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
290 45716453 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
291 45716453 : // The assumption for the `expect()` is that all code maintains the following invariant:
292 45716453 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
293 45716453 : self.0
294 45716453 : .get(&desc.key())
295 45716453 : .with_context(|| format!("get layer from desc: {}", desc.filename()))
296 45716453 : .expect("not found")
297 45716453 : .clone()
298 45716453 : }
299 :
300 22133 : pub(crate) fn insert(&mut self, layer: Arc<T>) {
301 22133 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
302 22133 : if present.is_some() && cfg!(debug_assertions) {
303 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
304 22133 : }
305 22133 : }
306 :
307 11041 : pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
308 11041 : self.0.contains_key(&layer.layer_desc().key())
309 11041 : }
310 :
311 1398 : pub(crate) fn new() -> Self {
312 1398 : Self(HashMap::new())
313 1398 : }
314 :
315 5129 : pub(crate) fn remove(&mut self, layer: Arc<T>) {
316 5129 : let present = self.0.remove(&layer.layer_desc().key());
317 5129 : if present.is_none() && cfg!(debug_assertions) {
318 0 : panic!(
319 0 : "removing layer that is not present in layer mapping: {:?}",
320 0 : layer.layer_desc()
321 0 : )
322 5129 : }
323 5129 : }
324 :
325 1221 : pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
326 1221 : let key = expected.layer_desc().key();
327 1221 : let other = new.layer_desc().key();
328 1221 :
329 1221 : let expected_l0 = LayerMap::is_l0(expected.layer_desc());
330 1221 : let new_l0 = LayerMap::is_l0(new.layer_desc());
331 1221 :
332 1221 : fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
333 1 : "layermap-replace-notfound"
334 1221 : ));
335 :
336 1220 : anyhow::ensure!(
337 1220 : key == other,
338 0 : "expected and new layer have different keys: {key:?} != {other:?}"
339 : );
340 :
341 1220 : anyhow::ensure!(
342 1220 : expected_l0 == new_l0,
343 0 : "one layer is l0 while the other is not: {expected_l0} != {new_l0}"
344 : );
345 :
346 1220 : if let Some(layer) = self.0.get_mut(&key) {
347 1219 : anyhow::ensure!(
348 1219 : compare_arced_layers(&expected, layer),
349 1 : "another layer was found instead of expected, expected={expected:?}, new={new:?}",
350 1 : expected = Arc::as_ptr(&expected),
351 1 : new = Arc::as_ptr(layer),
352 : );
353 1218 : *layer = new;
354 1218 : Ok(())
355 : } else {
356 1 : anyhow::bail!("layer was not found");
357 : }
358 1221 : }
359 : }
|