Line data Source code
1 : //! An in-memory layer stores recently received key-value pairs.
2 : //!
3 : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
4 : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
5 : //! its position in the file, is kept in memory, though.
6 : //!
7 : use crate::config::PageServerConf;
8 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
9 : use crate::page_cache::PAGE_SZ;
10 : use crate::repository::{Key, Value};
11 : use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
12 : use crate::tenant::ephemeral_file::EphemeralFile;
13 : use crate::tenant::storage_layer::ValueReconstructResult;
14 : use crate::tenant::timeline::GetVectoredError;
15 : use crate::tenant::{PageReconstructError, Timeline};
16 : use crate::{l0_flush, page_cache, walrecord};
17 : use anyhow::{anyhow, ensure, Result};
18 : use pageserver_api::keyspace::KeySpace;
19 : use pageserver_api::models::InMemoryLayerInfo;
20 : use pageserver_api::shard::TenantShardId;
21 : use std::collections::BTreeMap;
22 : use std::sync::{Arc, OnceLock};
23 : use std::time::Instant;
24 : use tracing::*;
25 : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
26 : // avoid binding to Write (conflicts with std::io::Write)
27 : // while being able to use std::fmt::Write's methods
28 : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
29 : use std::cmp::Ordering;
30 : use std::fmt::Write;
31 : use std::ops::Range;
32 : use std::sync::atomic::Ordering as AtomicOrdering;
33 : use std::sync::atomic::{AtomicU64, AtomicUsize};
34 : use tokio::sync::{RwLock, RwLockWriteGuard};
35 :
36 : use super::{
37 : DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
38 : ValuesReconstructState,
39 : };
40 :
41 : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
42 : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
43 :
44 : pub struct InMemoryLayer {
45 : conf: &'static PageServerConf,
46 : tenant_shard_id: TenantShardId,
47 : timeline_id: TimelineId,
48 : file_id: InMemoryLayerFileId,
49 :
50 : /// This layer contains all the changes from 'start_lsn'. The
51 : /// start is inclusive.
52 : start_lsn: Lsn,
53 :
54 : /// Frozen layers have an exclusive end LSN.
55 : /// Writes are only allowed when this is `None`.
56 : pub(crate) end_lsn: OnceLock<Lsn>,
57 :
58 : /// Used for the traversal path. Cached representation of the in-memory layer before it is frozen.
59 : local_path_str: Arc<str>,
60 :
61 : /// Used for traversal path. Cached representation of the in-memory layer after frozen.
62 : frozen_local_path_str: OnceLock<Arc<str>>,
63 :
64 : opened_at: Instant,
65 :
66 : /// The above fields never change, except for `end_lsn`, which is only set once.
67 : /// All other changing parts are in `inner`, and protected by its read-write lock.
68 : inner: RwLock<InMemoryLayerInner>,
69 : }
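// A rough sketch of the typical lifecycle, pieced together from the methods defined
// further down in this file (the surrounding WAL-ingest and flush-loop code is not
// shown, and the exact call sites may differ):
//
//     let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
//     layer.put_value(key, lsn, &serialized_value, ctx).await?;     // repeated during WAL ingest
//     layer.freeze(end_lsn).await;                                  // end_lsn is exclusive
//     let delta = layer.write_to_disk(&timeline, ctx, None).await?; // flush to an L0 delta layer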
70 :
71 : impl std::fmt::Debug for InMemoryLayer {
72 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 0 : f.debug_struct("InMemoryLayer")
74 0 : .field("start_lsn", &self.start_lsn)
75 0 : .field("end_lsn", &self.end_lsn)
76 0 : .field("inner", &self.inner)
77 0 : .finish()
78 0 : }
79 : }
80 :
81 : pub struct InMemoryLayerInner {
82 : /// All versions of all pages in the layer are kept here. Indexed
83 : /// by key and LSN. The value is an offset into the
84 : /// ephemeral file where the page version is stored.
85 : index: BTreeMap<Key, VecMap<Lsn, u64>>,
86 :
87 : /// The values are stored in a serialized format in this file.
88 : /// Each serialized Value is preceded by a 'u32' length field.
89 : /// The 'index' map above stores offsets into this file.
90 : file: EphemeralFile,
91 :
92 : resource_units: GlobalResourceUnits,
93 : }
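// Illustrative example of how `index` and `file` relate (LSNs and offsets are made up):
// for a key modified at LSNs 0/10 and 0/20, `index` holds roughly
//     key -> VecMap[(0/10, off_a), (0/20, off_b)]
// where `off_a`/`off_b` are byte offsets of the corresponding length-prefixed serialized
// `Value`s inside the ephemeral `file`.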
94 :
95 : impl std::fmt::Debug for InMemoryLayerInner {
96 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 0 : f.debug_struct("InMemoryLayerInner").finish()
98 0 : }
99 : }
100 :
101 : /// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
102 : /// to minimize contention.
103 : ///
104 : /// This global state is used to implement behaviors that require a global view of the system, e.g.
105 : /// rolling layers proactively to limit the total amount of dirty data.
106 : pub(crate) struct GlobalResources {
107 : // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
108 : // Zero means unlimited.
109 : pub(crate) max_dirty_bytes: AtomicU64,
110 : // How many bytes are in all EphemeralFile objects
111 : dirty_bytes: AtomicU64,
112 : // How many layers are contributing to dirty_bytes
113 : dirty_layers: AtomicUsize,
114 : }
115 :
116 : // Per-layer RAII struct for its contribution to [`GlobalResources`]
117 : struct GlobalResourceUnits {
118 : // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
119 : // for decrementing the global counter by this many bytes when dropped.
120 : dirty_bytes: u64,
121 : }
122 :
123 : impl GlobalResourceUnits {
124 : // Hint for the layer append path to update us when the layer size differs from the last
125 : // call to publish_size by this much. If we don't reach this threshold, we'll still get
126 : // updated when the Timeline "ticks" in the background.
127 : const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
128 :
129 1256 : fn new() -> Self {
130 1256 : GLOBAL_RESOURCES
131 1256 : .dirty_layers
132 1256 : .fetch_add(1, AtomicOrdering::Relaxed);
133 1256 : Self { dirty_bytes: 0 }
134 1256 : }
135 :
136 : /// Do not call this frequently: all timelines will write to these same global atomics,
137 : /// so this is a relatively expensive operation. Wait at least a few seconds between calls.
138 : ///
139 : /// Returns the effective layer size limit that should be applied, if any, to keep
140 : /// the total number of dirty bytes below the configured maximum.
141 1138 : fn publish_size(&mut self, size: u64) -> Option<u64> {
142 1138 : let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
143 1126 : Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
144 : Ordering::Greater => {
145 10 : let delta = size - self.dirty_bytes;
146 10 : let old = GLOBAL_RESOURCES
147 10 : .dirty_bytes
148 10 : .fetch_add(delta, AtomicOrdering::Relaxed);
149 10 : old + delta
150 : }
151 : Ordering::Less => {
152 2 : let delta = self.dirty_bytes - size;
153 2 : let old = GLOBAL_RESOURCES
154 2 : .dirty_bytes
155 2 : .fetch_sub(delta, AtomicOrdering::Relaxed);
156 2 : old - delta
157 : }
158 : };
159 :
160 : // This is a sloppy update: concurrent updates to the counter will race, so the metric
161 : // might not reflect the very latest value of GLOBAL_RESOURCES.dirty_bytes.
162 : // That's okay: as long as the metric contains some recent value, it doesn't have to always
163 : // be literally the last update.
164 1138 : TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
165 1138 :
166 1138 : self.dirty_bytes = size;
167 1138 :
168 1138 : let max_dirty_bytes = GLOBAL_RESOURCES
169 1138 : .max_dirty_bytes
170 1138 : .load(AtomicOrdering::Relaxed);
171 1138 : if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
172 : // Set the layer file limit to the average layer size: this implies that all above-average
173 : // sized layers will be eligible for freezing. They will be frozen in the order they
174 : // next enter publish_size.
175 0 : Some(
176 0 : new_global_dirty_bytes
177 0 : / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
178 0 : )
179 : } else {
180 1138 : None
181 : }
182 1138 : }
183 :
184 : // Call publish_size if the input size differs from the last published size by more than
185 : // the drift limit.
186 5090546 : fn maybe_publish_size(&mut self, size: u64) {
187 5090546 : let publish = match size.cmp(&self.dirty_bytes) {
188 0 : Ordering::Equal => false,
189 5090546 : Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
190 0 : Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
191 : };
192 :
193 5090546 : if publish {
194 10 : self.publish_size(size);
195 5090536 : }
196 5090546 : }
197 : }
198 :
199 : impl Drop for GlobalResourceUnits {
200 1128 : fn drop(&mut self) {
201 1128 : GLOBAL_RESOURCES
202 1128 : .dirty_layers
203 1128 : .fetch_sub(1, AtomicOrdering::Relaxed);
204 1128 :
205 1128 : // Subtract our contribution to the global total dirty bytes
206 1128 : self.publish_size(0);
207 1128 : }
208 : }
209 :
210 : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
211 : max_dirty_bytes: AtomicU64::new(0),
212 : dirty_bytes: AtomicU64::new(0),
213 : dirty_layers: AtomicUsize::new(0),
214 : };
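// A rough sketch of how the accounting is meant to be driven (illustrative only; the
// real call sites are `put_value_locked` and `tick` further down in this file):
//
//     let mut units = GlobalResourceUnits::new();      // registers one more dirty layer
//     // hot write path: cheap, only publishes once the size drifts by MAX_SIZE_DRIFT
//     units.maybe_publish_size(file_len);
//     // periodic background tick: always publishes, and may return a layer-size limit
//     if let Some(limit) = units.publish_size(file_len) {
//         // layers larger than `limit` are candidates for freezing, to shed dirty bytes
//     }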
215 :
216 : impl InMemoryLayer {
217 606197 : pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
218 606197 : self.file_id
219 606197 : }
220 :
221 1126 : pub(crate) fn get_timeline_id(&self) -> TimelineId {
222 1126 : self.timeline_id
223 1126 : }
224 :
225 0 : pub(crate) fn info(&self) -> InMemoryLayerInfo {
226 0 : let lsn_start = self.start_lsn;
227 :
228 0 : if let Some(&lsn_end) = self.end_lsn.get() {
229 0 : InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
230 : } else {
231 0 : InMemoryLayerInfo::Open { lsn_start }
232 : }
233 0 : }
234 :
235 0 : pub(crate) fn try_len(&self) -> Option<u64> {
236 0 : self.inner.try_read().map(|i| i.file.len()).ok()
237 0 : }
238 :
239 5090546 : pub(crate) fn assert_writable(&self) {
240 5090546 : assert!(self.end_lsn.get().is_none());
241 5090546 : }
242 :
243 1519345 : pub(crate) fn end_lsn_or_max(&self) -> Lsn {
244 1519345 : self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
245 1519345 : }
246 :
247 1518219 : pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
248 1518219 : self.start_lsn..self.end_lsn_or_max()
249 1518219 : }
250 :
251 0 : pub(crate) fn local_path_str(&self) -> &Arc<str> {
252 0 : self.frozen_local_path_str
253 0 : .get()
254 0 : .unwrap_or(&self.local_path_str)
255 0 : }
256 :
257 : /// Debugging function to print out the contents of the layer.
258 : ///
259 : /// This is likely completely unused.
260 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
261 0 : let inner = self.inner.read().await;
262 :
263 0 : let end_str = self.end_lsn_or_max();
264 0 :
265 0 : println!(
266 0 : "----- in-memory layer for tli {} LSNs {}-{} ----",
267 0 : self.timeline_id, self.start_lsn, end_str,
268 0 : );
269 0 :
270 0 : if !verbose {
271 0 : return Ok(());
272 0 : }
273 0 :
274 0 : let cursor = inner.file.block_cursor();
275 0 : let mut buf = Vec::new();
276 0 : for (key, vec_map) in inner.index.iter() {
277 0 : for (lsn, pos) in vec_map.as_slice() {
278 0 : let mut desc = String::new();
279 0 : cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
280 0 : let val = Value::des(&buf);
281 0 : match val {
282 0 : Ok(Value::Image(img)) => {
283 0 : write!(&mut desc, " img {} bytes", img.len())?;
284 : }
285 0 : Ok(Value::WalRecord(rec)) => {
286 0 : let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
287 0 : write!(
288 0 : &mut desc,
289 0 : " rec {} bytes will_init: {} {}",
290 0 : buf.len(),
291 0 : rec.will_init(),
292 0 : wal_desc
293 0 : )?;
294 : }
295 0 : Err(err) => {
296 0 : write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
297 : }
298 : }
299 0 : println!(" key {} at {}: {}", key, lsn, desc);
300 : }
301 : }
302 :
303 0 : Ok(())
304 0 : }
305 :
306 : /// Look up the given value in the layer.
307 0 : pub(crate) async fn get_value_reconstruct_data(
308 0 : &self,
309 0 : key: Key,
310 0 : lsn_range: Range<Lsn>,
311 0 : reconstruct_state: &mut ValueReconstructState,
312 0 : ctx: &RequestContext,
313 0 : ) -> anyhow::Result<ValueReconstructResult> {
314 0 : ensure!(lsn_range.start >= self.start_lsn);
315 0 : let mut need_image = true;
316 0 :
317 0 : let ctx = RequestContextBuilder::extend(ctx)
318 0 : .page_content_kind(PageContentKind::InMemoryLayer)
319 0 : .build();
320 :
321 0 : let inner = self.inner.read().await;
322 :
323 0 : let reader = inner.file.block_cursor();
324 :
325 : // Scan the page versions backwards, starting from the end of `lsn_range`.
326 0 : if let Some(vec_map) = inner.index.get(&key) {
327 0 : let slice = vec_map.slice_range(lsn_range);
328 0 : for (entry_lsn, pos) in slice.iter().rev() {
329 0 : let buf = reader.read_blob(*pos, &ctx).await?;
330 0 : let value = Value::des(&buf)?;
331 0 : match value {
332 0 : Value::Image(img) => {
333 0 : reconstruct_state.img = Some((*entry_lsn, img));
334 0 : return Ok(ValueReconstructResult::Complete);
335 : }
336 0 : Value::WalRecord(rec) => {
337 0 : let will_init = rec.will_init();
338 0 : reconstruct_state.records.push((*entry_lsn, rec));
339 0 : if will_init {
340 : // This WAL record initializes the page, so no need to go further back
341 0 : need_image = false;
342 0 : break;
343 0 : }
344 : }
345 : }
346 : }
347 0 : }
348 :
349 : // release lock on 'inner'
350 :
351 : // If an older page image is needed to reconstruct the page, let the
352 : // caller know.
353 0 : if need_image {
354 0 : Ok(ValueReconstructResult::Continue)
355 : } else {
356 0 : Ok(ValueReconstructResult::Complete)
357 : }
358 0 : }
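// Roughly: `Complete` means `reconstruct_state` now holds enough data (an image and/or a
// will_init record plus any newer WAL records) to materialize the page, while `Continue`
// tells the caller to keep searching older layers for a base image.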
359 :
360 : // Look up the keys in the provided keyspace and update
361 : // the reconstruct state with whatever is found.
362 : //
363 : // If the key is cached, go no further than the cached Lsn.
364 606197 : pub(crate) async fn get_values_reconstruct_data(
365 606197 : &self,
366 606197 : keyspace: KeySpace,
367 606197 : end_lsn: Lsn,
368 606197 : reconstruct_state: &mut ValuesReconstructState,
369 606197 : ctx: &RequestContext,
370 606197 : ) -> Result<(), GetVectoredError> {
371 606197 : let ctx = RequestContextBuilder::extend(ctx)
372 606197 : .page_content_kind(PageContentKind::InMemoryLayer)
373 606197 : .build();
374 :
375 606197 : let inner = self.inner.read().await;
376 606197 : let reader = inner.file.block_cursor();
377 :
378 606197 : for range in keyspace.ranges.iter() {
379 606197 : for (key, vec_map) in inner.index.range(range.start..range.end) {
380 499107 : let lsn_range = match reconstruct_state.get_cached_lsn(key) {
381 0 : Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
382 499107 : None => self.start_lsn..end_lsn,
383 : };
384 :
385 499107 : let slice = vec_map.slice_range(lsn_range);
386 :
387 499117 : for (entry_lsn, pos) in slice.iter().rev() {
388 : // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
389 499117 : let buf = reader.read_blob(*pos, &ctx).await;
390 499117 : if let Err(e) = buf {
391 0 : reconstruct_state
392 0 : .on_key_error(*key, PageReconstructError::from(anyhow!(e)));
393 0 : break;
394 499117 : }
395 499117 :
396 499117 : let value = Value::des(&buf.unwrap());
397 499117 : if let Err(e) = value {
398 0 : reconstruct_state
399 0 : .on_key_error(*key, PageReconstructError::from(anyhow!(e)));
400 0 : break;
401 499117 : }
402 499117 :
403 499117 : let key_situation =
404 499117 : reconstruct_state.update_key(key, *entry_lsn, value.unwrap());
405 499117 : if key_situation == ValueReconstructSituation::Complete {
406 499097 : break;
407 20 : }
408 : }
409 : }
410 : }
411 :
412 606197 : reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
413 606197 :
414 606197 : Ok(())
415 606197 : }
416 : }
417 :
418 3508 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
419 3508 : write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
420 3508 : }
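// For example, start_lsn 0/1000000 and end_lsn 0/2000000 render as
// "inmem-0000000001000000-0000000002000000".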
421 :
422 2382 : fn inmem_layer_log_display(
423 2382 : mut f: impl Write,
424 2382 : timeline: TimelineId,
425 2382 : start_lsn: Lsn,
426 2382 : end_lsn: Lsn,
427 2382 : ) -> std::fmt::Result {
428 2382 : write!(f, "timeline {} in-memory ", timeline)?;
429 2382 : inmem_layer_display(f, start_lsn, end_lsn)
430 2382 : }
431 :
432 : impl std::fmt::Display for InMemoryLayer {
433 1126 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 1126 : let end_lsn = self.end_lsn_or_max();
435 1126 : inmem_layer_display(f, self.start_lsn, end_lsn)
436 1126 : }
437 : }
438 :
439 : impl InMemoryLayer {
440 : /// Get layer size.
441 1256 : pub async fn size(&self) -> Result<u64> {
442 1256 : let inner = self.inner.read().await;
443 1256 : Ok(inner.file.len())
444 1256 : }
445 :
446 : /// Create a new, empty, in-memory layer
447 1256 : pub async fn create(
448 1256 : conf: &'static PageServerConf,
449 1256 : timeline_id: TimelineId,
450 1256 : tenant_shard_id: TenantShardId,
451 1256 : start_lsn: Lsn,
452 1256 : ctx: &RequestContext,
453 1256 : ) -> Result<InMemoryLayer> {
454 1256 : trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
455 :
456 1256 : let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
457 1256 : let key = InMemoryLayerFileId(file.page_cache_file_id());
458 1256 :
459 1256 : Ok(InMemoryLayer {
460 1256 : file_id: key,
461 1256 : local_path_str: {
462 1256 : let mut buf = String::new();
463 1256 : inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
464 1256 : buf.into()
465 1256 : },
466 1256 : frozen_local_path_str: OnceLock::new(),
467 1256 : conf,
468 1256 : timeline_id,
469 1256 : tenant_shard_id,
470 1256 : start_lsn,
471 1256 : end_lsn: OnceLock::new(),
472 1256 : opened_at: Instant::now(),
473 1256 : inner: RwLock::new(InMemoryLayerInner {
474 1256 : index: BTreeMap::new(),
475 1256 : file,
476 1256 : resource_units: GlobalResourceUnits::new(),
477 1256 : }),
478 1256 : })
479 1256 : }
480 :
481 : // Write operations
482 :
483 : /// Common subroutine of the public put_wal_record() and put_page_image() functions.
484 : /// Adds the page version to the in-memory index.
485 :
486 5090546 : pub(crate) async fn put_value(
487 5090546 : &self,
488 5090546 : key: Key,
489 5090546 : lsn: Lsn,
490 5090546 : buf: &[u8],
491 5090546 : ctx: &RequestContext,
492 5090546 : ) -> Result<()> {
493 5090546 : let mut inner = self.inner.write().await;
494 5090546 : self.assert_writable();
495 5090546 : self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
496 5090546 : }
497 :
498 5090546 : async fn put_value_locked(
499 5090546 : &self,
500 5090546 : locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
501 5090546 : key: Key,
502 5090546 : lsn: Lsn,
503 5090546 : buf: &[u8],
504 5090546 : ctx: &RequestContext,
505 5090546 : ) -> Result<()> {
506 5090546 : trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
507 :
508 5090546 : let off = {
509 5090546 : locked_inner
510 5090546 : .file
511 5090546 : .write_blob(
512 5090546 : buf,
513 5090546 : &RequestContextBuilder::extend(ctx)
514 5090546 : .page_content_kind(PageContentKind::InMemoryLayer)
515 5090546 : .build(),
516 5090546 : )
517 3320 : .await?
518 : };
519 :
520 5090546 : let vec_map = locked_inner.index.entry(key).or_default();
521 5090546 : let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
522 5090546 : if old.is_some() {
523 : // We already had an entry for this LSN. That's odd..
524 0 : warn!("Key {} at {} already exists", key, lsn);
525 5090546 : }
526 :
527 5090546 : let size = locked_inner.file.len();
528 5090546 : locked_inner.resource_units.maybe_publish_size(size);
529 5090546 :
530 5090546 : Ok(())
531 5090546 : }
532 :
533 4803026 : pub(crate) fn get_opened_at(&self) -> Instant {
534 4803026 : self.opened_at
535 4803026 : }
536 :
537 0 : pub(crate) async fn tick(&self) -> Option<u64> {
538 0 : let mut inner = self.inner.write().await;
539 0 : let size = inner.file.len();
540 0 : inner.resource_units.publish_size(size)
541 0 : }
542 :
543 2 : pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
544 2 : // TODO: Currently, we just leak the storage for any deleted keys
545 2 : Ok(())
546 2 : }
547 :
548 : /// Records the `end_lsn` for non-dropped layers.
549 : /// `end_lsn` is exclusive.
550 1126 : pub async fn freeze(&self, end_lsn: Lsn) {
551 1126 : let inner = self.inner.write().await;
552 :
553 1126 : assert!(
554 1126 : self.start_lsn < end_lsn,
555 0 : "{} >= {}",
556 : self.start_lsn,
557 : end_lsn
558 : );
559 1126 : self.end_lsn.set(end_lsn).expect("end_lsn set only once");
560 1126 :
561 1126 : self.frozen_local_path_str
562 1126 : .set({
563 1126 : let mut buf = String::new();
564 1126 : inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
565 1126 : .unwrap();
566 1126 : buf.into()
567 1126 : })
568 1126 : .expect("frozen_local_path_str set only once");
569 :
570 4255938 : for vec_map in inner.index.values() {
571 4386780 : for (lsn, _pos) in vec_map.as_slice() {
572 4386780 : assert!(*lsn < end_lsn);
573 : }
574 : }
575 1126 : }
576 :
577 : /// Write this frozen in-memory layer to disk. If `key_range` is set, the resulting
578 : /// delta layer will only contain keys in that range, and this function returns `None`
579 : /// if there are no matching keys.
580 : ///
581 : /// Returns a new delta layer with all the same data as this in-memory layer.
582 1126 : pub(crate) async fn write_to_disk(
583 1126 : &self,
584 1126 : timeline: &Arc<Timeline>,
585 1126 : ctx: &RequestContext,
586 1126 : key_range: Option<Range<Key>>,
587 1126 : ) -> Result<Option<ResidentLayer>> {
588 : // Grab the lock in read-mode. We hold it over the I/O, but because this
589 : // layer is not writeable anymore, no one should be trying to acquire the
590 : // write lock on it, so we shouldn't block anyone. There's one exception
591 : // though: another thread might have grabbed a reference to this layer
592 : // in `get_layer_for_write' just before the checkpointer called
593 : // `freeze`, and then `write_to_disk` on it. When the thread gets the
594 : // lock, it will see that it's not writeable anymore and retry, but it
595 : // would have to wait until we release it. That race condition is very
596 : // rare though, so we just accept the potential latency hit for now.
597 1126 : let inner = self.inner.read().await;
598 :
599 1126 : let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
600 : use l0_flush::Inner;
601 1126 : let _concurrency_permit = match &*l0_flush_global_state {
602 0 : Inner::PageCached => None,
603 1126 : Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
604 : };
605 :
606 1126 : let end_lsn = *self.end_lsn.get().unwrap();
607 :
608 1126 : let key_count = if let Some(key_range) = key_range {
609 158 : inner
610 158 : .index
611 158 : .iter()
612 1262 : .filter(|(k, _)| key_range.contains(k))
613 158 : .count()
614 : } else {
615 968 : inner.index.len()
616 : };
617 1126 : if key_count == 0 {
618 158 : return Ok(None);
619 968 : }
620 :
621 968 : let mut delta_layer_writer = DeltaLayerWriter::new(
622 968 : self.conf,
623 968 : self.timeline_id,
624 968 : self.tenant_shard_id,
625 968 : Key::MIN,
626 968 : self.start_lsn..end_lsn,
627 968 : ctx,
628 968 : )
629 491 : .await?;
630 :
631 968 : match &*l0_flush_global_state {
632 : l0_flush::Inner::PageCached => {
633 0 : let ctx = RequestContextBuilder::extend(ctx)
634 0 : .page_content_kind(PageContentKind::InMemoryLayer)
635 0 : .build();
636 0 :
637 0 : let mut buf = Vec::new();
638 0 :
639 0 : let cursor = inner.file.block_cursor();
640 :
641 0 : for (key, vec_map) in inner.index.iter() {
642 : // Write all page versions
643 0 : for (lsn, pos) in vec_map.as_slice() {
644 0 : cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
645 0 : let will_init = Value::des(&buf)?.will_init();
646 : let res;
647 0 : (buf, res) = delta_layer_writer
648 0 : .put_value_bytes(*key, *lsn, buf, will_init, &ctx)
649 0 : .await;
650 0 : res?;
651 : }
652 : }
653 : }
654 : l0_flush::Inner::Direct { .. } => {
655 968 : let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
656 968 : assert_eq!(
657 968 : file_contents.len() % PAGE_SZ,
658 : 0,
659 0 : "needed by BlockReaderRef::Slice"
660 : );
661 968 : assert_eq!(file_contents.len(), {
662 968 : let written = usize::try_from(inner.file.len()).unwrap();
663 968 : if written % PAGE_SZ == 0 {
664 0 : written
665 : } else {
666 968 : written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
667 : }
668 : });
669 :
670 968 : let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
671 968 :
672 968 : let mut buf = Vec::new();
673 :
674 4254676 : for (key, vec_map) in inner.index.iter() {
675 : // Write all page versions
676 4385518 : for (lsn, pos) in vec_map.as_slice() {
677 : // TODO: once we have blob lengths in the in-memory index, we can
678 : // 1. get rid of the blob_io / BlockReaderRef::Slice business and
679 : // 2. load the file contents into a Bytes and
680 : // 3. the use `Bytes::slice` to get the `buf` that is our blob
681 : // 4. pass that `buf` into `put_value_bytes`
682 : // => https://github.com/neondatabase/neon/issues/8183
683 4385518 : cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
684 4385518 : let will_init = Value::des(&buf)?.will_init();
685 : let res;
686 4385518 : (buf, res) = delta_layer_writer
687 4385518 : .put_value_bytes(*key, *lsn, buf, will_init, ctx)
688 2728 : .await;
689 4385518 : res?;
690 : }
691 : }
692 : }
693 : }
694 :
695 : // MAX is used here because we identify L0 layers by full key range
696 6657 : let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
697 :
698 : // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()`.
699 : //
700 : // If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
701 : // the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
702 : // Thus, we'd have more concurrent `Vec<u8>` in existence than the semaphore allows.
703 : //
704 : // We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
705 : // we dirtied when writing to the filesystem have been flushed and marked !dirty.
706 968 : drop(_concurrency_permit);
707 968 :
708 968 : Ok(Some(delta_layer))
709 1126 : }
710 : }