Line data Source code
1 : //! An in-memory layer stores recently received key-value pairs.
2 : //!
3 : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
4 : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
5 : //! its position in the file, is kept in memory, though.
6 : //!
7 : use crate::config::PageServerConf;
8 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
9 : use crate::repository::{Key, Value};
10 : use crate::tenant::block_io::BlockReader;
11 : use crate::tenant::ephemeral_file::EphemeralFile;
12 : use crate::tenant::storage_layer::ValueReconstructResult;
13 : use crate::tenant::timeline::GetVectoredError;
14 : use crate::tenant::{PageReconstructError, Timeline};
15 : use crate::{page_cache, walrecord};
16 : use anyhow::{anyhow, ensure, Result};
17 : use pageserver_api::keyspace::KeySpace;
18 : use pageserver_api::models::InMemoryLayerInfo;
19 : use pageserver_api::shard::TenantShardId;
20 : use std::collections::{BTreeMap, BinaryHeap, HashSet};
21 : use std::sync::{Arc, OnceLock};
22 : use std::time::Instant;
23 : use tracing::*;
24 : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
25 : // avoid binding to Write (conflicts with std::io::Write)
26 : // while being able to use std::fmt::Write's methods
27 : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
28 : use std::cmp::Ordering;
29 : use std::fmt::Write;
30 : use std::ops::Range;
31 : use std::sync::atomic::Ordering as AtomicOrdering;
32 : use std::sync::atomic::{AtomicU64, AtomicUsize};
33 : use tokio::sync::{RwLock, RwLockWriteGuard};
34 :
35 : use super::{
36 : DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
37 : ValuesReconstructState,
38 : };
39 :
40 : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
41 : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
42 :
43 : pub struct InMemoryLayer {
44 : conf: &'static PageServerConf,
45 : tenant_shard_id: TenantShardId,
46 : timeline_id: TimelineId,
47 : file_id: InMemoryLayerFileId,
48 :
49 : /// This layer contains all the changes from 'start_lsn'. The
50 : /// start is inclusive.
51 : start_lsn: Lsn,
52 :
53 : /// Frozen layers have an exclusive end LSN.
54 : /// Writes are only allowed when this is `None`.
55 : end_lsn: OnceLock<Lsn>,
56 :
57 : /// Used for traversal path. Cached representation of the in-memory layer before frozen.
58 : local_path_str: Arc<str>,
59 :
60 : /// Used for traversal path. Cached representation of the in-memory layer after frozen.
61 : frozen_local_path_str: OnceLock<Arc<str>>,
62 :
63 : opened_at: Instant,
64 :
65 : /// The above fields never change, except for `end_lsn`, which is only set once.
66 : /// All other changing parts are in `inner`, and protected by a mutex.
67 : inner: RwLock<InMemoryLayerInner>,
68 : }
69 :
70 : impl std::fmt::Debug for InMemoryLayer {
71 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 0 : f.debug_struct("InMemoryLayer")
73 0 : .field("start_lsn", &self.start_lsn)
74 0 : .field("end_lsn", &self.end_lsn)
75 0 : .field("inner", &self.inner)
76 0 : .finish()
77 0 : }
78 : }
79 :
80 : pub struct InMemoryLayerInner {
81 : /// All versions of all pages in the layer are kept here. Indexed
82 : /// by block number and LSN. The value is an offset into the
83 : /// ephemeral file where the page version is stored.
84 : index: BTreeMap<Key, VecMap<Lsn, u64>>,
85 :
86 : /// The values are stored in a serialized format in this file.
87 : /// Each serialized Value is preceded by a 'u32' length field.
88 : /// PerSeg::page_versions map stores offsets into this file.
89 : file: EphemeralFile,
90 :
91 : resource_units: GlobalResourceUnits,
92 : }
93 :
94 : impl std::fmt::Debug for InMemoryLayerInner {
95 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 0 : f.debug_struct("InMemoryLayerInner").finish()
97 0 : }
98 : }
99 :
/// Accounting shared by every in-memory (ephemeral) layer. It is updated infrequently, during
/// background ticks in Timeline, to keep contention low.
///
/// Behaviors that need a system-wide view build on this state, e.g. rolling layers
/// proactively to bound the total amount of dirty data.
pub(crate) struct GlobalResources {
    /// Threshold for `dirty_bytes` above which layers start being frozen to bring it down.
    /// Zero means unlimited.
    pub(crate) max_dirty_bytes: AtomicU64,
    /// Total number of bytes across all EphemeralFile objects.
    dirty_bytes: AtomicU64,
    /// Number of layers currently contributing to `dirty_bytes`.
    dirty_layers: AtomicUsize,
}
114 :
/// Guard object tracking one layer's contribution to [`GlobalResources`].
struct GlobalResourceUnits {
    /// Number of bytes this guard has added to the global `dirty_bytes` counter. The guard is
    /// responsible for subtracting the same amount again when it is dropped.
    dirty_bytes: u64,
}
121 :
122 : impl GlobalResourceUnits {
123 : // Hint for the layer append path to update us when the layer size differs from the last
124 : // call to update_size by this much. If we don't reach this threshold, we'll still get
125 : // updated when the Timeline "ticks" in the background.
126 : const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
127 :
128 1050 : fn new() -> Self {
129 1050 : GLOBAL_RESOURCES
130 1050 : .dirty_layers
131 1050 : .fetch_add(1, AtomicOrdering::Relaxed);
132 1050 : Self { dirty_bytes: 0 }
133 1050 : }
134 :
135 : /// Do not call this frequently: all timelines will write to these same global atomics,
136 : /// so this is a relatively expensive operation. Wait at least a few seconds between calls.
137 : ///
138 : /// Returns the effective layer size limit that should be applied, if any, to keep
139 : /// the total number of dirty bytes below the configured maximum.
140 940 : fn publish_size(&mut self, size: u64) -> Option<u64> {
141 940 : let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
142 928 : Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
143 : Ordering::Greater => {
144 10 : let delta = size - self.dirty_bytes;
145 10 : let old = GLOBAL_RESOURCES
146 10 : .dirty_bytes
147 10 : .fetch_add(delta, AtomicOrdering::Relaxed);
148 10 : old + delta
149 : }
150 : Ordering::Less => {
151 2 : let delta = self.dirty_bytes - size;
152 2 : let old = GLOBAL_RESOURCES
153 2 : .dirty_bytes
154 2 : .fetch_sub(delta, AtomicOrdering::Relaxed);
155 2 : old - delta
156 : }
157 : };
158 :
159 : // This is a sloppy update: concurrent updates to the counter will race, and the exact
160 : // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
161 : // That's okay: as long as the metric contains some recent value, it doesn't have to always
162 : // be literally the last update.
163 940 : TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
164 940 :
165 940 : self.dirty_bytes = size;
166 940 :
167 940 : let max_dirty_bytes = GLOBAL_RESOURCES
168 940 : .max_dirty_bytes
169 940 : .load(AtomicOrdering::Relaxed);
170 940 : if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
171 : // Set the layer file limit to the average layer size: this implies that all above-average
172 : // sized layers will be elegible for freezing. They will be frozen in the order they
173 : // next enter publish_size.
174 0 : Some(
175 0 : new_global_dirty_bytes
176 0 : / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
177 0 : )
178 : } else {
179 940 : None
180 : }
181 940 : }
182 :
183 : // Call publish_size if the input size differs from last published size by more than
184 : // the drift limit
185 5066038 : fn maybe_publish_size(&mut self, size: u64) {
186 5066038 : let publish = match size.cmp(&self.dirty_bytes) {
187 0 : Ordering::Equal => false,
188 5066038 : Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
189 0 : Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
190 : };
191 :
192 5066038 : if publish {
193 10 : self.publish_size(size);
194 5066028 : }
195 5066038 : }
196 : }
197 :
198 : impl Drop for GlobalResourceUnits {
199 930 : fn drop(&mut self) {
200 930 : GLOBAL_RESOURCES
201 930 : .dirty_layers
202 930 : .fetch_sub(1, AtomicOrdering::Relaxed);
203 930 :
204 930 : // Subtract our contribution to the global total dirty bytes
205 930 : self.publish_size(0);
206 930 : }
207 : }
208 :
209 : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
210 : max_dirty_bytes: AtomicU64::new(0),
211 : dirty_bytes: AtomicU64::new(0),
212 : dirty_layers: AtomicUsize::new(0),
213 : };
214 :
215 : impl InMemoryLayer {
216 2 : pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
217 2 : self.file_id
218 2 : }
219 :
220 928 : pub(crate) fn get_timeline_id(&self) -> TimelineId {
221 928 : self.timeline_id
222 928 : }
223 :
224 0 : pub(crate) fn info(&self) -> InMemoryLayerInfo {
225 0 : let lsn_start = self.start_lsn;
226 :
227 0 : if let Some(&lsn_end) = self.end_lsn.get() {
228 0 : InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
229 : } else {
230 0 : InMemoryLayerInfo::Open { lsn_start }
231 : }
232 0 : }
233 :
234 0 : pub(crate) fn try_len(&self) -> Option<u64> {
235 0 : self.inner.try_read().map(|i| i.file.len()).ok()
236 0 : }
237 :
238 5066038 : pub(crate) fn assert_writable(&self) {
239 5066038 : assert!(self.end_lsn.get().is_none());
240 5066038 : }
241 :
242 5499439 : pub(crate) fn end_lsn_or_max(&self) -> Lsn {
243 5499439 : self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
244 5499439 : }
245 :
246 5498511 : pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
247 5498511 : self.start_lsn..self.end_lsn_or_max()
248 5498511 : }
249 :
250 605583 : pub(crate) fn local_path_str(&self) -> &Arc<str> {
251 605583 : self.frozen_local_path_str
252 605583 : .get()
253 605583 : .unwrap_or(&self.local_path_str)
254 605583 : }
255 :
256 : /// debugging function to print out the contents of the layer
257 : ///
258 : /// this is likely completly unused
259 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
260 0 : let inner = self.inner.read().await;
261 :
262 0 : let end_str = self.end_lsn_or_max();
263 0 :
264 0 : println!(
265 0 : "----- in-memory layer for tli {} LSNs {}-{} ----",
266 0 : self.timeline_id, self.start_lsn, end_str,
267 0 : );
268 0 :
269 0 : if !verbose {
270 0 : return Ok(());
271 0 : }
272 0 :
273 0 : let cursor = inner.file.block_cursor();
274 0 : let mut buf = Vec::new();
275 0 : for (key, vec_map) in inner.index.iter() {
276 0 : for (lsn, pos) in vec_map.as_slice() {
277 0 : let mut desc = String::new();
278 0 : cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
279 0 : let val = Value::des(&buf);
280 0 : match val {
281 0 : Ok(Value::Image(img)) => {
282 0 : write!(&mut desc, " img {} bytes", img.len())?;
283 : }
284 0 : Ok(Value::WalRecord(rec)) => {
285 0 : let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
286 0 : write!(
287 0 : &mut desc,
288 0 : " rec {} bytes will_init: {} {}",
289 0 : buf.len(),
290 0 : rec.will_init(),
291 0 : wal_desc
292 0 : )?;
293 : }
294 0 : Err(err) => {
295 0 : write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
296 : }
297 : }
298 0 : println!(" key {} at {}: {}", key, lsn, desc);
299 : }
300 : }
301 :
302 0 : Ok(())
303 0 : }
304 :
305 : /// Look up given value in the layer.
306 605583 : pub(crate) async fn get_value_reconstruct_data(
307 605583 : &self,
308 605583 : key: Key,
309 605583 : lsn_range: Range<Lsn>,
310 605583 : reconstruct_state: &mut ValueReconstructState,
311 605583 : ctx: &RequestContext,
312 605583 : ) -> anyhow::Result<ValueReconstructResult> {
313 605583 : ensure!(lsn_range.start >= self.start_lsn);
314 605583 : let mut need_image = true;
315 605583 :
316 605583 : let ctx = RequestContextBuilder::extend(ctx)
317 605583 : .page_content_kind(PageContentKind::InMemoryLayer)
318 605583 : .build();
319 :
320 605583 : let inner = self.inner.read().await;
321 :
322 605583 : let reader = inner.file.block_cursor();
323 :
324 : // Scan the page versions backwards, starting from `lsn`.
325 605583 : if let Some(vec_map) = inner.index.get(&key) {
326 496260 : let slice = vec_map.slice_range(lsn_range);
327 496266 : for (entry_lsn, pos) in slice.iter().rev() {
328 496266 : let buf = reader.read_blob(*pos, &ctx).await?;
329 496266 : let value = Value::des(&buf)?;
330 496266 : match value {
331 496256 : Value::Image(img) => {
332 496256 : reconstruct_state.img = Some((*entry_lsn, img));
333 496256 : return Ok(ValueReconstructResult::Complete);
334 : }
335 10 : Value::WalRecord(rec) => {
336 10 : let will_init = rec.will_init();
337 10 : reconstruct_state.records.push((*entry_lsn, rec));
338 10 : if will_init {
339 : // This WAL record initializes the page, so no need to go further back
340 0 : need_image = false;
341 0 : break;
342 10 : }
343 : }
344 : }
345 : }
346 109323 : }
347 :
348 : // release lock on 'inner'
349 :
350 : // If an older page image is needed to reconstruct the page, let the
351 : // caller know.
352 109327 : if need_image {
353 109327 : Ok(ValueReconstructResult::Continue)
354 : } else {
355 0 : Ok(ValueReconstructResult::Complete)
356 : }
357 605583 : }
358 :
359 : // Look up the keys in the provided keyspace and update
360 : // the reconstruct state with whatever is found.
361 : //
362 : // If the key is cached, go no further than the cached Lsn.
363 2 : pub(crate) async fn get_values_reconstruct_data(
364 2 : &self,
365 2 : keyspace: KeySpace,
366 2 : end_lsn: Lsn,
367 2 : reconstruct_state: &mut ValuesReconstructState,
368 2 : ctx: &RequestContext,
369 2 : ) -> Result<(), GetVectoredError> {
370 2 : let ctx = RequestContextBuilder::extend(ctx)
371 2 : .page_content_kind(PageContentKind::InMemoryLayer)
372 2 : .build();
373 :
374 2 : let inner = self.inner.read().await;
375 2 : let reader = inner.file.block_cursor();
376 2 :
377 2 : #[derive(Eq, PartialEq, Ord, PartialOrd)]
378 2 : struct BlockRead {
379 2 : key: Key,
380 2 : lsn: Lsn,
381 2 : block_offset: u64,
382 2 : }
383 2 :
384 2 : let mut planned_block_reads = BinaryHeap::new();
385 :
386 2 : for range in keyspace.ranges.iter() {
387 2000 : for (key, vec_map) in inner.index.range(range.start..range.end) {
388 2000 : let lsn_range = match reconstruct_state.get_cached_lsn(key) {
389 0 : Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
390 2000 : None => self.start_lsn..end_lsn,
391 : };
392 :
393 2000 : let slice = vec_map.slice_range(lsn_range);
394 2000 : for (entry_lsn, pos) in slice.iter().rev() {
395 2000 : planned_block_reads.push(BlockRead {
396 2000 : key: *key,
397 2000 : lsn: *entry_lsn,
398 2000 : block_offset: *pos,
399 2000 : });
400 2000 : }
401 : }
402 : }
403 :
404 2 : let keyspace_size = keyspace.total_raw_size();
405 2 :
406 2 : let mut completed_keys = HashSet::new();
407 2002 : while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
408 2000 : let block_read = planned_block_reads.pop().unwrap();
409 2000 : if completed_keys.contains(&block_read.key) {
410 0 : continue;
411 2000 : }
412 :
413 2000 : let buf = reader.read_blob(block_read.block_offset, &ctx).await;
414 2000 : if let Err(e) = buf {
415 0 : reconstruct_state
416 0 : .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
417 0 : completed_keys.insert(block_read.key);
418 0 : continue;
419 2000 : }
420 2000 :
421 2000 : let value = Value::des(&buf.unwrap());
422 2000 : if let Err(e) = value {
423 0 : reconstruct_state
424 0 : .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
425 0 : completed_keys.insert(block_read.key);
426 0 : continue;
427 2000 : }
428 2000 :
429 2000 : let key_situation =
430 2000 : reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
431 2000 : if key_situation == ValueReconstructSituation::Complete {
432 2000 : completed_keys.insert(block_read.key);
433 2000 : }
434 : }
435 :
436 2 : reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
437 2 :
438 2 : Ok(())
439 2 : }
440 : }
441 :
442 2906 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
443 2906 : write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
444 2906 : }
445 :
446 1978 : fn inmem_layer_log_display(
447 1978 : mut f: impl Write,
448 1978 : timeline: TimelineId,
449 1978 : start_lsn: Lsn,
450 1978 : end_lsn: Lsn,
451 1978 : ) -> std::fmt::Result {
452 1978 : write!(f, "timeline {} in-memory ", timeline)?;
453 1978 : inmem_layer_display(f, start_lsn, end_lsn)
454 1978 : }
455 :
456 : impl std::fmt::Display for InMemoryLayer {
457 928 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
458 928 : let end_lsn = self.end_lsn_or_max();
459 928 : inmem_layer_display(f, self.start_lsn, end_lsn)
460 928 : }
461 : }
462 :
463 : impl InMemoryLayer {
464 : /// Get layer size.
465 4780028 : pub async fn size(&self) -> Result<u64> {
466 4780028 : let inner = self.inner.read().await;
467 4780028 : Ok(inner.file.len())
468 4780028 : }
469 :
470 : /// Create a new, empty, in-memory layer
471 1050 : pub async fn create(
472 1050 : conf: &'static PageServerConf,
473 1050 : timeline_id: TimelineId,
474 1050 : tenant_shard_id: TenantShardId,
475 1050 : start_lsn: Lsn,
476 1050 : ) -> Result<InMemoryLayer> {
477 1050 : trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
478 :
479 1050 : let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
480 1050 : let key = InMemoryLayerFileId(file.page_cache_file_id());
481 1050 :
482 1050 : Ok(InMemoryLayer {
483 1050 : file_id: key,
484 1050 : local_path_str: {
485 1050 : let mut buf = String::new();
486 1050 : inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
487 1050 : buf.into()
488 1050 : },
489 1050 : frozen_local_path_str: OnceLock::new(),
490 1050 : conf,
491 1050 : timeline_id,
492 1050 : tenant_shard_id,
493 1050 : start_lsn,
494 1050 : end_lsn: OnceLock::new(),
495 1050 : opened_at: Instant::now(),
496 1050 : inner: RwLock::new(InMemoryLayerInner {
497 1050 : index: BTreeMap::new(),
498 1050 : file,
499 1050 : resource_units: GlobalResourceUnits::new(),
500 1050 : }),
501 1050 : })
502 1050 : }
503 :
504 : // Write operations
505 :
506 : /// Common subroutine of the public put_wal_record() and put_page_image() functions.
507 : /// Adds the page version to the in-memory tree
508 :
509 5066038 : pub(crate) async fn put_value(
510 5066038 : &self,
511 5066038 : key: Key,
512 5066038 : lsn: Lsn,
513 5066038 : buf: &[u8],
514 5066038 : ctx: &RequestContext,
515 5066038 : ) -> Result<()> {
516 5066038 : let mut inner = self.inner.write().await;
517 5066038 : self.assert_writable();
518 5066038 : self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
519 5066038 : }
520 :
521 5066038 : async fn put_value_locked(
522 5066038 : &self,
523 5066038 : locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
524 5066038 : key: Key,
525 5066038 : lsn: Lsn,
526 5066038 : buf: &[u8],
527 5066038 : ctx: &RequestContext,
528 5066038 : ) -> Result<()> {
529 5066038 : trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
530 :
531 5066038 : let off = {
532 5066038 : locked_inner
533 5066038 : .file
534 5066038 : .write_blob(
535 5066038 : buf,
536 5066038 : &RequestContextBuilder::extend(ctx)
537 5066038 : .page_content_kind(PageContentKind::InMemoryLayer)
538 5066038 : .build(),
539 5066038 : )
540 3560 : .await?
541 : };
542 :
543 5066038 : let vec_map = locked_inner.index.entry(key).or_default();
544 5066038 : let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
545 5066038 : if old.is_some() {
546 : // We already had an entry for this LSN. That's odd..
547 0 : warn!("Key {} at {} already exists", key, lsn);
548 5066038 : }
549 :
550 5066038 : let size = locked_inner.file.len();
551 5066038 : locked_inner.resource_units.maybe_publish_size(size);
552 5066038 :
553 5066038 : Ok(())
554 5066038 : }
555 :
556 58 : pub(crate) fn get_opened_at(&self) -> Instant {
557 58 : self.opened_at
558 58 : }
559 :
560 0 : pub(crate) async fn tick(&self) -> Option<u64> {
561 0 : let mut inner = self.inner.write().await;
562 0 : let size = inner.file.len();
563 0 : inner.resource_units.publish_size(size)
564 0 : }
565 :
566 2 : pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
567 2 : // TODO: Currently, we just leak the storage for any deleted keys
568 2 : Ok(())
569 2 : }
570 :
571 : /// Records the end_lsn for non-dropped layers.
572 : /// `end_lsn` is exclusive
573 928 : pub async fn freeze(&self, end_lsn: Lsn) {
574 928 : let inner = self.inner.write().await;
575 :
576 928 : assert!(
577 928 : self.start_lsn < end_lsn,
578 0 : "{} >= {}",
579 : self.start_lsn,
580 : end_lsn
581 : );
582 928 : self.end_lsn.set(end_lsn).expect("end_lsn set only once");
583 928 :
584 928 : self.frozen_local_path_str
585 928 : .set({
586 928 : let mut buf = String::new();
587 928 : inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
588 928 : .unwrap();
589 928 : buf.into()
590 928 : })
591 928 : .expect("frozen_local_path_str set only once");
592 :
593 4240854 : for vec_map in inner.index.values() {
594 4362294 : for (lsn, _pos) in vec_map.as_slice() {
595 4362294 : assert!(*lsn < end_lsn);
596 : }
597 : }
598 928 : }
599 :
600 : /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
601 : /// layer will only contain the key range the user specifies, and may return `None`
602 : /// if there are no matching keys.
603 : ///
604 : /// Returns a new delta layer with all the same data as this in-memory layer
605 928 : pub(crate) async fn write_to_disk(
606 928 : &self,
607 928 : timeline: &Arc<Timeline>,
608 928 : ctx: &RequestContext,
609 928 : key_range: Option<Range<Key>>,
610 928 : ) -> Result<Option<ResidentLayer>> {
611 : // Grab the lock in read-mode. We hold it over the I/O, but because this
612 : // layer is not writeable anymore, no one should be trying to acquire the
613 : // write lock on it, so we shouldn't block anyone. There's one exception
614 : // though: another thread might have grabbed a reference to this layer
615 : // in `get_layer_for_write' just before the checkpointer called
616 : // `freeze`, and then `write_to_disk` on it. When the thread gets the
617 : // lock, it will see that it's not writeable anymore and retry, but it
618 : // would have to wait until we release it. That race condition is very
619 : // rare though, so we just accept the potential latency hit for now.
620 928 : let inner = self.inner.read().await;
621 :
622 928 : let end_lsn = *self.end_lsn.get().unwrap();
623 :
624 928 : let keys: Vec<_> = if let Some(key_range) = key_range {
625 102 : inner
626 102 : .index
627 102 : .iter()
628 816 : .filter(|(k, _)| key_range.contains(k))
629 102 : .map(|(k, m)| (k.to_i128(), m))
630 102 : .collect()
631 : } else {
632 4240038 : inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect()
633 : };
634 :
635 928 : if keys.is_empty() {
636 102 : return Ok(None);
637 826 : }
638 :
639 826 : let mut delta_layer_writer = DeltaLayerWriter::new(
640 826 : self.conf,
641 826 : self.timeline_id,
642 826 : self.tenant_shard_id,
643 826 : Key::MIN,
644 826 : self.start_lsn..end_lsn,
645 826 : )
646 424 : .await?;
647 :
648 826 : let mut buf = Vec::new();
649 826 :
650 826 : let cursor = inner.file.block_cursor();
651 826 :
652 826 : let ctx = RequestContextBuilder::extend(ctx)
653 826 : .page_content_kind(PageContentKind::InMemoryLayer)
654 826 : .build();
655 4240038 : for (key, vec_map) in inner.index.iter() {
656 : // Write all page versions
657 4361478 : for (lsn, pos) in vec_map.as_slice() {
658 4361478 : cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
659 4361478 : let will_init = Value::des(&buf)?.will_init();
660 : let res;
661 4361478 : (buf, res) = delta_layer_writer
662 4361478 : .put_value_bytes(*key, *lsn, buf, will_init, &ctx)
663 2717 : .await;
664 4361478 : res?;
665 : }
666 : }
667 :
668 : // MAX is used here because we identify L0 layers by full key range
669 6301 : let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, &ctx).await?;
670 826 : Ok(Some(delta_layer))
671 928 : }
672 : }
|