Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use pageserver_api::shard::TenantShardId;
3 : use std::{collections::HashMap, sync::Arc};
4 : use tracing::trace;
5 : use utils::{
6 : id::TimelineId,
7 : lsn::{AtomicLsn, Lsn},
8 : };
9 :
10 : use crate::{
11 : config::PageServerConf,
12 : metrics::TimelineMetrics,
13 : tenant::{
14 : layer_map::{BatchedUpdates, LayerMap},
15 : storage_layer::{
16 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
17 : ResidentLayer,
18 : },
19 : },
20 : };
21 :
/// Provides semantic APIs to manipulate the layer map.
///
/// Pairs a [`LayerMap`] (historic layer descriptors plus the open and frozen
/// in-memory layers) with a [`LayerFileManager`] that holds the [`Layer`] object
/// for every historic descriptor. The two are kept in step by the methods on this
/// type.
pub(crate) struct LayerManager {
    /// The layer map. Handed out read-only via `layer_map()`; intended to be mutated
    /// only through the semantic methods on `LayerManager`.
    layer_map: LayerMap,
    /// Layer objects for the historic layers in `layer_map`, keyed by descriptor key.
    layer_fmgr: LayerFileManager<Layer>,
}
27 :
28 : impl LayerManager {
29 1568 : pub(crate) fn create() -> Self {
30 1568 : Self {
31 1568 : layer_map: LayerMap::default(),
32 1568 : layer_fmgr: LayerFileManager::new(),
33 1568 : }
34 1568 : }
35 :
36 23952731 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
37 23952731 : self.layer_fmgr.get_from_desc(desc)
38 23952731 : }
39 :
40 : /// Get an immutable reference to the layer map.
41 : ///
42 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
43 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
44 32493414 : pub(crate) fn layer_map(&self) -> &LayerMap {
45 32493414 : &self.layer_map
46 32493414 : }
47 :
48 : /// Called from `load_layer_map`. Initialize the layer manager with:
49 : /// 1. all on-disk layers
50 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
51 412 : pub(crate) fn initialize_local_layers(
52 412 : &mut self,
53 412 : on_disk_layers: Vec<Layer>,
54 412 : next_open_layer_at: Lsn,
55 412 : ) {
56 412 : let mut updates = self.layer_map.batch_update();
57 57240 : for layer in on_disk_layers {
58 56828 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
59 56828 : }
60 412 : updates.flush();
61 412 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
62 412 : }
63 :
64 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
65 1144 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
66 1144 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
67 1144 : }
68 :
69 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
70 : /// called within `get_layer_for_write`.
71 2951969 : pub(crate) async fn get_layer_for_write(
72 2951969 : &mut self,
73 2951969 : lsn: Lsn,
74 2951969 : last_record_lsn: Lsn,
75 2951969 : conf: &'static PageServerConf,
76 2951969 : timeline_id: TimelineId,
77 2951969 : tenant_shard_id: TenantShardId,
78 2951969 : ) -> Result<Arc<InMemoryLayer>> {
79 2951969 : ensure!(lsn.is_aligned());
80 :
81 : ensure!(
82 2951969 : lsn > last_record_lsn,
83 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
84 : lsn,
85 : last_record_lsn,
86 : );
87 :
88 : // Do we have a layer open for writing already?
89 2951969 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
90 2946256 : if open_layer.get_lsn_range().start > lsn {
91 0 : bail!(
92 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
93 0 : open_layer.get_lsn_range().start,
94 0 : lsn
95 0 : );
96 2946256 : }
97 2946256 :
98 2946256 : Arc::clone(open_layer)
99 : } else {
100 : // No writeable layer yet. Create one.
101 5713 : let start_lsn = self
102 5713 : .layer_map
103 5713 : .next_open_layer_at
104 5713 : .context("No next open layer found")?;
105 :
106 0 : trace!(
107 0 : "creating in-memory layer at {}/{} for record at {}",
108 0 : timeline_id,
109 0 : start_lsn,
110 0 : lsn
111 0 : );
112 :
113 5713 : let new_layer =
114 5713 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
115 5713 : let layer = Arc::new(new_layer);
116 5713 :
117 5713 : self.layer_map.open_layer = Some(layer.clone());
118 5713 : self.layer_map.next_open_layer_at = None;
119 5713 :
120 5713 : layer
121 : };
122 :
123 2951969 : Ok(layer)
124 2951969 : }
125 :
126 : /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
127 5489 : pub(crate) async fn try_freeze_in_memory_layer(
128 5489 : &mut self,
129 5489 : Lsn(last_record_lsn): Lsn,
130 5489 : last_freeze_at: &AtomicLsn,
131 5489 : ) {
132 5489 : let end_lsn = Lsn(last_record_lsn + 1);
133 :
134 5489 : if let Some(open_layer) = &self.layer_map.open_layer {
135 5288 : let open_layer_rc = Arc::clone(open_layer);
136 5288 : // Does this layer need freezing?
137 5288 : open_layer.freeze(end_lsn).await;
138 :
139 : // The layer is no longer open, update the layer map to reflect this.
140 : // We will replace it with on-disk historics below.
141 5288 : self.layer_map.frozen_layers.push_back(open_layer_rc);
142 5288 : self.layer_map.open_layer = None;
143 5288 : self.layer_map.next_open_layer_at = Some(end_lsn);
144 5288 : last_freeze_at.store(end_lsn);
145 201 : }
146 5489 : }
147 :
148 : /// Add image layers to the layer map, called from `create_image_layers`.
149 1597 : pub(crate) fn track_new_image_layers(
150 1597 : &mut self,
151 1597 : image_layers: &[ResidentLayer],
152 1597 : metrics: &TimelineMetrics,
153 1597 : ) {
154 1597 : let mut updates = self.layer_map.batch_update();
155 7626 : for layer in image_layers {
156 6029 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
157 6029 :
158 6029 : // record these here instead of Layer::finish_creating because otherwise partial
159 6029 : // failure with create_image_layers would balloon up the physical size gauge. downside
160 6029 : // is that all layers need to be created before metrics are updated.
161 6029 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
162 6029 : }
163 1597 : updates.flush();
164 1597 : }
165 :
166 : /// Flush a frozen layer and add the written delta layer to the layer map.
167 5263 : pub(crate) fn finish_flush_l0_layer(
168 5263 : &mut self,
169 5263 : delta_layer: Option<&ResidentLayer>,
170 5263 : frozen_layer_for_check: &Arc<InMemoryLayer>,
171 5263 : metrics: &TimelineMetrics,
172 5263 : ) {
173 5263 : let inmem = self
174 5263 : .layer_map
175 5263 : .frozen_layers
176 5263 : .pop_front()
177 5263 : .expect("there must be a inmem layer to flush");
178 5263 :
179 5263 : // Only one task may call this function at a time (for this
180 5263 : // timeline). If two tasks tried to flush the same frozen
181 5263 : // layer to disk at the same time, that would not work.
182 5263 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
183 :
184 5263 : if let Some(l) = delta_layer {
185 5183 : let mut updates = self.layer_map.batch_update();
186 5183 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
187 5183 : metrics.record_new_file_metrics(l.layer_desc().file_size);
188 5183 : updates.flush();
189 5183 : }
190 5263 : }
191 :
192 : /// Called when compaction is completed.
193 295 : pub(crate) fn finish_compact_l0(
194 295 : &mut self,
195 295 : compact_from: &[Layer],
196 295 : compact_to: &[ResidentLayer],
197 295 : metrics: &TimelineMetrics,
198 295 : ) {
199 295 : let mut updates = self.layer_map.batch_update();
200 11025 : for l in compact_to {
201 10730 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
202 10730 : metrics.record_new_file_metrics(l.layer_desc().file_size);
203 10730 : }
204 4508 : for l in compact_from {
205 4213 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
206 4213 : }
207 295 : updates.flush();
208 295 : }
209 :
210 : /// Called when garbage collect has selected the layers to be removed.
211 20 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
212 20 : let mut updates = self.layer_map.batch_update();
213 1217 : for doomed_layer in gc_layers {
214 1197 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
215 1197 : }
216 20 : updates.flush()
217 20 : }
218 :
219 : /// Helper function to insert a layer into the layer map and file manager.
220 78770 : fn insert_historic_layer(
221 78770 : layer: Layer,
222 78770 : updates: &mut BatchedUpdates<'_>,
223 78770 : mapping: &mut LayerFileManager<Layer>,
224 78770 : ) {
225 78770 : updates.insert_historic(layer.layer_desc().clone());
226 78770 : mapping.insert(layer);
227 78770 : }
228 :
229 : /// Removes the layer from local FS (if present) and from memory.
230 : /// Remote storage is not affected by this operation.
231 5410 : fn delete_historic_layer(
232 5410 : // we cannot remove layers otherwise, since gc and compaction will race
233 5410 : layer: &Layer,
234 5410 : updates: &mut BatchedUpdates<'_>,
235 5410 : mapping: &mut LayerFileManager<Layer>,
236 5410 : ) {
237 5410 : let desc = layer.layer_desc();
238 5410 :
239 5410 : // TODO Removing from the bottom of the layer map is expensive.
240 5410 : // Maybe instead discard all layer map historic versions that
241 5410 : // won't be needed for page reconstruction for this timeline,
242 5410 : // and mark what we can't delete yet as deleted from the layer
243 5410 : // map index without actually rebuilding the index.
244 5410 : updates.remove_historic(desc);
245 5410 : mapping.remove(layer);
246 5410 : layer.delete_on_drop();
247 5410 : }
248 :
249 10770 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
250 10770 : self.layer_fmgr.contains(layer)
251 10770 : }
252 : }
253 :
/// Maps each [`PersistentLayerKey`] to the layer object registered under it.
///
/// `LayerManager` inserts into and removes from this map in step with the historic
/// descriptors it adds to or removes from its `LayerMap`.
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
255 :
256 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
257 23952738 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
258 23952738 : // The assumption for the `expect()` is that all code maintains the following invariant:
259 23952738 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
260 23952738 : self.0
261 23952738 : .get(&desc.key())
262 23952738 : .with_context(|| format!("get layer from desc: {}", desc.filename()))
263 23952738 : .expect("not found")
264 23952738 : .clone()
265 23952738 : }
266 :
267 78770 : pub(crate) fn insert(&mut self, layer: T) {
268 78770 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
269 78770 : if present.is_some() && cfg!(debug_assertions) {
270 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
271 78770 : }
272 78770 : }
273 :
274 10770 : pub(crate) fn contains(&self, layer: &T) -> bool {
275 10770 : self.0.contains_key(&layer.layer_desc().key())
276 10770 : }
277 :
278 1568 : pub(crate) fn new() -> Self {
279 1568 : Self(HashMap::new())
280 1568 : }
281 :
282 5410 : pub(crate) fn remove(&mut self, layer: &T) {
283 5410 : let present = self.0.remove(&layer.layer_desc().key());
284 5410 : if present.is_none() && cfg!(debug_assertions) {
285 0 : panic!(
286 0 : "removing layer that is not present in layer mapping: {:?}",
287 0 : layer.layer_desc()
288 0 : )
289 5410 : }
290 5410 : }
291 : }
|