Line data Source code
1 : //! An in-memory layer stores recently received key-value pairs.
2 : //!
3 : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
4 : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
5 : //! its position in the file, is kept in memory, though.
6 : //!
7 : use crate::config::PageServerConf;
8 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
9 : use crate::page_cache::PAGE_SZ;
10 : use crate::repository::{Key, Value};
11 : use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
12 : use crate::tenant::ephemeral_file::EphemeralFile;
13 : use crate::tenant::storage_layer::ValueReconstructResult;
14 : use crate::tenant::timeline::GetVectoredError;
15 : use crate::tenant::{PageReconstructError, Timeline};
16 : use crate::{l0_flush, page_cache, walrecord};
17 : use anyhow::{anyhow, ensure, Result};
18 : use pageserver_api::keyspace::KeySpace;
19 : use pageserver_api::models::InMemoryLayerInfo;
20 : use pageserver_api::shard::TenantShardId;
21 : use std::collections::{BTreeMap, BinaryHeap, HashSet};
22 : use std::sync::{Arc, OnceLock};
23 : use std::time::Instant;
24 : use tracing::*;
25 : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
26 : // avoid binding to Write (conflicts with std::io::Write)
27 : // while being able to use std::fmt::Write's methods
28 : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
29 : use std::cmp::Ordering;
30 : use std::fmt::Write;
31 : use std::ops::Range;
32 : use std::sync::atomic::Ordering as AtomicOrdering;
33 : use std::sync::atomic::{AtomicU64, AtomicUsize};
34 : use tokio::sync::{RwLock, RwLockWriteGuard};
35 :
36 : use super::{
37 : DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
38 : ValuesReconstructState,
39 : };
40 :
41 : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
42 : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
43 :
44 : pub struct InMemoryLayer {
45 : conf: &'static PageServerConf,
46 : tenant_shard_id: TenantShardId,
47 : timeline_id: TimelineId,
48 : file_id: InMemoryLayerFileId,
49 :
50 : /// This layer contains all the changes from 'start_lsn'. The
51 : /// start is inclusive.
52 : start_lsn: Lsn,
53 :
54 : /// Frozen layers have an exclusive end LSN.
55 : /// Writes are only allowed when this is `None`.
56 : pub(crate) end_lsn: OnceLock<Lsn>,
57 :
58 : /// Used for traversal path. Cached representation of the in-memory layer before frozen.
59 : local_path_str: Arc<str>,
60 :
61 : /// Used for traversal path. Cached representation of the in-memory layer after frozen.
62 : frozen_local_path_str: OnceLock<Arc<str>>,
63 :
64 : opened_at: Instant,
65 :
66 : /// The above fields never change, except for `end_lsn`, which is only set once.
67 : /// All other changing parts are in `inner`, and protected by a mutex.
68 : inner: RwLock<InMemoryLayerInner>,
69 : }
70 :
71 : impl std::fmt::Debug for InMemoryLayer {
72 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 0 : f.debug_struct("InMemoryLayer")
74 0 : .field("start_lsn", &self.start_lsn)
75 0 : .field("end_lsn", &self.end_lsn)
76 0 : .field("inner", &self.inner)
77 0 : .finish()
78 0 : }
79 : }
80 :
81 : pub struct InMemoryLayerInner {
82 : /// All versions of all pages in the layer are kept here. Indexed
83 : /// by block number and LSN. The value is an offset into the
84 : /// ephemeral file where the page version is stored.
85 : index: BTreeMap<Key, VecMap<Lsn, u64>>,
86 :
87 : /// The values are stored in a serialized format in this file.
88 : /// Each serialized Value is preceded by a 'u32' length field.
89 : /// PerSeg::page_versions map stores offsets into this file.
90 : file: EphemeralFile,
91 :
92 : resource_units: GlobalResourceUnits,
93 : }
94 :
95 : impl std::fmt::Debug for InMemoryLayerInner {
96 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 0 : f.debug_struct("InMemoryLayerInner").finish()
98 0 : }
99 : }
100 :
/// Accounting shared by every in-memory (ephemeral) layer. It is updated
/// infrequently, during background ticks in Timeline, to keep contention low.
///
/// Having a global view makes it possible to implement behaviors such as
/// proactively rolling layers to bound the total amount of dirty data.
pub(crate) struct GlobalResources {
    /// Threshold for `dirty_bytes` above which layers start being frozen to
    /// bring the total back down. Zero means unlimited.
    pub(crate) max_dirty_bytes: AtomicU64,
    /// Total number of bytes held in all EphemeralFile objects.
    dirty_bytes: AtomicU64,
    /// Number of layers currently contributing to `dirty_bytes`.
    dirty_layers: AtomicUsize,
}
115 :
/// Guard held by each in-memory layer for its share of [`GlobalResources`].
struct GlobalResourceUnits {
    /// Number of bytes this guard has added to the global `dirty_bytes`
    /// counter. The guard must take the same amount back out of the global
    /// counter when it is dropped.
    dirty_bytes: u64,
}
122 :
123 : impl GlobalResourceUnits {
124 : // Hint for the layer append path to update us when the layer size differs from the last
125 : // call to update_size by this much. If we don't reach this threshold, we'll still get
126 : // updated when the Timeline "ticks" in the background.
127 : const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
128 :
129 1244 : fn new() -> Self {
130 1244 : GLOBAL_RESOURCES
131 1244 : .dirty_layers
132 1244 : .fetch_add(1, AtomicOrdering::Relaxed);
133 1244 : Self { dirty_bytes: 0 }
134 1244 : }
135 :
136 : /// Do not call this frequently: all timelines will write to these same global atomics,
137 : /// so this is a relatively expensive operation. Wait at least a few seconds between calls.
138 : ///
139 : /// Returns the effective layer size limit that should be applied, if any, to keep
140 : /// the total number of dirty bytes below the configured maximum.
141 1126 : fn publish_size(&mut self, size: u64) -> Option<u64> {
142 1126 : let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
143 1114 : Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
144 : Ordering::Greater => {
145 10 : let delta = size - self.dirty_bytes;
146 10 : let old = GLOBAL_RESOURCES
147 10 : .dirty_bytes
148 10 : .fetch_add(delta, AtomicOrdering::Relaxed);
149 10 : old + delta
150 : }
151 : Ordering::Less => {
152 2 : let delta = self.dirty_bytes - size;
153 2 : let old = GLOBAL_RESOURCES
154 2 : .dirty_bytes
155 2 : .fetch_sub(delta, AtomicOrdering::Relaxed);
156 2 : old - delta
157 : }
158 : };
159 :
160 : // This is a sloppy update: concurrent updates to the counter will race, and the exact
161 : // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
162 : // That's okay: as long as the metric contains some recent value, it doesn't have to always
163 : // be literally the last update.
164 1126 : TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
165 1126 :
166 1126 : self.dirty_bytes = size;
167 1126 :
168 1126 : let max_dirty_bytes = GLOBAL_RESOURCES
169 1126 : .max_dirty_bytes
170 1126 : .load(AtomicOrdering::Relaxed);
171 1126 : if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
172 : // Set the layer file limit to the average layer size: this implies that all above-average
173 : // sized layers will be elegible for freezing. They will be frozen in the order they
174 : // next enter publish_size.
175 0 : Some(
176 0 : new_global_dirty_bytes
177 0 : / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
178 0 : )
179 : } else {
180 1126 : None
181 : }
182 1126 : }
183 :
184 : // Call publish_size if the input size differs from last published size by more than
185 : // the drift limit
186 5090450 : fn maybe_publish_size(&mut self, size: u64) {
187 5090450 : let publish = match size.cmp(&self.dirty_bytes) {
188 0 : Ordering::Equal => false,
189 5090450 : Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
190 0 : Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
191 : };
192 :
193 5090450 : if publish {
194 10 : self.publish_size(size);
195 5090440 : }
196 5090450 : }
197 : }
198 :
199 : impl Drop for GlobalResourceUnits {
200 1116 : fn drop(&mut self) {
201 1116 : GLOBAL_RESOURCES
202 1116 : .dirty_layers
203 1116 : .fetch_sub(1, AtomicOrdering::Relaxed);
204 1116 :
205 1116 : // Subtract our contribution to the global total dirty bytes
206 1116 : self.publish_size(0);
207 1116 : }
208 : }
209 :
210 : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
211 : max_dirty_bytes: AtomicU64::new(0),
212 : dirty_bytes: AtomicU64::new(0),
213 : dirty_layers: AtomicUsize::new(0),
214 : };
215 :
216 : impl InMemoryLayer {
217 14 : pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
218 14 : self.file_id
219 14 : }
220 :
221 1114 : pub(crate) fn get_timeline_id(&self) -> TimelineId {
222 1114 : self.timeline_id
223 1114 : }
224 :
225 0 : pub(crate) fn info(&self) -> InMemoryLayerInfo {
226 0 : let lsn_start = self.start_lsn;
227 :
228 0 : if let Some(&lsn_end) = self.end_lsn.get() {
229 0 : InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
230 : } else {
231 0 : InMemoryLayerInfo::Open { lsn_start }
232 : }
233 0 : }
234 :
235 0 : pub(crate) fn try_len(&self) -> Option<u64> {
236 0 : self.inner.try_read().map(|i| i.file.len()).ok()
237 0 : }
238 :
239 5090450 : pub(crate) fn assert_writable(&self) {
240 5090450 : assert!(self.end_lsn.get().is_none());
241 5090450 : }
242 :
243 721720 : pub(crate) fn end_lsn_or_max(&self) -> Lsn {
244 721720 : self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
245 721720 : }
246 :
247 720606 : pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
248 720606 : self.start_lsn..self.end_lsn_or_max()
249 720606 : }
250 :
251 606260 : pub(crate) fn local_path_str(&self) -> &Arc<str> {
252 606260 : self.frozen_local_path_str
253 606260 : .get()
254 606260 : .unwrap_or(&self.local_path_str)
255 606260 : }
256 :
257 : /// debugging function to print out the contents of the layer
258 : ///
259 : /// this is likely completly unused
260 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
261 0 : let inner = self.inner.read().await;
262 :
263 0 : let end_str = self.end_lsn_or_max();
264 0 :
265 0 : println!(
266 0 : "----- in-memory layer for tli {} LSNs {}-{} ----",
267 0 : self.timeline_id, self.start_lsn, end_str,
268 0 : );
269 0 :
270 0 : if !verbose {
271 0 : return Ok(());
272 0 : }
273 0 :
274 0 : let cursor = inner.file.block_cursor();
275 0 : let mut buf = Vec::new();
276 0 : for (key, vec_map) in inner.index.iter() {
277 0 : for (lsn, pos) in vec_map.as_slice() {
278 0 : let mut desc = String::new();
279 0 : cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
280 0 : let val = Value::des(&buf);
281 0 : match val {
282 0 : Ok(Value::Image(img)) => {
283 0 : write!(&mut desc, " img {} bytes", img.len())?;
284 : }
285 0 : Ok(Value::WalRecord(rec)) => {
286 0 : let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
287 0 : write!(
288 0 : &mut desc,
289 0 : " rec {} bytes will_init: {} {}",
290 0 : buf.len(),
291 0 : rec.will_init(),
292 0 : wal_desc
293 0 : )?;
294 : }
295 0 : Err(err) => {
296 0 : write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
297 : }
298 : }
299 0 : println!(" key {} at {}: {}", key, lsn, desc);
300 : }
301 : }
302 :
303 0 : Ok(())
304 0 : }
305 :
306 : /// Look up given value in the layer.
307 606260 : pub(crate) async fn get_value_reconstruct_data(
308 606260 : &self,
309 606260 : key: Key,
310 606260 : lsn_range: Range<Lsn>,
311 606260 : reconstruct_state: &mut ValueReconstructState,
312 606260 : ctx: &RequestContext,
313 606260 : ) -> anyhow::Result<ValueReconstructResult> {
314 606260 : ensure!(lsn_range.start >= self.start_lsn);
315 606260 : let mut need_image = true;
316 606260 :
317 606260 : let ctx = RequestContextBuilder::extend(ctx)
318 606260 : .page_content_kind(PageContentKind::InMemoryLayer)
319 606260 : .build();
320 :
321 606260 : let inner = self.inner.read().await;
322 :
323 606260 : let reader = inner.file.block_cursor();
324 :
325 : // Scan the page versions backwards, starting from `lsn`.
326 606260 : if let Some(vec_map) = inner.index.get(&key) {
327 496893 : let slice = vec_map.slice_range(lsn_range);
328 496903 : for (entry_lsn, pos) in slice.iter().rev() {
329 496903 : let buf = reader.read_blob(*pos, &ctx).await?;
330 496903 : let value = Value::des(&buf)?;
331 496903 : match value {
332 496883 : Value::Image(img) => {
333 496883 : reconstruct_state.img = Some((*entry_lsn, img));
334 496883 : return Ok(ValueReconstructResult::Complete);
335 : }
336 20 : Value::WalRecord(rec) => {
337 20 : let will_init = rec.will_init();
338 20 : reconstruct_state.records.push((*entry_lsn, rec));
339 20 : if will_init {
340 : // This WAL record initializes the page, so no need to go further back
341 0 : need_image = false;
342 0 : break;
343 20 : }
344 : }
345 : }
346 : }
347 109367 : }
348 :
349 : // release lock on 'inner'
350 :
351 : // If an older page image is needed to reconstruct the page, let the
352 : // caller know.
353 109377 : if need_image {
354 109377 : Ok(ValueReconstructResult::Continue)
355 : } else {
356 0 : Ok(ValueReconstructResult::Complete)
357 : }
358 606260 : }
359 :
360 : // Look up the keys in the provided keyspace and update
361 : // the reconstruct state with whatever is found.
362 : //
363 : // If the key is cached, go no further than the cached Lsn.
364 14 : pub(crate) async fn get_values_reconstruct_data(
365 14 : &self,
366 14 : keyspace: KeySpace,
367 14 : end_lsn: Lsn,
368 14 : reconstruct_state: &mut ValuesReconstructState,
369 14 : ctx: &RequestContext,
370 14 : ) -> Result<(), GetVectoredError> {
371 14 : let ctx = RequestContextBuilder::extend(ctx)
372 14 : .page_content_kind(PageContentKind::InMemoryLayer)
373 14 : .build();
374 :
375 14 : let inner = self.inner.read().await;
376 14 : let reader = inner.file.block_cursor();
377 14 :
378 14 : #[derive(Eq, PartialEq, Ord, PartialOrd)]
379 14 : struct BlockRead {
380 14 : key: Key,
381 14 : lsn: Lsn,
382 14 : block_offset: u64,
383 14 : }
384 14 :
385 14 : let mut planned_block_reads = BinaryHeap::new();
386 :
387 14 : for range in keyspace.ranges.iter() {
388 2018 : for (key, vec_map) in inner.index.range(range.start..range.end) {
389 2018 : let lsn_range = match reconstruct_state.get_cached_lsn(key) {
390 0 : Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
391 2018 : None => self.start_lsn..end_lsn,
392 : };
393 :
394 2018 : let slice = vec_map.slice_range(lsn_range);
395 2022 : for (entry_lsn, pos) in slice.iter().rev() {
396 2022 : planned_block_reads.push(BlockRead {
397 2022 : key: *key,
398 2022 : lsn: *entry_lsn,
399 2022 : block_offset: *pos,
400 2022 : });
401 2022 : }
402 : }
403 : }
404 :
405 14 : let keyspace_size = keyspace.total_raw_size();
406 14 :
407 14 : let mut completed_keys = HashSet::new();
408 2036 : while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
409 2022 : let block_read = planned_block_reads.pop().unwrap();
410 2022 : if completed_keys.contains(&block_read.key) {
411 4 : continue;
412 2018 : }
413 :
414 : // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
415 2018 : let buf = reader.read_blob(block_read.block_offset, &ctx).await;
416 2018 : if let Err(e) = buf {
417 0 : reconstruct_state
418 0 : .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
419 0 : completed_keys.insert(block_read.key);
420 0 : continue;
421 2018 : }
422 2018 :
423 2018 : let value = Value::des(&buf.unwrap());
424 2018 : if let Err(e) = value {
425 0 : reconstruct_state
426 0 : .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
427 0 : completed_keys.insert(block_read.key);
428 0 : continue;
429 2018 : }
430 2018 :
431 2018 : let key_situation =
432 2018 : reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
433 2018 : if key_situation == ValueReconstructSituation::Complete {
434 2018 : completed_keys.insert(block_read.key);
435 2018 : }
436 : }
437 :
438 14 : reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
439 14 :
440 14 : Ok(())
441 14 : }
442 : }
443 :
444 3472 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
445 3472 : write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
446 3472 : }
447 :
448 2358 : fn inmem_layer_log_display(
449 2358 : mut f: impl Write,
450 2358 : timeline: TimelineId,
451 2358 : start_lsn: Lsn,
452 2358 : end_lsn: Lsn,
453 2358 : ) -> std::fmt::Result {
454 2358 : write!(f, "timeline {} in-memory ", timeline)?;
455 2358 : inmem_layer_display(f, start_lsn, end_lsn)
456 2358 : }
457 :
458 : impl std::fmt::Display for InMemoryLayer {
459 1114 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
460 1114 : let end_lsn = self.end_lsn_or_max();
461 1114 : inmem_layer_display(f, self.start_lsn, end_lsn)
462 1114 : }
463 : }
464 :
465 : impl InMemoryLayer {
466 : /// Get layer size.
467 1244 : pub async fn size(&self) -> Result<u64> {
468 1244 : let inner = self.inner.read().await;
469 1244 : Ok(inner.file.len())
470 1244 : }
471 :
472 : /// Create a new, empty, in-memory layer
473 1244 : pub async fn create(
474 1244 : conf: &'static PageServerConf,
475 1244 : timeline_id: TimelineId,
476 1244 : tenant_shard_id: TenantShardId,
477 1244 : start_lsn: Lsn,
478 1244 : ctx: &RequestContext,
479 1244 : ) -> Result<InMemoryLayer> {
480 1244 : trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
481 :
482 1244 : let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
483 1244 : let key = InMemoryLayerFileId(file.page_cache_file_id());
484 1244 :
485 1244 : Ok(InMemoryLayer {
486 1244 : file_id: key,
487 1244 : local_path_str: {
488 1244 : let mut buf = String::new();
489 1244 : inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
490 1244 : buf.into()
491 1244 : },
492 1244 : frozen_local_path_str: OnceLock::new(),
493 1244 : conf,
494 1244 : timeline_id,
495 1244 : tenant_shard_id,
496 1244 : start_lsn,
497 1244 : end_lsn: OnceLock::new(),
498 1244 : opened_at: Instant::now(),
499 1244 : inner: RwLock::new(InMemoryLayerInner {
500 1244 : index: BTreeMap::new(),
501 1244 : file,
502 1244 : resource_units: GlobalResourceUnits::new(),
503 1244 : }),
504 1244 : })
505 1244 : }
506 :
507 : // Write operations
508 :
509 : /// Common subroutine of the public put_wal_record() and put_page_image() functions.
510 : /// Adds the page version to the in-memory tree
511 :
512 5090450 : pub(crate) async fn put_value(
513 5090450 : &self,
514 5090450 : key: Key,
515 5090450 : lsn: Lsn,
516 5090450 : buf: &[u8],
517 5090450 : ctx: &RequestContext,
518 5090450 : ) -> Result<()> {
519 5090450 : let mut inner = self.inner.write().await;
520 5090450 : self.assert_writable();
521 5090450 : self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
522 5090450 : }
523 :
524 5090450 : async fn put_value_locked(
525 5090450 : &self,
526 5090450 : locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
527 5090450 : key: Key,
528 5090450 : lsn: Lsn,
529 5090450 : buf: &[u8],
530 5090450 : ctx: &RequestContext,
531 5090450 : ) -> Result<()> {
532 5090450 : trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
533 :
534 5090450 : let off = {
535 5090450 : locked_inner
536 5090450 : .file
537 5090450 : .write_blob(
538 5090450 : buf,
539 5090450 : &RequestContextBuilder::extend(ctx)
540 5090450 : .page_content_kind(PageContentKind::InMemoryLayer)
541 5090450 : .build(),
542 5090450 : )
543 3577 : .await?
544 : };
545 :
546 5090450 : let vec_map = locked_inner.index.entry(key).or_default();
547 5090450 : let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
548 5090450 : if old.is_some() {
549 : // We already had an entry for this LSN. That's odd..
550 0 : warn!("Key {} at {} already exists", key, lsn);
551 5090450 : }
552 :
553 5090450 : let size = locked_inner.file.len();
554 5090450 : locked_inner.resource_units.maybe_publish_size(size);
555 5090450 :
556 5090450 : Ok(())
557 5090450 : }
558 :
559 4803026 : pub(crate) fn get_opened_at(&self) -> Instant {
560 4803026 : self.opened_at
561 4803026 : }
562 :
563 0 : pub(crate) async fn tick(&self) -> Option<u64> {
564 0 : let mut inner = self.inner.write().await;
565 0 : let size = inner.file.len();
566 0 : inner.resource_units.publish_size(size)
567 0 : }
568 :
569 2 : pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
570 2 : // TODO: Currently, we just leak the storage for any deleted keys
571 2 : Ok(())
572 2 : }
573 :
574 : /// Records the end_lsn for non-dropped layers.
575 : /// `end_lsn` is exclusive
576 1114 : pub async fn freeze(&self, end_lsn: Lsn) {
577 1114 : let inner = self.inner.write().await;
578 :
579 1114 : assert!(
580 1114 : self.start_lsn < end_lsn,
581 0 : "{} >= {}",
582 : self.start_lsn,
583 : end_lsn
584 : );
585 1114 : self.end_lsn.set(end_lsn).expect("end_lsn set only once");
586 1114 :
587 1114 : self.frozen_local_path_str
588 1114 : .set({
589 1114 : let mut buf = String::new();
590 1114 : inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
591 1114 : .unwrap();
592 1114 : buf.into()
593 1114 : })
594 1114 : .expect("frozen_local_path_str set only once");
595 :
596 4255741 : for vec_map in inner.index.values() {
597 4386684 : for (lsn, _pos) in vec_map.as_slice() {
598 4386684 : assert!(*lsn < end_lsn);
599 : }
600 : }
601 1114 : }
602 :
603 : /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
604 : /// layer will only contain the key range the user specifies, and may return `None`
605 : /// if there are no matching keys.
606 : ///
607 : /// Returns a new delta layer with all the same data as this in-memory layer
608 1114 : pub(crate) async fn write_to_disk(
609 1114 : &self,
610 1114 : timeline: &Arc<Timeline>,
611 1114 : ctx: &RequestContext,
612 1114 : key_range: Option<Range<Key>>,
613 1114 : ) -> Result<Option<ResidentLayer>> {
614 : // Grab the lock in read-mode. We hold it over the I/O, but because this
615 : // layer is not writeable anymore, no one should be trying to acquire the
616 : // write lock on it, so we shouldn't block anyone. There's one exception
617 : // though: another thread might have grabbed a reference to this layer
618 : // in `get_layer_for_write' just before the checkpointer called
619 : // `freeze`, and then `write_to_disk` on it. When the thread gets the
620 : // lock, it will see that it's not writeable anymore and retry, but it
621 : // would have to wait until we release it. That race condition is very
622 : // rare though, so we just accept the potential latency hit for now.
623 1114 : let inner = self.inner.read().await;
624 :
625 1114 : let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
626 : use l0_flush::Inner;
627 1114 : let _concurrency_permit = match &*l0_flush_global_state {
628 1114 : Inner::PageCached => None,
629 0 : Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
630 : };
631 :
632 1114 : let end_lsn = *self.end_lsn.get().unwrap();
633 :
634 1114 : let key_count = if let Some(key_range) = key_range {
635 146 : inner
636 146 : .index
637 146 : .iter()
638 1166 : .filter(|(k, _)| key_range.contains(k))
639 146 : .count()
640 : } else {
641 968 : inner.index.len()
642 : };
643 1114 : if key_count == 0 {
644 146 : return Ok(None);
645 968 : }
646 :
647 968 : let mut delta_layer_writer = DeltaLayerWriter::new(
648 968 : self.conf,
649 968 : self.timeline_id,
650 968 : self.tenant_shard_id,
651 968 : Key::MIN,
652 968 : self.start_lsn..end_lsn,
653 968 : ctx,
654 968 : )
655 496 : .await?;
656 :
657 968 : match &*l0_flush_global_state {
658 : l0_flush::Inner::PageCached => {
659 968 : let ctx = RequestContextBuilder::extend(ctx)
660 968 : .page_content_kind(PageContentKind::InMemoryLayer)
661 968 : .build();
662 968 :
663 968 : let mut buf = Vec::new();
664 968 :
665 968 : let cursor = inner.file.block_cursor();
666 :
667 4254575 : for (key, vec_map) in inner.index.iter() {
668 : // Write all page versions
669 4385518 : for (lsn, pos) in vec_map.as_slice() {
670 4385518 : cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
671 4385518 : let will_init = Value::des(&buf)?.will_init();
672 : let res;
673 4385518 : (buf, res) = delta_layer_writer
674 4385518 : .put_value_bytes(*key, *lsn, buf, will_init, &ctx)
675 2729 : .await;
676 4385518 : res?;
677 : }
678 : }
679 : }
680 : l0_flush::Inner::Direct { .. } => {
681 0 : let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
682 0 : assert_eq!(
683 0 : file_contents.len() % PAGE_SZ,
684 : 0,
685 0 : "needed by BlockReaderRef::Slice"
686 : );
687 0 : assert_eq!(file_contents.len(), {
688 0 : let written = usize::try_from(inner.file.len()).unwrap();
689 0 : if written % PAGE_SZ == 0 {
690 0 : written
691 : } else {
692 0 : written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
693 : }
694 : });
695 :
696 0 : let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
697 0 :
698 0 : let mut buf = Vec::new();
699 :
700 0 : for (key, vec_map) in inner.index.iter() {
701 : // Write all page versions
702 0 : for (lsn, pos) in vec_map.as_slice() {
703 : // TODO: once we have blob lengths in the in-memory index, we can
704 : // 1. get rid of the blob_io / BlockReaderRef::Slice business and
705 : // 2. load the file contents into a Bytes and
706 : // 3. the use `Bytes::slice` to get the `buf` that is our blob
707 : // 4. pass that `buf` into `put_value_bytes`
708 : // => https://github.com/neondatabase/neon/issues/8183
709 0 : cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
710 0 : let will_init = Value::des(&buf)?.will_init();
711 : let res;
712 0 : (buf, res) = delta_layer_writer
713 0 : .put_value_bytes(*key, *lsn, buf, will_init, ctx)
714 0 : .await;
715 0 : res?;
716 : }
717 : }
718 :
719 : // Hold the permit until the IO is done; if we didn't, one could drop this future,
720 : // thereby releasing the permit, but the Vec<u8> remains allocated until the IO completes.
721 : // => we'd have more concurrenct Vec<u8> than allowed as per the semaphore.
722 0 : drop(_concurrency_permit);
723 : }
724 : }
725 :
726 : // MAX is used here because we identify L0 layers by full key range
727 6668 : let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
728 968 : Ok(Some(delta_layer))
729 1114 : }
730 : }
|