TLA Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use std::{collections::HashMap, sync::Arc};
3 : use tracing::trace;
4 : use utils::{
5 : id::{TenantId, TimelineId},
6 : lsn::{AtomicLsn, Lsn},
7 : };
8 :
9 : use crate::{
10 : config::PageServerConf,
11 : metrics::TimelineMetrics,
12 : tenant::{
13 : layer_map::{BatchedUpdates, LayerMap},
14 : storage_layer::{
15 : AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer,
16 : PersistentLayerDesc, PersistentLayerKey,
17 : },
18 : timeline::compare_arced_layers,
19 : },
20 : };
21 :
/// Provides semantic APIs to manipulate the layer map.
///
/// Owns two structures that the methods on this type keep in sync: the [`LayerMap`]
/// (which receives descriptors of historic layers plus the open/frozen in-memory layers)
/// and the [`LayerFileManager`] (which maps each historic layer's key to its layer object).
pub(crate) struct LayerManager {
    /// The layer map. Handed out read-only via `layer_map()`; mutated only through the
    /// semantic methods on `LayerManager`.
    layer_map: LayerMap,
    /// Key -> layer object mapping for every historic layer whose descriptor is in `layer_map`.
    layer_fmgr: LayerFileManager,
}
27 :
28 : /// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
29 : /// scheduling deletes in remote client.
30 : pub(crate) struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
31 :
32 : impl ApplyGcResultGuard<'_> {
33 CBC 20 : pub(crate) fn flush(self) {
34 20 : self.0.flush();
35 20 : }
36 : }
37 :
38 : impl LayerManager {
39 1302 : pub(crate) fn create() -> Self {
40 1302 : Self {
41 1302 : layer_map: LayerMap::default(),
42 1302 : layer_fmgr: LayerFileManager::new(),
43 1302 : }
44 1302 : }
45 :
46 23869889 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
47 23869889 : self.layer_fmgr.get_from_desc(desc)
48 23869889 : }
49 :
50 : /// Get an immutable reference to the layer map.
51 : ///
52 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
53 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
54 30228675 : pub(crate) fn layer_map(&self) -> &LayerMap {
55 30228675 : &self.layer_map
56 30228675 : }
57 :
58 : /// Replace layers in the layer file manager, used in evictions and layer downloads.
59 1961 : pub(crate) fn replace_and_verify(
60 1961 : &mut self,
61 1961 : expected: Arc<dyn PersistentLayer>,
62 1961 : new: Arc<dyn PersistentLayer>,
63 1961 : ) -> Result<()> {
64 1961 : self.layer_fmgr.replace_and_verify(expected, new)
65 1961 : }
66 :
67 : /// Called from `load_layer_map`. Initialize the layer manager with:
68 : /// 1. all on-disk layers
69 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
70 314 : pub(crate) fn initialize_local_layers(
71 314 : &mut self,
72 314 : on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
73 314 : next_open_layer_at: Lsn,
74 314 : ) {
75 314 : let mut updates = self.layer_map.batch_update();
76 8392 : for layer in on_disk_layers {
77 8078 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
78 8078 : }
79 314 : updates.flush();
80 314 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
81 314 : }
82 :
83 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
84 967 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
85 967 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
86 967 : }
87 :
88 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
89 : /// called within `get_layer_for_write`.
90 76639831 : pub(crate) async fn get_layer_for_write(
91 76639831 : &mut self,
92 76639831 : lsn: Lsn,
93 76639831 : last_record_lsn: Lsn,
94 76639831 : conf: &'static PageServerConf,
95 76639831 : timeline_id: TimelineId,
96 76639831 : tenant_id: TenantId,
97 76639945 : ) -> Result<Arc<InMemoryLayer>> {
98 76639945 : ensure!(lsn.is_aligned());
99 :
100 76639945 : ensure!(
101 76639945 : lsn > last_record_lsn,
102 UBC 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
103 : lsn,
104 : last_record_lsn,
105 : );
106 :
107 : // Do we have a layer open for writing already?
108 CBC 76639945 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
109 76634266 : if open_layer.get_lsn_range().start > lsn {
110 UBC 0 : bail!(
111 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
112 0 : open_layer.get_lsn_range().start,
113 0 : lsn
114 0 : );
115 CBC 76634266 : }
116 76634266 :
117 76634266 : Arc::clone(open_layer)
118 : } else {
119 : // No writeable layer yet. Create one.
120 5679 : let start_lsn = self
121 5679 : .layer_map
122 5679 : .next_open_layer_at
123 5679 : .context("No next open layer found")?;
124 :
125 UBC 0 : trace!(
126 0 : "creating in-memory layer at {}/{} for record at {}",
127 0 : timeline_id,
128 0 : start_lsn,
129 0 : lsn
130 0 : );
131 :
132 CBC 5679 : let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?;
133 5679 : let layer = Arc::new(new_layer);
134 5679 :
135 5679 : self.layer_map.open_layer = Some(layer.clone());
136 5679 : self.layer_map.next_open_layer_at = None;
137 5679 :
138 5679 : layer
139 : };
140 :
141 76639945 : Ok(layer)
142 76639945 : }
143 :
144 : /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
145 5575 : pub(crate) async fn try_freeze_in_memory_layer(
146 5575 : &mut self,
147 5575 : Lsn(last_record_lsn): Lsn,
148 5575 : last_freeze_at: &AtomicLsn,
149 5575 : ) {
150 5575 : let end_lsn = Lsn(last_record_lsn + 1);
151 :
152 5575 : if let Some(open_layer) = &self.layer_map.open_layer {
153 5385 : let open_layer_rc = Arc::clone(open_layer);
154 5385 : // Does this layer need freezing?
155 5385 : open_layer.freeze(end_lsn).await;
156 :
157 : // The layer is no longer open, update the layer map to reflect this.
158 : // We will replace it with on-disk historics below.
159 5385 : self.layer_map.frozen_layers.push_back(open_layer_rc);
160 5385 : self.layer_map.open_layer = None;
161 5385 : self.layer_map.next_open_layer_at = Some(end_lsn);
162 5385 : last_freeze_at.store(end_lsn);
163 190 : }
164 5575 : }
165 :
166 : /// Add image layers to the layer map, called from `create_image_layers`.
167 1296 : pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
168 1296 : let mut updates = self.layer_map.batch_update();
169 4698 : for layer in image_layers {
170 3402 : Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
171 3402 : }
172 1296 : updates.flush();
173 1296 : }
174 :
175 : /// Flush a frozen layer and add the written delta layer to the layer map.
176 5347 : pub(crate) fn finish_flush_l0_layer(
177 5347 : &mut self,
178 5347 : delta_layer: Option<DeltaLayer>,
179 5347 : frozen_layer_for_check: &Arc<InMemoryLayer>,
180 5347 : ) {
181 5347 : let l = self.layer_map.frozen_layers.pop_front();
182 5347 : let mut updates = self.layer_map.batch_update();
183 5347 :
184 5347 : // Only one thread may call this function at a time (for this
185 5347 : // timeline). If two threads tried to flush the same frozen
186 5347 : // layer to disk at the same time, that would not work.
187 5347 : assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
188 :
189 5347 : if let Some(delta_layer) = delta_layer {
190 5308 : Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
191 5308 : }
192 5347 : updates.flush();
193 5347 : }
194 :
195 : /// Called when compaction is completed.
196 304 : pub(crate) fn finish_compact_l0(
197 304 : &mut self,
198 304 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
199 304 : compact_from: Vec<Arc<dyn PersistentLayer>>,
200 304 : compact_to: Vec<Arc<dyn PersistentLayer>>,
201 304 : metrics: &TimelineMetrics,
202 304 : ) -> Result<()> {
203 304 : let mut updates = self.layer_map.batch_update();
204 10166 : for l in compact_to {
205 9862 : Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
206 9862 : }
207 4584 : for l in compact_from {
208 : // NB: the layer file identified by descriptor `l` is guaranteed to be present
209 : // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
210 : // time, even though we dropped `Timeline::layers` inbetween.
211 4280 : Self::delete_historic_layer(
212 4280 : layer_removal_cs.clone(),
213 4280 : l,
214 4280 : &mut updates,
215 4280 : metrics,
216 4280 : &mut self.layer_fmgr,
217 4280 : )?;
218 : }
219 304 : updates.flush();
220 304 : Ok(())
221 304 : }
222 :
223 : /// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
224 25 : pub(crate) fn finish_gc_timeline(
225 25 : &mut self,
226 25 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
227 25 : gc_layers: Vec<Arc<dyn PersistentLayer>>,
228 25 : metrics: &TimelineMetrics,
229 25 : ) -> Result<ApplyGcResultGuard> {
230 25 : let mut updates = self.layer_map.batch_update();
231 3294 : for doomed_layer in gc_layers {
232 3269 : Self::delete_historic_layer(
233 3269 : layer_removal_cs.clone(),
234 3269 : doomed_layer,
235 3269 : &mut updates,
236 3269 : metrics,
237 3269 : &mut self.layer_fmgr,
238 3269 : )?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
239 : }
240 25 : Ok(ApplyGcResultGuard(updates))
241 25 : }
242 :
243 : /// Helper function to insert a layer into the layer map and file manager.
244 26650 : fn insert_historic_layer(
245 26650 : layer: Arc<dyn PersistentLayer>,
246 26650 : updates: &mut BatchedUpdates<'_>,
247 26650 : mapping: &mut LayerFileManager,
248 26650 : ) {
249 26650 : updates.insert_historic(layer.layer_desc().clone());
250 26650 : mapping.insert(layer);
251 26650 : }
252 :
253 : /// Removes the layer from local FS (if present) and from memory.
254 : /// Remote storage is not affected by this operation.
255 7549 : fn delete_historic_layer(
256 7549 : // we cannot remove layers otherwise, since gc and compaction will race
257 7549 : _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
258 7549 : layer: Arc<dyn PersistentLayer>,
259 7549 : updates: &mut BatchedUpdates<'_>,
260 7549 : metrics: &TimelineMetrics,
261 7549 : mapping: &mut LayerFileManager,
262 7549 : ) -> anyhow::Result<()> {
263 7549 : let desc = layer.layer_desc();
264 7549 : if !layer.is_remote_layer() {
265 6851 : layer.delete_resident_layer_file()?;
266 6851 : metrics.resident_physical_size_sub(desc.file_size);
267 698 : }
268 :
269 : // TODO Removing from the bottom of the layer map is expensive.
270 : // Maybe instead discard all layer map historic versions that
271 : // won't be needed for page reconstruction for this timeline,
272 : // and mark what we can't delete yet as deleted from the layer
273 : // map index without actually rebuilding the index.
274 7549 : updates.remove_historic(desc);
275 7549 : mapping.remove(layer);
276 7549 :
277 7549 : Ok(())
278 7549 : }
279 :
280 9890 : pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
281 9890 : self.layer_fmgr.contains(layer)
282 9890 : }
283 : }
284 :
/// Maps each layer's [`PersistentLayerKey`] to its layer object.
///
/// The layer type defaults to `dyn PersistentLayer`. `LayerManager` keeps this mapping next to
/// the `LayerMap`, which receives only descriptors of historic layers.
pub(crate) struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
    HashMap<PersistentLayerKey, Arc<T>>,
);
288 :
289 : impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
290 23869909 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
291 23869909 : // The assumption for the `expect()` is that all code maintains the following invariant:
292 23869909 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
293 23869909 : self.0
294 23869909 : .get(&desc.key())
295 23869909 : .with_context(|| format!("get layer from desc: {}", desc.filename()))
296 23869909 : .expect("not found")
297 23869909 : .clone()
298 23869909 : }
299 :
300 26653 : pub(crate) fn insert(&mut self, layer: Arc<T>) {
301 26653 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
302 26653 : if present.is_some() && cfg!(debug_assertions) {
303 UBC 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
304 CBC 26653 : }
305 26653 : }
306 :
307 9890 : pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
308 9890 : self.0.contains_key(&layer.layer_desc().key())
309 9890 : }
310 :
311 1306 : pub(crate) fn new() -> Self {
312 1306 : Self(HashMap::new())
313 1306 : }
314 :
315 7549 : pub(crate) fn remove(&mut self, layer: Arc<T>) {
316 7549 : let present = self.0.remove(&layer.layer_desc().key());
317 7549 : if present.is_none() && cfg!(debug_assertions) {
318 UBC 0 : panic!(
319 0 : "removing layer that is not present in layer mapping: {:?}",
320 0 : layer.layer_desc()
321 0 : )
322 CBC 7549 : }
323 7549 : }
324 :
325 1965 : pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
326 1965 : let key = expected.layer_desc().key();
327 1965 : let other = new.layer_desc().key();
328 1965 :
329 1965 : let expected_l0 = LayerMap::is_l0(expected.layer_desc());
330 1965 : let new_l0 = LayerMap::is_l0(new.layer_desc());
331 1965 :
332 1965 : fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
333 1 : "layermap-replace-notfound"
334 1965 : ));
335 :
336 1964 : anyhow::ensure!(
337 1964 : key == other,
338 UBC 0 : "expected and new layer have different keys: {key:?} != {other:?}"
339 : );
340 :
341 CBC 1964 : anyhow::ensure!(
342 1964 : expected_l0 == new_l0,
343 UBC 0 : "one layer is l0 while the other is not: {expected_l0} != {new_l0}"
344 : );
345 :
346 CBC 1964 : if let Some(layer) = self.0.get_mut(&key) {
347 1963 : anyhow::ensure!(
348 1963 : compare_arced_layers(&expected, layer),
349 1 : "another layer was found instead of expected, expected={expected:?}, new={new:?}",
350 1 : expected = Arc::as_ptr(&expected),
351 1 : new = Arc::as_ptr(layer),
352 : );
353 1962 : *layer = new;
354 1962 : Ok(())
355 : } else {
356 1 : anyhow::bail!("layer was not found");
357 : }
358 1965 : }
359 : }
|