Line data Source code
1 : //! An in-memory layer stores recently received key-value pairs.
2 : //!
3 : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
4 : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
5 : //! its position in the file, is kept in memory, though.
6 : //!
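//! A sketch of the lifecycle, pieced together from the functions below (the real call
//! sites live elsewhere, e.g. in `Timeline`, and the variable names here are placeholders):
//!
//! ```ignore
//! // Open a new, empty layer that accepts writes starting at `start_lsn`.
//! let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, gate_guard, &ctx).await?;
//! // Append serialized `Value`s; each one is written to the ephemeral file and indexed by (key, lsn).
//! layer.put_value(key.to_compact(), lsn, &serialized_value, &ctx).await?;
//! // Freeze with an exclusive end LSN; the layer becomes read-only.
//! layer.freeze(end_lsn).await;
//! // Flush the frozen layer to a persistent L0 delta layer.
//! let maybe_delta = layer.write_to_disk(&ctx, None, &l0_flush_state).await?;
//! ```
//!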
7 : use crate::config::PageServerConf;
8 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
9 : use crate::page_cache::PAGE_SZ;
10 : use crate::repository::{Key, Value};
11 : use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
12 : use crate::tenant::ephemeral_file::EphemeralFile;
13 : use crate::tenant::timeline::GetVectoredError;
14 : use crate::tenant::PageReconstructError;
15 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
16 : use crate::{l0_flush, page_cache};
17 : use anyhow::{anyhow, Result};
18 : use camino::Utf8PathBuf;
19 : use pageserver_api::key::CompactKey;
20 : use pageserver_api::keyspace::KeySpace;
21 : use pageserver_api::models::InMemoryLayerInfo;
22 : use pageserver_api::shard::TenantShardId;
23 : use std::collections::BTreeMap;
24 : use std::sync::{Arc, OnceLock};
25 : use std::time::Instant;
26 : use tracing::*;
27 : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
28 : // avoid binding to Write (conflicts with std::io::Write)
29 : // while being able to use std::fmt::Write's methods
30 : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
31 : use std::cmp::Ordering;
32 : use std::fmt::Write;
33 : use std::ops::Range;
34 : use std::sync::atomic::Ordering as AtomicOrdering;
35 : use std::sync::atomic::{AtomicU64, AtomicUsize};
36 : use tokio::sync::{RwLock, RwLockWriteGuard};
37 :
38 : use super::{
39 : DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
40 : };
41 :
42 : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
43 : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
44 :
45 : pub struct InMemoryLayer {
46 : conf: &'static PageServerConf,
47 : tenant_shard_id: TenantShardId,
48 : timeline_id: TimelineId,
49 : file_id: InMemoryLayerFileId,
50 :
51 : /// This layer contains all the changes from 'start_lsn'. The
52 : /// start is inclusive.
53 : start_lsn: Lsn,
54 :
55 : /// Frozen layers have an exclusive end LSN.
56 : /// Writes are only allowed when this is `None`.
57 : pub(crate) end_lsn: OnceLock<Lsn>,
58 :
59 : /// Used for the traversal path. Cached representation of the in-memory layer once it has been frozen.
60 : frozen_local_path_str: OnceLock<Arc<str>>,
61 :
62 : opened_at: Instant,
63 :
64 : /// The above fields never change, except for `end_lsn`, which is only set once.
65 : /// All other changing parts are in `inner`, and protected by an RwLock.
66 : inner: RwLock<InMemoryLayerInner>,
67 : }
68 :
69 : impl std::fmt::Debug for InMemoryLayer {
70 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 0 : f.debug_struct("InMemoryLayer")
72 0 : .field("start_lsn", &self.start_lsn)
73 0 : .field("end_lsn", &self.end_lsn)
74 0 : .field("inner", &self.inner)
75 0 : .finish()
76 0 : }
77 : }
78 :
79 : pub struct InMemoryLayerInner {
80 : /// All versions of all pages in the layer are kept here. Indexed
81 : /// by key and LSN. The value is an offset into the
82 : /// ephemeral file where the page version is stored.
83 : index: BTreeMap<CompactKey, VecMap<Lsn, u64>>,
84 :
85 : /// The values are stored in a serialized format in this file.
86 : /// Each serialized Value is preceded by a 'u32' length field.
87 : /// The `index` map above stores offsets into this file.
88 : file: EphemeralFile,
89 :
90 : resource_units: GlobalResourceUnits,
91 : }
92 :
93 : impl std::fmt::Debug for InMemoryLayerInner {
94 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 0 : f.debug_struct("InMemoryLayerInner").finish()
96 0 : }
97 : }
98 :
99 : /// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
100 : /// to minimize contention.
101 : ///
102 : /// This global state is used to implement behaviors that require a global view of the system, e.g.
103 : /// rolling layers proactively to limit the total amount of dirty data.
104 : pub(crate) struct GlobalResources {
105 : // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
106 : // Zero means unlimited.
107 : pub(crate) max_dirty_bytes: AtomicU64,
108 : // How many bytes are in all EphemeralFile objects
109 : dirty_bytes: AtomicU64,
110 : // How many layers are contributing to dirty_bytes
111 : dirty_layers: AtomicUsize,
112 : }
113 :
114 : // Per-layer RAII struct for its contribution to [`GlobalResources`]
115 : struct GlobalResourceUnits {
116 : // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
117 : // for decrementing the global counter by this many bytes when dropped.
118 : dirty_bytes: u64,
119 : }
120 :
121 : impl GlobalResourceUnits {
122 : // Hint for the layer append path to update us when the layer size differs from the last
123 : // call to publish_size by this much. If we don't reach this threshold, we'll still get
124 : // updated when the Timeline "ticks" in the background.
125 : const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
126 :
127 1264 : fn new() -> Self {
128 1264 : GLOBAL_RESOURCES
129 1264 : .dirty_layers
130 1264 : .fetch_add(1, AtomicOrdering::Relaxed);
131 1264 : Self { dirty_bytes: 0 }
132 1264 : }
133 :
134 : /// Do not call this frequently: all timelines will write to these same global atomics,
135 : /// so this is a relatively expensive operation. Wait at least a few seconds between calls.
136 : ///
137 : /// Returns the effective layer size limit that should be applied, if any, to keep
138 : /// the total number of dirty bytes below the configured maximum.
139 1146 : fn publish_size(&mut self, size: u64) -> Option<u64> {
140 1146 : let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
141 1134 : Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
142 : Ordering::Greater => {
143 10 : let delta = size - self.dirty_bytes;
144 10 : let old = GLOBAL_RESOURCES
145 10 : .dirty_bytes
146 10 : .fetch_add(delta, AtomicOrdering::Relaxed);
147 10 : old + delta
148 : }
149 : Ordering::Less => {
150 2 : let delta = self.dirty_bytes - size;
151 2 : let old = GLOBAL_RESOURCES
152 2 : .dirty_bytes
153 2 : .fetch_sub(delta, AtomicOrdering::Relaxed);
154 2 : old - delta
155 : }
156 : };
157 :
158 : // This is a sloppy update: concurrent updates to the counter will race, and the
159 : // published metric might not be the exact latest value of GLOBAL_RESOURCES.dirty_bytes.
160 : // That's okay: as long as the metric contains some recent value, it doesn't have to always
161 : // be literally the last update.
162 1146 : TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
163 1146 :
164 1146 : self.dirty_bytes = size;
165 1146 :
166 1146 : let max_dirty_bytes = GLOBAL_RESOURCES
167 1146 : .max_dirty_bytes
168 1146 : .load(AtomicOrdering::Relaxed);
169 1146 : if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
170 : // Set the layer file limit to the average layer size: this implies that all above-average
171 : // sized layers will be eligible for freezing. They will be frozen in the order they
172 : // next enter publish_size.
173 0 : Some(
174 0 : new_global_dirty_bytes
175 0 : / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
176 0 : )
177 : } else {
178 1146 : None
179 : }
180 1146 : }
181 :
182 : // Call publish_size if the input size differs from last published size by more than
183 : // the drift limit
184 5090610 : fn maybe_publish_size(&mut self, size: u64) {
185 5090610 : let publish = match size.cmp(&self.dirty_bytes) {
186 0 : Ordering::Equal => false,
187 5090610 : Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
188 0 : Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
189 : };
190 :
191 5090610 : if publish {
192 10 : self.publish_size(size);
193 5090600 : }
194 5090610 : }
195 : }
196 :
197 : impl Drop for GlobalResourceUnits {
198 1136 : fn drop(&mut self) {
199 1136 : GLOBAL_RESOURCES
200 1136 : .dirty_layers
201 1136 : .fetch_sub(1, AtomicOrdering::Relaxed);
202 1136 :
203 1136 : // Subtract our contribution to the global total dirty bytes
204 1136 : self.publish_size(0);
205 1136 : }
206 : }
207 :
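// The single process-wide instance of [`GlobalResources`]. `max_dirty_bytes == 0` means
// "no limit"; once a configured limit is exceeded, `publish_size` starts returning a
// per-layer size threshold derived from the average dirty layer size.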
208 : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
209 : max_dirty_bytes: AtomicU64::new(0),
210 : dirty_bytes: AtomicU64::new(0),
211 : dirty_layers: AtomicUsize::new(0),
212 : };
213 :
214 : impl InMemoryLayer {
215 606143 : pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
216 606143 : self.file_id
217 606143 : }
218 :
219 1134 : pub(crate) fn get_timeline_id(&self) -> TimelineId {
220 1134 : self.timeline_id
221 1134 : }
222 :
223 0 : pub(crate) fn info(&self) -> InMemoryLayerInfo {
224 0 : let lsn_start = self.start_lsn;
225 :
226 0 : if let Some(&lsn_end) = self.end_lsn.get() {
227 0 : InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
228 : } else {
229 0 : InMemoryLayerInfo::Open { lsn_start }
230 : }
231 0 : }
232 :
233 0 : pub(crate) fn try_len(&self) -> Option<u64> {
234 0 : self.inner.try_read().map(|i| i.file.len()).ok()
235 0 : }
236 :
237 5090610 : pub(crate) fn assert_writable(&self) {
238 5090610 : assert!(self.end_lsn.get().is_none());
239 5090610 : }
240 :
241 1519603 : pub(crate) fn end_lsn_or_max(&self) -> Lsn {
242 1519603 : self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
243 1519603 : }
244 :
245 1518469 : pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
246 1518469 : self.start_lsn..self.end_lsn_or_max()
247 1518469 : }
248 :
249 : /// Debugging function to print out the contents of the layer.
250 : ///
251 : /// This is likely completely unused.
252 0 : pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
253 0 : let end_str = self.end_lsn_or_max();
254 0 :
255 0 : println!(
256 0 : "----- in-memory layer for tli {} LSNs {}-{} ----",
257 0 : self.timeline_id, self.start_lsn, end_str,
258 0 : );
259 0 :
260 0 : Ok(())
261 0 : }
262 :
263 : // Look up the keys in the provided keyspace and update
264 : // the reconstruct state with whatever is found.
265 : //
266 : // If the key is cached, go no further than the cached Lsn.
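//
// For each key in the requested ranges, the per-key `VecMap` is scanned newest-to-oldest,
// restricted to `(cached_lsn, end_lsn)` (or `[start_lsn, end_lsn)` when nothing is cached),
// and each value is read back from the ephemeral file until the reconstruct state reports
// `Complete` for that key.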
267 606143 : pub(crate) async fn get_values_reconstruct_data(
268 606143 : &self,
269 606143 : keyspace: KeySpace,
270 606143 : end_lsn: Lsn,
271 606143 : reconstruct_state: &mut ValuesReconstructState,
272 606143 : ctx: &RequestContext,
273 606143 : ) -> Result<(), GetVectoredError> {
274 606143 : let ctx = RequestContextBuilder::extend(ctx)
275 606143 : .page_content_kind(PageContentKind::InMemoryLayer)
276 606143 : .build();
277 :
278 606143 : let inner = self.inner.read().await;
279 606143 : let reader = inner.file.block_cursor();
280 :
281 606143 : for range in keyspace.ranges.iter() {
282 606143 : for (key, vec_map) in inner
283 606143 : .index
284 606143 : .range(range.start.to_compact()..range.end.to_compact())
285 : {
286 499008 : let key = Key::from_compact(*key);
287 499008 : let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
288 0 : Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
289 499008 : None => self.start_lsn..end_lsn,
290 : };
291 :
292 499008 : let slice = vec_map.slice_range(lsn_range);
293 :
294 499018 : for (entry_lsn, pos) in slice.iter().rev() {
295 : // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
296 499018 : let buf = reader.read_blob(*pos, &ctx).await;
297 499018 : if let Err(e) = buf {
298 0 : reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
299 0 : break;
300 499018 : }
301 499018 :
302 499018 : let value = Value::des(&buf.unwrap());
303 499018 : if let Err(e) = value {
304 0 : reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
305 0 : break;
306 499018 : }
307 499018 :
308 499018 : let key_situation =
309 499018 : reconstruct_state.update_key(&key, *entry_lsn, value.unwrap());
310 499018 : if key_situation == ValueReconstructSituation::Complete {
311 498998 : break;
312 20 : }
313 : }
314 : }
315 : }
316 :
317 606143 : reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
318 606143 :
319 606143 : Ok(())
320 606143 : }
321 : }
322 :
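// Human-readable layer name used in logs and the traversal path:
// `inmem-<start_lsn as 16-digit hex>-<end_lsn as 16-digit hex>`.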
323 2268 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
324 2268 : write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
325 2268 : }
326 :
327 1134 : fn inmem_layer_log_display(
328 1134 : mut f: impl Write,
329 1134 : timeline: TimelineId,
330 1134 : start_lsn: Lsn,
331 1134 : end_lsn: Lsn,
332 1134 : ) -> std::fmt::Result {
333 1134 : write!(f, "timeline {} in-memory ", timeline)?;
334 1134 : inmem_layer_display(f, start_lsn, end_lsn)
335 1134 : }
336 :
337 : impl std::fmt::Display for InMemoryLayer {
338 1134 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339 1134 : let end_lsn = self.end_lsn_or_max();
340 1134 : inmem_layer_display(f, self.start_lsn, end_lsn)
341 1134 : }
342 : }
343 :
344 : impl InMemoryLayer {
345 : /// Get layer size.
346 1264 : pub async fn size(&self) -> Result<u64> {
347 1264 : let inner = self.inner.read().await;
348 1264 : Ok(inner.file.len())
349 1264 : }
350 :
351 : /// Create a new, empty, in-memory layer
352 1264 : pub async fn create(
353 1264 : conf: &'static PageServerConf,
354 1264 : timeline_id: TimelineId,
355 1264 : tenant_shard_id: TenantShardId,
356 1264 : start_lsn: Lsn,
357 1264 : gate_guard: utils::sync::gate::GateGuard,
358 1264 : ctx: &RequestContext,
359 1264 : ) -> Result<InMemoryLayer> {
360 1264 : trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
361 :
362 1264 : let file =
363 1264 : EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
364 1264 : let key = InMemoryLayerFileId(file.page_cache_file_id());
365 1264 :
366 1264 : Ok(InMemoryLayer {
367 1264 : file_id: key,
368 1264 : frozen_local_path_str: OnceLock::new(),
369 1264 : conf,
370 1264 : timeline_id,
371 1264 : tenant_shard_id,
372 1264 : start_lsn,
373 1264 : end_lsn: OnceLock::new(),
374 1264 : opened_at: Instant::now(),
375 1264 : inner: RwLock::new(InMemoryLayerInner {
376 1264 : index: BTreeMap::new(),
377 1264 : file,
378 1264 : resource_units: GlobalResourceUnits::new(),
379 1264 : }),
380 1264 : })
381 1264 : }
382 :
383 : // Write operations
384 :
385 : /// Common subroutine of the public put_wal_record() and put_page_image() functions.
386 : /// Adds the page version to the in-memory tree
387 5090610 : pub async fn put_value(
388 5090610 : &self,
389 5090610 : key: CompactKey,
390 5090610 : lsn: Lsn,
391 5090610 : buf: &[u8],
392 5090610 : ctx: &RequestContext,
393 5090610 : ) -> Result<()> {
394 5090610 : let mut inner = self.inner.write().await;
395 5090610 : self.assert_writable();
396 5090610 : self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
397 5090610 : }
398 :
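/// Appends `buf` (a serialized `Value`) to the ephemeral file as a length-prefixed blob,
/// records the returned offset in the per-key `VecMap`, and publishes the new file size
/// to the global dirty-bytes accounting once it has drifted far enough since the last update.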
399 5090610 : async fn put_value_locked(
400 5090610 : &self,
401 5090610 : locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
402 5090610 : key: CompactKey,
403 5090610 : lsn: Lsn,
404 5090610 : buf: &[u8],
405 5090610 : ctx: &RequestContext,
406 5090610 : ) -> Result<()> {
407 5090610 : trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
408 :
409 5090610 : let off = {
410 5090610 : locked_inner
411 5090610 : .file
412 5090610 : .write_blob(
413 5090610 : buf,
414 5090610 : &RequestContextBuilder::extend(ctx)
415 5090610 : .page_content_kind(PageContentKind::InMemoryLayer)
416 5090610 : .build(),
417 5090610 : )
418 3318 : .await?
419 : };
420 :
421 5090610 : let vec_map = locked_inner.index.entry(key).or_default();
422 5090610 : let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
423 5090610 : if old.is_some() {
424 : // We already had an entry for this LSN. That's odd..
425 0 : warn!("Key {} at {} already exists", key, lsn);
426 5090610 : }
427 :
428 5090610 : let size = locked_inner.file.len();
429 5090610 : locked_inner.resource_units.maybe_publish_size(size);
430 5090610 :
431 5090610 : Ok(())
432 5090610 : }
433 :
434 4803026 : pub(crate) fn get_opened_at(&self) -> Instant {
435 4803026 : self.opened_at
436 4803026 : }
437 :
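/// Periodic background update: republishes the current ephemeral file size to the global
/// dirty-bytes accounting and returns the layer size limit computed by `publish_size`, if any.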
438 0 : pub(crate) async fn tick(&self) -> Option<u64> {
439 0 : let mut inner = self.inner.write().await;
440 0 : let size = inner.file.len();
441 0 : inner.resource_units.publish_size(size)
442 0 : }
443 :
444 2 : pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
445 2 : // TODO: Currently, we just leak the storage for any deleted keys
446 2 : Ok(())
447 2 : }
448 :
449 : /// Records the end_lsn for non-dropped layers.
450 : /// `end_lsn` is exclusive
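/// Also caches the human-readable layer name for the traversal path and, in debug builds,
/// asserts that every LSN in the index is strictly below `end_lsn`.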
451 1134 : pub async fn freeze(&self, end_lsn: Lsn) {
452 1134 : assert!(
453 1134 : self.start_lsn < end_lsn,
454 0 : "{} >= {}",
455 : self.start_lsn,
456 : end_lsn
457 : );
458 1134 : self.end_lsn.set(end_lsn).expect("end_lsn set only once");
459 1134 :
460 1134 : self.frozen_local_path_str
461 1134 : .set({
462 1134 : let mut buf = String::new();
463 1134 : inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
464 1134 : .unwrap();
465 1134 : buf.into()
466 1134 : })
467 1134 : .expect("frozen_local_path_str set only once");
468 :
469 : #[cfg(debug_assertions)]
470 : {
471 1134 : let inner = self.inner.write().await;
472 4255859 : for vec_map in inner.index.values() {
473 4386844 : for (lsn, _pos) in vec_map.as_slice() {
474 4386844 : assert!(*lsn < end_lsn);
475 : }
476 : }
477 : }
478 1134 : }
479 :
480 : /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
481 : /// layer will only contain the key range the user specifies, and may return `None`
482 : /// if there are no matching keys.
483 : ///
484 : /// Returns a new delta layer with all the same data as this in-memory layer
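///
/// In the `l0_flush::Inner::Direct` mode handled below, the entire ephemeral file is loaded
/// into memory (bounded by the flush semaphore), every value is re-read from that buffer and
/// re-encoded through a `DeltaLayerWriter`, and the resulting L0 delta layer covers the full
/// key range (`Key::MIN..Key::MAX`).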
485 1134 : pub async fn write_to_disk(
486 1134 : &self,
487 1134 : ctx: &RequestContext,
488 1134 : key_range: Option<Range<Key>>,
489 1134 : l0_flush_global_state: &l0_flush::Inner,
490 1134 : ) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
491 : // Grab the lock in read-mode. We hold it over the I/O, but because this
492 : // layer is not writeable anymore, no one should be trying to acquire the
493 : // write lock on it, so we shouldn't block anyone. There's one exception
494 : // though: another thread might have grabbed a reference to this layer
495 : // in `get_layer_for_write' just before the checkpointer called
496 : // `freeze`, and then `write_to_disk` on it. When the thread gets the
497 : // lock, it will see that it's not writeable anymore and retry, but it
498 : // would have to wait until we release it. That race condition is very
499 : // rare though, so we just accept the potential latency hit for now.
500 1134 : let inner = self.inner.read().await;
501 :
502 : use l0_flush::Inner;
503 1134 : let _concurrency_permit = match l0_flush_global_state {
504 1134 : Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
505 : };
506 :
507 1134 : let end_lsn = *self.end_lsn.get().unwrap();
508 :
509 1134 : let key_count = if let Some(key_range) = key_range {
510 166 : let key_range = key_range.start.to_compact()..key_range.end.to_compact();
511 166 :
512 166 : inner
513 166 : .index
514 166 : .iter()
515 1326 : .filter(|(k, _)| key_range.contains(k))
516 166 : .count()
517 : } else {
518 968 : inner.index.len()
519 : };
520 1134 : if key_count == 0 {
521 166 : return Ok(None);
522 968 : }
523 :
524 968 : let mut delta_layer_writer = DeltaLayerWriter::new(
525 968 : self.conf,
526 968 : self.timeline_id,
527 968 : self.tenant_shard_id,
528 968 : Key::MIN,
529 968 : self.start_lsn..end_lsn,
530 968 : ctx,
531 968 : )
532 504 : .await?;
533 :
534 968 : match l0_flush_global_state {
535 : l0_flush::Inner::Direct { .. } => {
536 968 : let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
537 968 : assert_eq!(
538 968 : file_contents.len() % PAGE_SZ,
539 : 0,
540 0 : "needed by BlockReaderRef::Slice"
541 : );
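// Sanity check: the in-memory copy should be the logical file length rounded up to the
// next PAGE_SZ boundary (`load_to_vec` appears to return whole pages).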
542 968 : assert_eq!(file_contents.len(), {
543 968 : let written = usize::try_from(inner.file.len()).unwrap();
544 968 : if written % PAGE_SZ == 0 {
545 0 : written
546 : } else {
547 968 : written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
548 : }
549 : });
550 :
551 968 : let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
552 968 :
553 968 : let mut buf = Vec::new();
554 :
555 4254533 : for (key, vec_map) in inner.index.iter() {
556 : // Write all page versions
557 4385518 : for (lsn, pos) in vec_map.as_slice() {
558 : // TODO: once we have blob lengths in the in-memory index, we can
559 : // 1. get rid of the blob_io / BlockReaderRef::Slice business and
560 : // 2. load the file contents into a Bytes and
561 : // 3. the use `Bytes::slice` to get the `buf` that is our blob
562 : // 4. pass that `buf` into `put_value_bytes`
563 : // => https://github.com/neondatabase/neon/issues/8183
564 4385518 : cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
565 4385518 : let will_init = Value::des(&buf)?.will_init();
566 4385518 : let (tmp, res) = delta_layer_writer
567 4385518 : .put_value_bytes(
568 4385518 : Key::from_compact(*key),
569 4385518 : *lsn,
570 4385518 : buf.slice_len(),
571 4385518 : will_init,
572 4385518 : ctx,
573 4385518 : )
574 2732 : .await;
575 4385518 : res?;
576 4385518 : buf = tmp.into_raw_slice().into_inner();
577 : }
578 : }
579 : }
580 : }
581 :
582 : // MAX is used here because we identify L0 layers by full key range
583 6639 : let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
584 :
585 : // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()`.
586 : //
587 : // If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
588 : // the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
589 : // Thus, we'd have more concurrent `Vec<u8>` in existence than the semaphore allows.
590 : //
591 : // We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
592 : // we dirtied when writing to the filesystem have been flushed and marked !dirty.
593 968 : drop(_concurrency_permit);
594 968 :
595 968 : Ok(Some((desc, path)))
596 1134 : }
597 : }