Line data Source code
1 : use anyhow::{bail, ensure, Context, Result};
2 : use futures::StreamExt;
3 : use pageserver_api::shard::TenantShardId;
4 : use std::{collections::HashMap, sync::Arc};
5 : use tracing::trace;
6 : use utils::{
7 : id::TimelineId,
8 : lsn::{AtomicLsn, Lsn},
9 : };
10 :
11 : use crate::{
12 : config::PageServerConf,
13 : metrics::TimelineMetrics,
14 : tenant::{
15 : layer_map::{BatchedUpdates, LayerMap},
16 : storage_layer::{
17 : AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
18 : ResidentLayer,
19 : },
20 : },
21 : };
22 :
/// Provides semantic APIs to manipulate the layer map.
///
/// Keeps two views of the historic layers in sync: the [`LayerMap`], which stores layer
/// descriptors, and the [`LayerFileManager`], which stores the `Layer` handles keyed by
/// their [`PersistentLayerKey`]. All mutations go through the methods on this type so that
/// the two views are always updated together.
#[derive(Default)]
pub(crate) struct LayerManager {
    /// Descriptor-level view of the layers (historic, open and frozen in-memory layers).
    layer_map: LayerMap,
    /// Key -> `Layer` handle for every historic layer whose descriptor is in `layer_map`.
    layer_fmgr: LayerFileManager<Layer>,
}
29 :
30 : impl LayerManager {
31 16862196 : pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
32 16862196 : self.layer_fmgr.get_from_desc(desc)
33 16862196 : }
34 :
35 : /// Get an immutable reference to the layer map.
36 : ///
37 : /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
38 : /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
39 24426310 : pub(crate) fn layer_map(&self) -> &LayerMap {
40 24426310 : &self.layer_map
41 24426310 : }
42 :
43 : /// Called from `load_layer_map`. Initialize the layer manager with:
44 : /// 1. all on-disk layers
45 : /// 2. next open layer (with disk disk_consistent_lsn LSN)
46 431 : pub(crate) fn initialize_local_layers(
47 431 : &mut self,
48 431 : on_disk_layers: Vec<Layer>,
49 431 : next_open_layer_at: Lsn,
50 431 : ) {
51 431 : let mut updates = self.layer_map.batch_update();
52 53475 : for layer in on_disk_layers {
53 53044 : Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
54 53044 : }
55 431 : updates.flush();
56 431 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
57 431 : }
58 :
59 : /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
60 1149 : pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
61 1149 : self.layer_map.next_open_layer_at = Some(next_open_layer_at);
62 1149 : }
63 :
64 : /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
65 : /// called within `get_layer_for_write`.
66 2907667 : pub(crate) async fn get_layer_for_write(
67 2907667 : &mut self,
68 2907667 : lsn: Lsn,
69 2907667 : last_record_lsn: Lsn,
70 2907667 : conf: &'static PageServerConf,
71 2907667 : timeline_id: TimelineId,
72 2907667 : tenant_shard_id: TenantShardId,
73 2907667 : ) -> Result<Arc<InMemoryLayer>> {
74 2907667 : ensure!(lsn.is_aligned());
75 :
76 2907667 : ensure!(
77 2907667 : lsn > last_record_lsn,
78 0 : "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
79 : lsn,
80 : last_record_lsn,
81 : );
82 :
83 : // Do we have a layer open for writing already?
84 2907667 : let layer = if let Some(open_layer) = &self.layer_map.open_layer {
85 2902027 : if open_layer.get_lsn_range().start > lsn {
86 0 : bail!(
87 0 : "unexpected open layer in the future: open layers starts at {}, write lsn {}",
88 0 : open_layer.get_lsn_range().start,
89 0 : lsn
90 0 : );
91 2902027 : }
92 2902027 :
93 2902027 : Arc::clone(open_layer)
94 : } else {
95 : // No writeable layer yet. Create one.
96 5640 : let start_lsn = self
97 5640 : .layer_map
98 5640 : .next_open_layer_at
99 5640 : .context("No next open layer found")?;
100 :
101 0 : trace!(
102 0 : "creating in-memory layer at {}/{} for record at {}",
103 0 : timeline_id,
104 0 : start_lsn,
105 0 : lsn
106 0 : );
107 :
108 5640 : let new_layer =
109 5640 : InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
110 5640 : let layer = Arc::new(new_layer);
111 5640 :
112 5640 : self.layer_map.open_layer = Some(layer.clone());
113 5640 : self.layer_map.next_open_layer_at = None;
114 5640 :
115 5640 : layer
116 : };
117 :
118 2907667 : Ok(layer)
119 2907667 : }
120 :
121 : /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
122 5410 : pub(crate) async fn try_freeze_in_memory_layer(
123 5410 : &mut self,
124 5410 : Lsn(last_record_lsn): Lsn,
125 5410 : last_freeze_at: &AtomicLsn,
126 5410 : ) {
127 5410 : let end_lsn = Lsn(last_record_lsn + 1);
128 :
129 5410 : if let Some(open_layer) = &self.layer_map.open_layer {
130 5202 : let open_layer_rc = Arc::clone(open_layer);
131 5202 : // Does this layer need freezing?
132 5202 : open_layer.freeze(end_lsn).await;
133 :
134 : // The layer is no longer open, update the layer map to reflect this.
135 : // We will replace it with on-disk historics below.
136 5202 : self.layer_map.frozen_layers.push_back(open_layer_rc);
137 5202 : self.layer_map.open_layer = None;
138 5202 : self.layer_map.next_open_layer_at = Some(end_lsn);
139 5202 : last_freeze_at.store(end_lsn);
140 208 : }
141 5410 : }
142 :
143 : /// Add image layers to the layer map, called from `create_image_layers`.
144 1699 : pub(crate) fn track_new_image_layers(
145 1699 : &mut self,
146 1699 : image_layers: &[ResidentLayer],
147 1699 : metrics: &TimelineMetrics,
148 1699 : ) {
149 1699 : let mut updates = self.layer_map.batch_update();
150 8174 : for layer in image_layers {
151 6475 : Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
152 6475 :
153 6475 : // record these here instead of Layer::finish_creating because otherwise partial
154 6475 : // failure with create_image_layers would balloon up the physical size gauge. downside
155 6475 : // is that all layers need to be created before metrics are updated.
156 6475 : metrics.record_new_file_metrics(layer.layer_desc().file_size);
157 6475 : }
158 1699 : updates.flush();
159 1699 : }
160 :
161 : /// Flush a frozen layer and add the written delta layer to the layer map.
162 5177 : pub(crate) fn finish_flush_l0_layer(
163 5177 : &mut self,
164 5177 : delta_layer: Option<&ResidentLayer>,
165 5177 : frozen_layer_for_check: &Arc<InMemoryLayer>,
166 5177 : metrics: &TimelineMetrics,
167 5177 : ) {
168 5177 : let inmem = self
169 5177 : .layer_map
170 5177 : .frozen_layers
171 5177 : .pop_front()
172 5177 : .expect("there must be a inmem layer to flush");
173 5177 :
174 5177 : // Only one task may call this function at a time (for this
175 5177 : // timeline). If two tasks tried to flush the same frozen
176 5177 : // layer to disk at the same time, that would not work.
177 5177 : assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
178 :
179 5177 : if let Some(l) = delta_layer {
180 5097 : let mut updates = self.layer_map.batch_update();
181 5097 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
182 5097 : metrics.record_new_file_metrics(l.layer_desc().file_size);
183 5097 : updates.flush();
184 5097 : }
185 5177 : }
186 :
187 : /// Called when compaction is completed.
188 295 : pub(crate) fn finish_compact_l0(
189 295 : &mut self,
190 295 : compact_from: &[Layer],
191 295 : compact_to: &[ResidentLayer],
192 295 : metrics: &TimelineMetrics,
193 295 : ) {
194 295 : let mut updates = self.layer_map.batch_update();
195 11021 : for l in compact_to {
196 10726 : Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
197 10726 : metrics.record_new_file_metrics(l.layer_desc().file_size);
198 10726 : }
199 4372 : for l in compact_from {
200 4077 : Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
201 4077 : }
202 295 : updates.flush();
203 295 : }
204 :
205 : /// Called when garbage collect has selected the layers to be removed.
206 20 : pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
207 20 : let mut updates = self.layer_map.batch_update();
208 1100 : for doomed_layer in gc_layers {
209 1080 : Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
210 1080 : }
211 20 : updates.flush()
212 20 : }
213 :
214 : /// Helper function to insert a layer into the layer map and file manager.
215 75342 : fn insert_historic_layer(
216 75342 : layer: Layer,
217 75342 : updates: &mut BatchedUpdates<'_>,
218 75342 : mapping: &mut LayerFileManager<Layer>,
219 75342 : ) {
220 75342 : updates.insert_historic(layer.layer_desc().clone());
221 75342 : mapping.insert(layer);
222 75342 : }
223 :
224 : /// Removes the layer from local FS (if present) and from memory.
225 : /// Remote storage is not affected by this operation.
226 5157 : fn delete_historic_layer(
227 5157 : // we cannot remove layers otherwise, since gc and compaction will race
228 5157 : layer: &Layer,
229 5157 : updates: &mut BatchedUpdates<'_>,
230 5157 : mapping: &mut LayerFileManager<Layer>,
231 5157 : ) {
232 5157 : let desc = layer.layer_desc();
233 5157 :
234 5157 : // TODO Removing from the bottom of the layer map is expensive.
235 5157 : // Maybe instead discard all layer map historic versions that
236 5157 : // won't be needed for page reconstruction for this timeline,
237 5157 : // and mark what we can't delete yet as deleted from the layer
238 5157 : // map index without actually rebuilding the index.
239 5157 : updates.remove_historic(desc);
240 5157 : mapping.remove(layer);
241 5157 : layer.delete_on_drop();
242 5157 : }
243 :
244 47 : pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream<Item = Layer> + '_ {
245 47 : // for small layer maps, we most likely have all resident, but for larger more are likely
246 47 : // to be evicted assuming lots of layers correlated with longer lifespan.
247 47 :
248 47 : let layers = self
249 47 : .layer_map()
250 47 : .iter_historic_layers()
251 3040 : .map(|desc| self.get_from_desc(&desc));
252 47 :
253 47 : let layers = futures::stream::iter(layers);
254 47 :
255 3040 : layers.filter_map(|layer| async move {
256 3040 : // TODO(#6028): this query does not really need to see the ResidentLayer
257 3040 : match layer.keep_resident().await {
258 3039 : Ok(Some(layer)) => Some(layer.drop_eviction_guard()),
259 1 : Ok(None) => None,
260 0 : Err(e) => {
261 0 : // these should not happen, but we cannot make them statically impossible right
262 0 : // now.
263 0 : tracing::warn!(%layer, "failed to keep the layer resident: {e:#}");
264 0 : None
265 : }
266 : }
267 6080 : })
268 47 : }
269 :
270 10754 : pub(crate) fn contains(&self, layer: &Layer) -> bool {
271 10754 : self.layer_fmgr.contains(layer)
272 10754 : }
273 : }
274 :
/// Maps each persistent layer's key to its layer handle.
///
/// `LayerManager` keeps this in sync with its layer map: every descriptor present in the layer
/// map is expected to have an entry here (see `get_from_desc`).
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
276 :
277 : impl<T> Default for LayerFileManager<T> {
278 1592 : fn default() -> Self {
279 1592 : Self(HashMap::default())
280 1592 : }
281 : }
282 :
283 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
284 16862219 : fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
285 16862219 : // The assumption for the `expect()` is that all code maintains the following invariant:
286 16862219 : // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
287 16862219 : self.0
288 16862219 : .get(&desc.key())
289 16862219 : .with_context(|| format!("get layer from desc: {}", desc.filename()))
290 16862219 : .expect("not found")
291 16862219 : .clone()
292 16862219 : }
293 :
294 75342 : pub(crate) fn insert(&mut self, layer: T) {
295 75342 : let present = self.0.insert(layer.layer_desc().key(), layer.clone());
296 75342 : if present.is_some() && cfg!(debug_assertions) {
297 0 : panic!("overwriting a layer: {:?}", layer.layer_desc())
298 75342 : }
299 75342 : }
300 :
301 10754 : pub(crate) fn contains(&self, layer: &T) -> bool {
302 10754 : self.0.contains_key(&layer.layer_desc().key())
303 10754 : }
304 :
305 5157 : pub(crate) fn remove(&mut self, layer: &T) {
306 5157 : let present = self.0.remove(&layer.layer_desc().key());
307 5157 : if present.is_none() && cfg!(debug_assertions) {
308 0 : panic!(
309 0 : "removing layer that is not present in layer mapping: {:?}",
310 0 : layer.layer_desc()
311 0 : )
312 5157 : }
313 5157 : }
314 : }
|