Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use futures::StreamExt;
3 : use pageserver_api::shard::TenantShardId;
4 : use std::{collections::HashMap, sync::Arc};
5 : use tracing::trace;
6 : use utils::{
7 : id::TimelineId,
8 : lsn::{AtomicLsn, Lsn},
9 : };
10 :
11 : use crate::{
12 : config::PageServerConf,
13 : metrics::TimelineMetrics,
14 : tenant::{
15 : layer_map::{BatchedUpdates, LayerMap},
16 : storage_layer::{
17 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
18 : ResidentLayer,
19 : },
20 : },
21 : };
22 :
23 : /// Provides semantic APIs to manipulate the layer map.
24 1590 : #[derive(Default)]
25 : pub(crate) struct LayerManager {
26 : layer_map: LayerMap,
27 : layer_fmgr: LayerFileManager<Layer>,
28 : }
29 :
30 : impl LayerManager {
31 16837615 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
32 16837615 : self.layer_fmgr.get_from_desc(desc)
33 16837615 : }
34 :
35 : /// Get an immutable reference to the layer map.
36 : ///
37 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
38 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
39 24419805 : pub(crate) fn layer_map(&self) -> &LayerMap {
40 24419805 : &self.layer_map
41 24419805 : }
42 :
43 : /// Called from `load_layer_map`. Initialize the layer manager with:
44 : /// 1. all on-disk layers
45 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
46 433 : pub(crate) fn initialize_local_layers(
47 433 : &mut self,
48 433 : on_disk_layers: Vec<Layer>,
49 433 : next_open_layer_at: Lsn,
50 433 : ) {
51 433 : let mut updates = self.layer_map.batch_update();
52 53620 : for layer in on_disk_layers {
53 53187 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
54 53187 : }
55 433 : updates.flush();
56 433 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
57 433 : }
58 :
59 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
60 1145 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
61 1145 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
62 1145 : }
63 :
64 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
65 : /// called within `get_layer_for_write`.
66 2946591 : pub(crate) async fn get_layer_for_write(
67 2946591 : &mut self,
68 2946591 : lsn: Lsn,
69 2946591 : last_record_lsn: Lsn,
70 2946591 : conf: &'static PageServerConf,
71 2946591 : timeline_id: TimelineId,
72 2946591 : tenant_shard_id: TenantShardId,
73 2946592 : ) -> Result<Arc<InMemoryLayer>> {
74 2946592 : ensure!(lsn.is_aligned());
75 :
76 2946592 : ensure!(
77 2946592 : lsn > last_record_lsn,
78 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
79 : lsn,
80 : last_record_lsn,
81 : );
82 :
83 : // Do we have a layer open for writing already?
84 2946592 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
85 2940896 : if open_layer.get_lsn_range().start > lsn {
86 0 : bail!(
87 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
88 0 : open_layer.get_lsn_range().start,
89 0 : lsn
90 0 : );
91 2940896 : }
92 2940896 :
93 2940896 : Arc::clone(open_layer)
94 : } else {
95 : // No writeable layer yet. Create one.
96 5696 : let start_lsn = self
97 5696 : .layer_map
98 5696 : .next_open_layer_at
99 5696 : .context("No next open layer found")?;
100 :
101 0 : trace!(
102 0 : "creating in-memory layer at {}/{} for record at {}",
103 0 : timeline_id,
104 0 : start_lsn,
105 0 : lsn
106 0 : );
107 :
108 5696 : let new_layer =
109 5696 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
110 5696 : let layer = Arc::new(new_layer);
111 5696 :
112 5696 : self.layer_map.open_layer = Some(layer.clone());
113 5696 : self.layer_map.next_open_layer_at = None;
114 5696 :
115 5696 : layer
116 : };
117 :
118 2946592 : Ok(layer)
119 2946592 : }
120 :
121 : /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
122 5472 : pub(crate) async fn try_freeze_in_memory_layer(
123 5472 : &mut self,
124 5472 : Lsn(last_record_lsn): Lsn,
125 5472 : last_freeze_at: &AtomicLsn,
126 5472 : ) {
127 5472 : let end_lsn = Lsn(last_record_lsn + 1);
128 :
129 5472 : if let Some(open_layer) = &self.layer_map.open_layer {
130 5260 : let open_layer_rc = Arc::clone(open_layer);
131 5260 : // Does this layer need freezing?
132 5260 : open_layer.freeze(end_lsn).await;
133 :
134 : // The layer is no longer open, update the layer map to reflect this.
135 : // We will replace it with on-disk historics below.
136 5260 : self.layer_map.frozen_layers.push_back(open_layer_rc);
137 5260 : self.layer_map.open_layer = None;
138 5260 : self.layer_map.next_open_layer_at = Some(end_lsn);
139 5260 : last_freeze_at.store(end_lsn);
140 212 : }
141 5472 : }
142 :
143 : /// Add image layers to the layer map, called from `create_image_layers`.
144 1720 : pub(crate) fn track_new_image_layers(
145 1720 : &mut self,
146 1720 : image_layers: &[ResidentLayer],
147 1720 : metrics: &TimelineMetrics,
148 1720 : ) {
149 1720 : let mut updates = self.layer_map.batch_update();
150 8192 : for layer in image_layers {
151 6472 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
152 6472 :
153 6472 : // record these here instead of Layer::finish_creating because otherwise partial
154 6472 : // failure with create_image_layers would balloon up the physical size gauge. downside
155 6472 : // is that all layers need to be created before metrics are updated.
156 6472 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
157 6472 : }
158 1720 : updates.flush();
159 1720 : }
160 :
161 : /// Flush a frozen layer and add the written delta layer to the layer map.
162 5235 : pub(crate) fn finish_flush_l0_layer(
163 5235 : &mut self,
164 5235 : delta_layer: Option<&ResidentLayer>,
165 5235 : frozen_layer_for_check: &Arc<InMemoryLayer>,
166 5235 : metrics: &TimelineMetrics,
167 5235 : ) {
168 5235 : let inmem = self
169 5235 : .layer_map
170 5235 : .frozen_layers
171 5235 : .pop_front()
172 5235 : .expect("there must be a inmem layer to flush");
173 5235 :
174 5235 : // Only one task may call this function at a time (for this
175 5235 : // timeline). If two tasks tried to flush the same frozen
176 5235 : // layer to disk at the same time, that would not work.
177 5235 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
178 :
179 5235 : if let Some(l) = delta_layer {
180 5155 : let mut updates = self.layer_map.batch_update();
181 5155 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
182 5155 : metrics.record_new_file_metrics(l.layer_desc().file_size);
183 5155 : updates.flush();
184 5155 : }
185 5235 : }
186 :
187 : /// Called when compaction is completed.
188 296 : pub(crate) fn finish_compact_l0(
189 296 : &mut self,
190 296 : compact_from: &[Layer],
191 296 : compact_to: &[ResidentLayer],
192 296 : metrics: &TimelineMetrics,
193 296 : ) {
194 296 : let mut updates = self.layer_map.batch_update();
195 11032 : for l in compact_to {
196 10736 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
197 10736 : metrics.record_new_file_metrics(l.layer_desc().file_size);
198 10736 : }
199 4429 : for l in compact_from {
200 4133 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
201 4133 : }
202 296 : updates.flush();
203 296 : }
204 :
205 : /// Called when garbage collect has selected the layers to be removed.
206 19 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
207 19 : let mut updates = self.layer_map.batch_update();
208 1105 : for doomed_layer in gc_layers {
209 1086 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
210 1086 : }
211 19 : updates.flush()
212 19 : }
213 :
214 : /// Helper function to insert a layer into the layer map and file manager.
215 75550 : fn insert_historic_layer(
216 75550 : layer: Layer,
217 75550 : updates: &mut BatchedUpdates<'_>,
218 75550 : mapping: &mut LayerFileManager<Layer>,
219 75550 : ) {
220 75550 : updates.insert_historic(layer.layer_desc().clone());
221 75550 : mapping.insert(layer);
222 75550 : }
223 :
224 : /// Removes the layer from local FS (if present) and from memory.
225 : /// Remote storage is not affected by this operation.
226 5219 : fn delete_historic_layer(
227 5219 : // we cannot remove layers otherwise, since gc and compaction will race
228 5219 : layer: &Layer,
229 5219 : updates: &mut BatchedUpdates<'_>,
230 5219 : mapping: &mut LayerFileManager<Layer>,
231 5219 : ) {
232 5219 : let desc = layer.layer_desc();
233 5219 :
234 5219 : // TODO Removing from the bottom of the layer map is expensive.
235 5219 : // Maybe instead discard all layer map historic versions that
236 5219 : // won't be needed for page reconstruction for this timeline,
237 5219 : // and mark what we can't delete yet as deleted from the layer
238 5219 : // map index without actually rebuilding the index.
239 5219 : updates.remove_historic(desc);
240 5219 : mapping.remove(layer);
241 5219 : layer.delete_on_drop();
242 5219 : }
243 :
244 49 : pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream<Item = Layer> + '_ {
245 49 : // for small layer maps, we most likely have all resident, but for larger more are likely
246 49 : // to be evicted assuming lots of layers correlated with longer lifespan.
247 49 :
248 49 : let layers = self
249 49 : .layer_map()
250 49 : .iter_historic_layers()
251 3078 : .map(|desc| self.get_from_desc(&desc));
252 49 :
253 49 : let layers = futures::stream::iter(layers);
254 49 :
255 3078 : layers.filter_map(|layer| async move {
256 3078 : // TODO(#6028): this query does not really need to see the ResidentLayer
257 3078 : match layer.keep_resident().await {
258 3054 : Ok(Some(layer)) => Some(layer.drop_eviction_guard()),
259 24 : Ok(None) => None,
260 0 : Err(e) => {
261 0 : // these should not happen, but we cannot make them statically impossible right
262 0 : // now.
263 0 : tracing::warn!(%layer, "failed to keep the layer resident: {e:#}");
264 0 : None
265 : }
266 : }
267 6156 : })
268 49 : }
269 :
270 10764 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
271 10764 : self.layer_fmgr.contains(layer)
272 10764 : }
273 : }
274 :
275 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
276 :
277 : impl<T> Default for LayerFileManager<T> {
278 1590 : fn default() -> Self {
279 1590 : Self(HashMap::default())
280 1590 : }
281 : }
282 :
283 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
284 16837634 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
285 16837634 : // The assumption for the `expect()` is that all code maintains the following invariant:
286 16837634 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
287 16837634 : self.0
288 16837634 : .get(&desc.key())
289 16837634 : .with_context(|| format!("get layer from desc: {}", desc.filename()))
290 16837634 : .expect("not found")
291 16837634 : .clone()
292 16837634 : }
293 :
294 75550 : pub(crate) fn insert(&mut self, layer: T) {
295 75550 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
296 75550 : if present.is_some() && cfg!(debug_assertions) {
297 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
298 75550 : }
299 75550 : }
300 :
301 10764 : pub(crate) fn contains(&self, layer: &T) -> bool {
302 10764 : self.0.contains_key(&layer.layer_desc().key())
303 10764 : }
304 :
305 5219 : pub(crate) fn remove(&mut self, layer: &T) {
306 5219 : let present = self.0.remove(&layer.layer_desc().key());
307 5219 : if present.is_none() && cfg!(debug_assertions) {
308 0 : panic!(
309 0 : "removing layer that is not present in layer mapping: {:?}",
310 0 : layer.layer_desc()
311 0 : )
312 5219 : }
313 5219 : }
314 : }
|