//! An in-memory layer stores recently received key-value pairs.
//!
//! The "in-memory" part of the name is a bit misleading: the actual page versions are
//! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
//! its position in the file, is kept in memory, though.
//!
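//! A rough sketch of the lifecycle, based on the functions defined below (illustrative
//! only; the real call sites live in the timeline code):
//!
//! ```ignore
//! // Open a new layer that accepts writes starting at `start_lsn`.
//! let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
//!
//! // Ingest WAL: each value is appended to the ephemeral file and indexed by (key, lsn).
//! layer.put_value(key, lsn, &serialized_value, ctx).await?;
//!
//! // Stop accepting writes; `end_lsn` is exclusive.
//! layer.freeze(end_lsn).await;
//!
//! // Persist the frozen layer as an on-disk delta layer.
//! let resident_layer = layer.write_to_disk(&timeline, ctx, None).await?;
//! ```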
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{page_cache, walrecord};
use anyhow::{anyhow, ensure, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use std::collections::{BTreeMap, BinaryHeap, HashSet};
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
use std::cmp::Ordering;
use std::fmt::Write;
use std::ops::Range;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};

use super::{
    DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
    ValuesReconstructState,
};

#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);

pub struct InMemoryLayer {
    conf: &'static PageServerConf,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    file_id: InMemoryLayerFileId,

    /// This layer contains all the changes from 'start_lsn'. The
    /// start is inclusive.
    start_lsn: Lsn,

    /// Frozen layers have an exclusive end LSN.
    /// Writes are only allowed when this is `None`.
    pub(crate) end_lsn: OnceLock<Lsn>,

    /// Used for traversal path. Cached representation of the in-memory layer before it is frozen.
    local_path_str: Arc<str>,

    /// Used for traversal path. Cached representation of the in-memory layer after it is frozen.
    frozen_local_path_str: OnceLock<Arc<str>>,

    opened_at: Instant,

    /// The above fields never change, except for `end_lsn` and `frozen_local_path_str`,
    /// which are each set only once, when the layer is frozen.
    /// All other changing parts are in `inner`, protected by an async `RwLock`.
    inner: RwLock<InMemoryLayerInner>,
}

impl std::fmt::Debug for InMemoryLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InMemoryLayer")
            .field("start_lsn", &self.start_lsn)
            .field("end_lsn", &self.end_lsn)
            .field("inner", &self.inner)
            .finish()
    }
}

pub struct InMemoryLayerInner {
    /// All versions of all pages in the layer are kept here. Indexed
    /// by block number and LSN. The value is an offset into the
    /// ephemeral file where the page version is stored.
    index: BTreeMap<Key, VecMap<Lsn, u64>>,

    /// The values are stored in a serialized format in this file.
    /// Each serialized Value is preceded by a 'u32' length field.
    /// The `index` map above stores offsets into this file.
    file: EphemeralFile,

    resource_units: GlobalResourceUnits,
}
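
// Illustrative sketch of how `index` and `file` relate (hypothetical keys, LSNs and
// offsets, not real data): each `put_value` appends one length-prefixed, serialized
// `Value` to the ephemeral file and records where it starts.
//
//   index:  KEY_A -> VecMap { 0/10 -> 0,   0/20 -> 140 }
//           KEY_B -> VecMap { 0/15 -> 260 }
//   file:   [len][Value for KEY_A @ 0/10][len][Value for KEY_A @ 0/20][len][Value for KEY_B @ 0/15]...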

impl std::fmt::Debug for InMemoryLayerInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InMemoryLayerInner").finish()
    }
}

/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
/// to minimize contention.
///
/// This global state is used to implement behaviors that require a global view of the system, e.g.
/// rolling layers proactively to limit the total amount of dirty data.
pub(crate) struct GlobalResources {
    // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
    // Zero means unlimited.
    pub(crate) max_dirty_bytes: AtomicU64,
    // How many bytes are in all EphemeralFile objects
    dirty_bytes: AtomicU64,
    // How many layers are contributing to dirty_bytes
    dirty_layers: AtomicUsize,
}

// Per-layer RAII struct for its contribution to [`GlobalResources`]
struct GlobalResourceUnits {
    // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
    // for decrementing the global counter by this many bytes when dropped.
    dirty_bytes: u64,
}

impl GlobalResourceUnits {
    // Hint for the layer append path to update us when the layer size differs from the last
    // call to publish_size by this much. If we don't reach this threshold, we'll still get
    // updated when the Timeline "ticks" in the background.
    const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;

    fn new() -> Self {
        GLOBAL_RESOURCES
            .dirty_layers
            .fetch_add(1, AtomicOrdering::Relaxed);
        Self { dirty_bytes: 0 }
    }

    /// Do not call this frequently: all timelines will write to these same global atomics,
    /// so this is a relatively expensive operation. Wait at least a few seconds between calls.
    ///
    /// Returns the effective layer size limit that should be applied, if any, to keep
    /// the total number of dirty bytes below the configured maximum.
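    ///
    /// For illustration (not an exact contract): with `max_dirty_bytes` set to 100 MiB,
    /// 10 dirty layers and a new global total of 120 MiB, the returned limit is
    /// 120 MiB / 10 = 12 MiB, so every above-average-sized layer becomes a candidate for
    /// freezing.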
    fn publish_size(&mut self, size: u64) -> Option<u64> {
        let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
            Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
            Ordering::Greater => {
                let delta = size - self.dirty_bytes;
                let old = GLOBAL_RESOURCES
                    .dirty_bytes
                    .fetch_add(delta, AtomicOrdering::Relaxed);
                old + delta
            }
            Ordering::Less => {
                let delta = self.dirty_bytes - size;
                let old = GLOBAL_RESOURCES
                    .dirty_bytes
                    .fetch_sub(delta, AtomicOrdering::Relaxed);
                old - delta
            }
        };

        // This is a sloppy update: concurrent updates to the counter will race, and the exact
        // value of the metric might not be the exact latest value of GLOBAL_RESOURCES.dirty_bytes.
        // That's okay: as long as the metric contains some recent value, it doesn't have to always
        // be literally the last update.
        TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);

        self.dirty_bytes = size;

        let max_dirty_bytes = GLOBAL_RESOURCES
            .max_dirty_bytes
            .load(AtomicOrdering::Relaxed);
        if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
            // Set the layer file limit to the average layer size: this implies that all above-average
            // sized layers will be eligible for freezing. They will be frozen in the order they
            // next enter publish_size.
            Some(
                new_global_dirty_bytes
                    / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
            )
        } else {
            None
        }
    }

    // Call publish_size if the input size differs from last published size by more than
    // the drift limit
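    //
    // For example (illustrative numbers): with MAX_SIZE_DRIFT = 10 MiB, a layer that last
    // published 5 MiB does not publish again at 14 MiB, but does once it reaches 16 MiB.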
    fn maybe_publish_size(&mut self, size: u64) {
        let publish = match size.cmp(&self.dirty_bytes) {
            Ordering::Equal => false,
            Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
            Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
        };

        if publish {
            self.publish_size(size);
        }
    }
}

impl Drop for GlobalResourceUnits {
    fn drop(&mut self) {
        GLOBAL_RESOURCES
            .dirty_layers
            .fetch_sub(1, AtomicOrdering::Relaxed);

        // Subtract our contribution to the global total dirty bytes
        self.publish_size(0);
    }
}

pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
    max_dirty_bytes: AtomicU64::new(0),
    dirty_bytes: AtomicU64::new(0),
    dirty_layers: AtomicUsize::new(0),
};

impl InMemoryLayer {
    pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
        self.file_id
    }

    pub(crate) fn get_timeline_id(&self) -> TimelineId {
        self.timeline_id
    }

    pub(crate) fn info(&self) -> InMemoryLayerInfo {
        let lsn_start = self.start_lsn;

        if let Some(&lsn_end) = self.end_lsn.get() {
            InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
        } else {
            InMemoryLayerInfo::Open { lsn_start }
        }
    }

    pub(crate) fn try_len(&self) -> Option<u64> {
        self.inner.try_read().map(|i| i.file.len()).ok()
    }

    pub(crate) fn assert_writable(&self) {
        assert!(self.end_lsn.get().is_none());
    }

    pub(crate) fn end_lsn_or_max(&self) -> Lsn {
        self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
    }

    pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
        self.start_lsn..self.end_lsn_or_max()
    }

    pub(crate) fn local_path_str(&self) -> &Arc<str> {
        self.frozen_local_path_str
            .get()
            .unwrap_or(&self.local_path_str)
    }

    /// debugging function to print out the contents of the layer
    ///
    /// this is likely completely unused
    pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
        let inner = self.inner.read().await;

        let end_str = self.end_lsn_or_max();

        println!(
            "----- in-memory layer for tli {} LSNs {}-{} ----",
            self.timeline_id, self.start_lsn, end_str,
        );

        if !verbose {
            return Ok(());
        }

        let cursor = inner.file.block_cursor();
        let mut buf = Vec::new();
        for (key, vec_map) in inner.index.iter() {
            for (lsn, pos) in vec_map.as_slice() {
                let mut desc = String::new();
                cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
                let val = Value::des(&buf);
                match val {
                    Ok(Value::Image(img)) => {
                        write!(&mut desc, " img {} bytes", img.len())?;
                    }
                    Ok(Value::WalRecord(rec)) => {
                        let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
                        write!(
                            &mut desc,
                            " rec {} bytes will_init: {} {}",
                            buf.len(),
                            rec.will_init(),
                            wal_desc
                        )?;
                    }
                    Err(err) => {
                        write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
                    }
                }
                println!(" key {} at {}: {}", key, lsn, desc);
            }
        }

        Ok(())
    }

    /// Look up the given value in the layer.
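    ///
    /// Illustrative walk (hypothetical LSNs, not an exact contract): if the index holds
    /// versions of `key` at 0/10 (image), 0/20 (delta) and 0/30 (delta), neither delta
    /// initializing the page, and `lsn_range` ends above 0/30, then the records at 0/30 and
    /// 0/20 are pushed onto `reconstruct_state.records` and the image at 0/10 completes the
    /// reconstruction.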
    pub(crate) async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_state: &mut ValueReconstructState,
        ctx: &RequestContext,
    ) -> anyhow::Result<ValueReconstructResult> {
        ensure!(lsn_range.start >= self.start_lsn);
        let mut need_image = true;

        let ctx = RequestContextBuilder::extend(ctx)
            .page_content_kind(PageContentKind::InMemoryLayer)
            .build();

        let inner = self.inner.read().await;

        let reader = inner.file.block_cursor();

        // Scan the page versions backwards, starting from `lsn`.
        if let Some(vec_map) = inner.index.get(&key) {
            let slice = vec_map.slice_range(lsn_range);
            for (entry_lsn, pos) in slice.iter().rev() {
                let buf = reader.read_blob(*pos, &ctx).await?;
                let value = Value::des(&buf)?;
                match value {
                    Value::Image(img) => {
                        reconstruct_state.img = Some((*entry_lsn, img));
                        return Ok(ValueReconstructResult::Complete);
                    }
                    Value::WalRecord(rec) => {
                        let will_init = rec.will_init();
                        reconstruct_state.records.push((*entry_lsn, rec));
                        if will_init {
                            // This WAL record initializes the page, so no need to go further back
                            need_image = false;
                            break;
                        }
                    }
                }
            }
        }

        // release lock on 'inner'

        // If an older page image is needed to reconstruct the page, let the
        // caller know.
        if need_image {
            Ok(ValueReconstructResult::Continue)
        } else {
            Ok(ValueReconstructResult::Complete)
        }
    }

    // Look up the keys in the provided keyspace and update
    // the reconstruct state with whatever is found.
    //
    // If the key is cached, go no further than the cached Lsn.
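    //
    // Planned reads are drained from a max-heap ordered by (key, lsn), so for each key the
    // newest value in range is popped first; once `update_key` reports a key as complete,
    // the remaining (older) entries for that key are skipped via `completed_keys`.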
    pub(crate) async fn get_values_reconstruct_data(
        &self,
        keyspace: KeySpace,
        end_lsn: Lsn,
        reconstruct_state: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) -> Result<(), GetVectoredError> {
        let ctx = RequestContextBuilder::extend(ctx)
            .page_content_kind(PageContentKind::InMemoryLayer)
            .build();

        let inner = self.inner.read().await;
        let reader = inner.file.block_cursor();

        #[derive(Eq, PartialEq, Ord, PartialOrd)]
        struct BlockRead {
            key: Key,
            lsn: Lsn,
            block_offset: u64,
        }

        let mut planned_block_reads = BinaryHeap::new();

        for range in keyspace.ranges.iter() {
            for (key, vec_map) in inner.index.range(range.start..range.end) {
                let lsn_range = match reconstruct_state.get_cached_lsn(key) {
                    Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
                    None => self.start_lsn..end_lsn,
                };

                let slice = vec_map.slice_range(lsn_range);
                for (entry_lsn, pos) in slice.iter().rev() {
                    planned_block_reads.push(BlockRead {
                        key: *key,
                        lsn: *entry_lsn,
                        block_offset: *pos,
                    });
                }
            }
        }

        let keyspace_size = keyspace.total_raw_size();

        let mut completed_keys = HashSet::new();
        while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
            let block_read = planned_block_reads.pop().unwrap();
            if completed_keys.contains(&block_read.key) {
                continue;
            }

            let buf = reader.read_blob(block_read.block_offset, &ctx).await;
            if let Err(e) = buf {
                reconstruct_state
                    .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
                completed_keys.insert(block_read.key);
                continue;
            }

            let value = Value::des(&buf.unwrap());
            if let Err(e) = value {
                reconstruct_state
                    .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
                completed_keys.insert(block_read.key);
                continue;
            }

            let key_situation =
                reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
            if key_situation == ValueReconstructSituation::Complete {
                completed_keys.insert(block_read.key);
            }
        }

        reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);

        Ok(())
    }
}

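/// Formats the layer name used in traversal path strings and in `Display`, e.g.
/// (illustrative values) `inmem-0000000001000000-0000000002000000` for the LSN range
/// 0/1000000..0/2000000.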
fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
    write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
}

fn inmem_layer_log_display(
    mut f: impl Write,
    timeline: TimelineId,
    start_lsn: Lsn,
    end_lsn: Lsn,
) -> std::fmt::Result {
    write!(f, "timeline {} in-memory ", timeline)?;
    inmem_layer_display(f, start_lsn, end_lsn)
}

impl std::fmt::Display for InMemoryLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let end_lsn = self.end_lsn_or_max();
        inmem_layer_display(f, self.start_lsn, end_lsn)
    }
}

impl InMemoryLayer {
    /// Get layer size.
    pub async fn size(&self) -> Result<u64> {
        let inner = self.inner.read().await;
        Ok(inner.file.len())
    }

    /// Create a new, empty, in-memory layer
    pub async fn create(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        start_lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<InMemoryLayer> {
        trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");

        let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
        let key = InMemoryLayerFileId(file.page_cache_file_id());

        Ok(InMemoryLayer {
            file_id: key,
            local_path_str: {
                let mut buf = String::new();
                inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
                buf.into()
            },
            frozen_local_path_str: OnceLock::new(),
            conf,
            timeline_id,
            tenant_shard_id,
            start_lsn,
            end_lsn: OnceLock::new(),
            opened_at: Instant::now(),
            inner: RwLock::new(InMemoryLayerInner {
                index: BTreeMap::new(),
                file,
                resource_units: GlobalResourceUnits::new(),
            }),
        })
    }

    // Write operations

    /// Common subroutine of the public put_wal_record() and put_page_image() functions.
    /// Adds the page version to the in-memory tree

    pub(crate) async fn put_value(
        &self,
        key: Key,
        lsn: Lsn,
        buf: &[u8],
        ctx: &RequestContext,
    ) -> Result<()> {
        let mut inner = self.inner.write().await;
        self.assert_writable();
        self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
    }

    async fn put_value_locked(
        &self,
        locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
        key: Key,
        lsn: Lsn,
        buf: &[u8],
        ctx: &RequestContext,
    ) -> Result<()> {
        trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);

        let off = {
            locked_inner
                .file
                .write_blob(
                    buf,
                    &RequestContextBuilder::extend(ctx)
                        .page_content_kind(PageContentKind::InMemoryLayer)
                        .build(),
                )
                .await?
        };

        let vec_map = locked_inner.index.entry(key).or_default();
        let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
        if old.is_some() {
            // We already had an entry for this LSN. That's odd..
            warn!("Key {} at {} already exists", key, lsn);
        }

        let size = locked_inner.file.len();
        locked_inner.resource_units.maybe_publish_size(size);

        Ok(())
    }

    pub(crate) fn get_opened_at(&self) -> Instant {
        self.opened_at
    }

    pub(crate) async fn tick(&self) -> Option<u64> {
        let mut inner = self.inner.write().await;
        let size = inner.file.len();
        inner.resource_units.publish_size(size)
    }

    pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
        // TODO: Currently, we just leak the storage for any deleted keys
        Ok(())
    }

    /// Records the end_lsn for non-dropped layers.
    /// `end_lsn` is exclusive
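    ///
    /// For example (illustrative): freezing at `end_lsn` 0/2000 makes the layer cover
    /// `start_lsn..0/2000`; values at 0/2000 and above are not part of this layer.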
    pub async fn freeze(&self, end_lsn: Lsn) {
        let inner = self.inner.write().await;

        assert!(
            self.start_lsn < end_lsn,
            "{} >= {}",
            self.start_lsn,
            end_lsn
        );
        self.end_lsn.set(end_lsn).expect("end_lsn set only once");

        self.frozen_local_path_str
            .set({
                let mut buf = String::new();
                inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
                    .unwrap();
                buf.into()
            })
            .expect("frozen_local_path_str set only once");

        for vec_map in inner.index.values() {
            for (lsn, _pos) in vec_map.as_slice() {
                assert!(*lsn < end_lsn);
            }
        }
    }

    /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
    /// layer will only contain the key range the user specifies, and may return `None`
    /// if there are no matching keys.
    ///
    /// Returns a new delta layer with all the same data as this in-memory layer
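    ///
    /// For illustration: passing `key_range: None` writes out every key and yields one L0
    /// delta layer spanning `Key::MIN..Key::MAX`, while `Some(range)` keeps only keys
    /// inside `range` and returns `Ok(None)` when nothing matches.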
    pub(crate) async fn write_to_disk(
        &self,
        timeline: &Arc<Timeline>,
        ctx: &RequestContext,
        key_range: Option<Range<Key>>,
    ) -> Result<Option<ResidentLayer>> {
        // Grab the lock in read-mode. We hold it over the I/O, but because this
        // layer is not writeable anymore, no one should be trying to acquire the
        // write lock on it, so we shouldn't block anyone. There's one exception
        // though: another thread might have grabbed a reference to this layer
        // in `get_layer_for_write` just before the checkpointer called
        // `freeze`, and then `write_to_disk` on it. When the thread gets the
        // lock, it will see that it's not writeable anymore and retry, but it
        // would have to wait until we release it. That race condition is very
        // rare though, so we just accept the potential latency hit for now.
        let inner = self.inner.read().await;

        let end_lsn = *self.end_lsn.get().unwrap();

        let keys: Vec<_> = if let Some(key_range) = key_range {
            inner
                .index
                .iter()
                .filter(|(k, _)| key_range.contains(k))
                .map(|(k, m)| (k.to_i128(), m))
                .collect()
        } else {
            inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect()
        };

        if keys.is_empty() {
            return Ok(None);
        }

        let mut delta_layer_writer = DeltaLayerWriter::new(
            self.conf,
            self.timeline_id,
            self.tenant_shard_id,
            Key::MIN,
            self.start_lsn..end_lsn,
            ctx,
        )
        .await?;

        let mut buf = Vec::new();

        let cursor = inner.file.block_cursor();

        let ctx = RequestContextBuilder::extend(ctx)
            .page_content_kind(PageContentKind::InMemoryLayer)
            .build();
        for (key, vec_map) in inner.index.iter() {
            // Write all page versions
            for (lsn, pos) in vec_map.as_slice() {
                cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
                let will_init = Value::des(&buf)?.will_init();
                let res;
                (buf, res) = delta_layer_writer
                    .put_value_bytes(*key, *lsn, buf, will_init, &ctx)
                    .await;
                res?;
            }
        }

        // MAX is used here because we identify L0 layers by full key range
        let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, &ctx).await?;
        Ok(Some(delta_layer))
    }
}