Line data Source code
1 : //! An in-memory layer stores recently received key-value pairs.
2 : //!
3 : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
4 : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
5 : //! its position in the file, is kept in memory, though.
6 : //!
7 : use std::cmp::Ordering;
8 : use std::collections::{BTreeMap, HashMap};
9 : use std::fmt::Write;
10 : use std::ops::Range;
11 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
12 : use std::sync::{Arc, OnceLock};
13 : use std::time::Instant;
14 :
15 : use anyhow::Result;
16 : use camino::Utf8PathBuf;
17 : use pageserver_api::key::{CompactKey, Key};
18 : use pageserver_api::keyspace::KeySpace;
19 : use pageserver_api::models::InMemoryLayerInfo;
20 : use pageserver_api::shard::TenantShardId;
21 : use tokio::sync::RwLock;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::*;
24 : use utils::id::TimelineId;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 : use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
28 :
29 : use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
30 : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
31 : use crate::config::PageServerConf;
32 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
33 : // avoid binding to Write (conflicts with std::io::Write)
34 : // while being able to use std::fmt::Write's methods
35 : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
36 : use crate::tenant::ephemeral_file::EphemeralFile;
37 : use crate::tenant::storage_layer::{OnDiskValue, OnDiskValueIo};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
40 : use crate::{l0_flush, page_cache};
41 :
42 : pub(crate) mod vectored_dio_read;
43 :
44 : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
45 : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
46 :
47 : pub struct InMemoryLayer {
48 : conf: &'static PageServerConf,
49 : tenant_shard_id: TenantShardId,
50 : timeline_id: TimelineId,
51 : file_id: InMemoryLayerFileId,
52 :
53 : /// This layer contains all the changes from 'start_lsn'. The
54 : /// start is inclusive.
55 : start_lsn: Lsn,
56 :
57 : /// Frozen layers have an exclusive end LSN.
58 : /// Writes are only allowed when this is `None`.
59 : pub(crate) end_lsn: OnceLock<Lsn>,
60 :
61 : /// Used for the traversal path. Cached representation of the in-memory layer after it has been frozen.
62 : frozen_local_path_str: OnceLock<Arc<str>>,
63 :
64 : opened_at: Instant,
65 :
66 : /// All versions of all pages in the layer are kept here. Indexed
67 : /// by block number and LSN. The [`IndexEntry`] is an offset into the
68 : /// ephemeral file where the page version is stored.
69 : ///
70 : /// We use a separate lock for the index to reduce the critical section
71 : /// during which reads cannot be planned.
72 : ///
73 : /// Note that the file backing [`InMemoryLayer::file`] is append-only,
74 : /// so it is not necessary to hold a lock on the index while reading or writing from the file.
75 : /// In particular:
76 : /// 1. It is safe to read and release [`InMemoryLayer::index`] before reading from [`InMemoryLayer::file`].
77 : /// 2. It is safe to write to [`InMemoryLayer::file`] before locking and updating [`InMemoryLayer::index`].
78 : index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
79 :
80 : /// Wrapper for the actual on-disk file. Uses interior mutability for concurrent reads/writes.
81 : file: EphemeralFile,
82 :
83 : estimated_in_mem_size: AtomicU64,
84 : }
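// A rough sketch (hypothetical `layer`, `key`, `lsn`, `raw`, `entry` and `ctx` bindings) of the
// ordering that the `index` documentation above permits; `put_batch` and
// `get_values_reconstruct_data` below are the real implementations:
//
// // write path: append to the append-only file first, then take the index lock briefly
// let base_offset = layer.file.len();
// layer.file.write_raw(&raw, ctx).await?;
// layer.index.write().await.entry(key).or_default().append_or_update_last(lsn, entry);
//
// // read path: snapshot the relevant entries under the read lock, then drop it
// let entries = layer.index.read().await.get(&key).cloned();
// // ...and only afterwards read the value bytes from `layer.file` at the recorded offsets.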
85 :
86 : impl std::fmt::Debug for InMemoryLayer {
87 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 0 : f.debug_struct("InMemoryLayer")
89 0 : .field("start_lsn", &self.start_lsn)
90 0 : .field("end_lsn", &self.end_lsn)
91 0 : .finish()
92 0 : }
93 : }
94 :
95 : /// Support the same max blob length as blob_io, because ultimately
96 : /// all the InMemoryLayer contents end up being written into a delta layer,
97 : /// using [`crate::tenant::blob_io`].
98 : const MAX_SUPPORTED_BLOB_LEN: usize = crate::tenant::blob_io::MAX_SUPPORTED_BLOB_LEN;
99 : const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
100 : let trailing_ones = MAX_SUPPORTED_BLOB_LEN.trailing_ones() as usize;
101 : let leading_zeroes = MAX_SUPPORTED_BLOB_LEN.leading_zeros() as usize;
102 : assert!(trailing_ones + leading_zeroes == std::mem::size_of::<usize>() * 8);
103 : trailing_ones
104 : };
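// Worked example for the computation above, assuming (purely for illustration) that
// MAX_SUPPORTED_BLOB_LEN were 0x7fff_ffff on a 64-bit target: trailing_ones() == 31,
// leading_zeros() == 33, the assert (31 + 33 == 64) holds, and
// MAX_SUPPORTED_BLOB_LEN_BITS == 31. The assert is what guarantees that the limit has
// the form 2^n - 1, i.e. that a length always fits into a contiguous run of low bits,
// which the bit packing in `IndexEntry` below relies on.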
105 :
106 : /// See [`InMemoryLayer::index`].
107 : ///
108 : /// For memory efficiency, the data is packed into a u64.
109 : ///
110 : /// Layout:
111 : /// - 1 bit: `will_init`
112 : /// - [`MAX_SUPPORTED_BLOB_LEN_BITS`][]: `len`
113 : /// - [`MAX_SUPPORTED_POS_BITS`](IndexEntry::MAX_SUPPORTED_POS_BITS): `pos`
114 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
115 : pub struct IndexEntry(u64);
116 :
117 : impl IndexEntry {
118 : /// See [`Self::MAX_SUPPORTED_POS`].
119 : const MAX_SUPPORTED_POS_BITS: usize = {
120 : let remainder = 64 - 1 - MAX_SUPPORTED_BLOB_LEN_BITS;
121 : if remainder < 32 {
122 : panic!("pos can be u32 as per type system, support that");
123 : }
124 : remainder
125 : };
126 : /// The maximum supported blob offset that can be represented by [`Self`].
127 : /// See also [`Self::validate_checkpoint_distance`].
128 : const MAX_SUPPORTED_POS: usize = (1 << Self::MAX_SUPPORTED_POS_BITS) - 1;
129 :
130 : // Layout
131 : const WILL_INIT_RANGE: Range<usize> = 0..1;
132 : const LEN_RANGE: Range<usize> =
133 : Self::WILL_INIT_RANGE.end..Self::WILL_INIT_RANGE.end + MAX_SUPPORTED_BLOB_LEN_BITS;
134 : const POS_RANGE: Range<usize> =
135 : Self::LEN_RANGE.end..Self::LEN_RANGE.end + Self::MAX_SUPPORTED_POS_BITS;
136 : const _ASSERT: () = {
137 : if Self::POS_RANGE.end != 64 {
138 : panic!("we don't want undefined bits for our own sanity")
139 : }
140 : };
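// Worked example of the layout (using the same hypothetical 31-bit length as above):
// WILL_INIT_RANGE = 0..1, LEN_RANGE = 1..32 and POS_RANGE = 32..64, so an entry with
// will_init = true, len = 8192 and pos = 4096 packs into
// (4096 << 32) | (8192 << 1) | 1.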
141 :
142 : /// Fails if and only if the offset or length encoded in `arg` is too large to be represented by [`Self`].
143 : ///
144 : /// The only reason this can happen in the system is if the [`InMemoryLayer`] grows too large.
145 : /// The [`InMemoryLayer`] size is determined by the checkpoint distance, enforced by [`crate::tenant::Timeline::should_roll`].
146 : ///
147 : /// Thus, to avoid failure of this function, whenever we start up and/or change checkpoint distance,
148 : /// call [`Self::validate_checkpoint_distance`] with the new checkpoint distance value.
149 : ///
150 : /// TODO: ideally, this check should happen at config parsing time (and in the request handler when a change to checkpoint distance is requested).
151 : /// When cleaning this up, also look into the s3 max file size check that is performed in delta layer writer.
152 : #[inline(always)]
153 2565246 : fn new(arg: IndexEntryNewArgs) -> anyhow::Result<Self> {
154 : let IndexEntryNewArgs {
155 2565246 : base_offset,
156 2565246 : batch_offset,
157 2565246 : len,
158 2565246 : will_init,
159 2565246 : } = arg;
160 :
161 2565246 : let pos = base_offset
162 2565246 : .checked_add(batch_offset)
163 2565246 : .ok_or_else(|| anyhow::anyhow!("base_offset + batch_offset overflows u64: base_offset={base_offset} batch_offset={batch_offset}"))?;
164 :
165 2565246 : if pos.into_usize() > Self::MAX_SUPPORTED_POS {
166 4 : anyhow::bail!(
167 4 : "base_offset+batch_offset exceeds the maximum supported value: base_offset={base_offset} batch_offset={batch_offset} (+)={pos} max={max}",
168 : max = Self::MAX_SUPPORTED_POS
169 : );
170 2565242 : }
171 :
172 2565242 : if len > MAX_SUPPORTED_BLOB_LEN {
173 1 : anyhow::bail!(
174 1 : "len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}",
175 : );
176 2565241 : }
177 :
178 2565241 : let mut data: u64 = 0;
179 : use bit_field::BitField;
180 2565241 : data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 });
181 2565241 : data.set_bits(Self::LEN_RANGE, len.into_u64());
182 2565241 : data.set_bits(Self::POS_RANGE, pos);
183 :
184 2565241 : Ok(Self(data))
185 2565246 : }
186 :
187 : #[inline(always)]
188 3149664 : fn unpack(&self) -> IndexEntryUnpacked {
189 : use bit_field::BitField;
190 3149664 : IndexEntryUnpacked {
191 3149664 : will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0,
192 3149664 : len: self.0.get_bits(Self::LEN_RANGE),
193 3149664 : pos: self.0.get_bits(Self::POS_RANGE),
194 3149664 : }
195 3149664 : }
196 :
197 : /// See [`Self::new`].
198 137 : pub(crate) const fn validate_checkpoint_distance(
199 137 : checkpoint_distance: u64,
200 137 : ) -> Result<(), &'static str> {
201 137 : if checkpoint_distance > Self::MAX_SUPPORTED_POS as u64 {
202 0 : return Err("exceeds the maximum supported value");
203 137 : }
204 137 : let res = u64_to_usize(checkpoint_distance).checked_add(MAX_SUPPORTED_BLOB_LEN);
205 137 : if res.is_none() {
206 0 : return Err(
207 0 : "checkpoint distance + max supported blob len overflows in-memory addition",
208 0 : );
209 137 : }
210 :
211 : // NB: it is ok for the result of the addition to be larger than MAX_SUPPORTED_POS
212 :
213 137 : Ok(())
214 137 : }
215 :
216 : const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = {
217 : let res = Self::validate_checkpoint_distance(
218 : pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE,
219 : );
220 : if res.is_err() {
221 : panic!("default checkpoint distance is valid")
222 : }
223 : };
224 : }
225 :
226 : /// Args to [`IndexEntry::new`].
227 : #[derive(Clone, Copy)]
228 : struct IndexEntryNewArgs {
229 : base_offset: u64,
230 : batch_offset: u64,
231 : len: usize,
232 : will_init: bool,
233 : }
234 :
235 : /// Unpacked representation of the bitfielded [`IndexEntry`].
236 : #[derive(Clone, Copy, PartialEq, Eq, Debug)]
237 : struct IndexEntryUnpacked {
238 : will_init: bool,
239 : len: u64,
240 : pos: u64,
241 : }
242 :
243 : /// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
244 : /// to minimize contention.
245 : ///
246 : /// This global state is used to implement behaviors that require a global view of the system, e.g.
247 : /// rolling layers proactively to limit the total amount of dirty data.
248 : pub(crate) struct GlobalResources {
249 : // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
250 : // Zero means unlimited.
251 : pub(crate) max_dirty_bytes: AtomicU64,
252 : // How many bytes are in all EphemeralFile objects
253 : dirty_bytes: AtomicU64,
254 : // How many layers are contributing to dirty_bytes
255 : dirty_layers: AtomicUsize,
256 : }
257 :
258 : // Per-timeline RAII struct for its contribution to [`GlobalResources`]
259 : pub(crate) struct GlobalResourceUnits {
260 : // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
261 : // for decrementing the global counter by this many bytes when dropped.
262 : dirty_bytes: u64,
263 : }
264 :
265 : impl GlobalResourceUnits {
266 : // Hint for the layer append path to update us when the layer size differs from the last
267 : // call to publish_size by this much. If we don't reach this threshold, we'll still get
268 : // updated when the Timeline "ticks" in the background.
269 : const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
270 :
271 669 : pub(crate) fn new() -> Self {
272 669 : GLOBAL_RESOURCES
273 669 : .dirty_layers
274 669 : .fetch_add(1, AtomicOrdering::Relaxed);
275 669 : Self { dirty_bytes: 0 }
276 669 : }
277 :
278 : /// Do not call this frequently: all timelines will write to these same global atomics,
279 : /// so this is a relatively expensive operation. Wait at least a few seconds between calls.
280 : ///
281 : /// Returns the effective layer size limit that should be applied, if any, to keep
282 : /// the total number of dirty bytes below the configured maximum.
283 606 : pub(crate) fn publish_size(&mut self, size: u64) -> Option<u64> {
284 606 : let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
285 601 : Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
286 : Ordering::Greater => {
287 4 : let delta = size - self.dirty_bytes;
288 4 : let old = GLOBAL_RESOURCES
289 4 : .dirty_bytes
290 4 : .fetch_add(delta, AtomicOrdering::Relaxed);
291 4 : old + delta
292 : }
293 : Ordering::Less => {
294 1 : let delta = self.dirty_bytes - size;
295 1 : let old = GLOBAL_RESOURCES
296 1 : .dirty_bytes
297 1 : .fetch_sub(delta, AtomicOrdering::Relaxed);
298 1 : old - delta
299 : }
300 : };
301 :
302 : // This is a sloppy update: concurrent updates will race, so the metric might not
303 : // reflect the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
304 : // That's okay: as long as the metric contains some recent value, it doesn't have to always
305 : // be literally the last update.
306 606 : TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
307 :
308 606 : self.dirty_bytes = size;
309 :
310 606 : let max_dirty_bytes = GLOBAL_RESOURCES
311 606 : .max_dirty_bytes
312 606 : .load(AtomicOrdering::Relaxed);
313 606 : if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
314 : // Set the layer file limit to the average layer size: this implies that all above-average
315 : // sized layers will be elegible for freezing. They will be frozen in the order they
316 : // next enter publish_size.
317 0 : Some(
318 0 : new_global_dirty_bytes
319 0 : / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
320 0 : )
321 : } else {
322 606 : None
323 : }
324 606 : }
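// Worked example of the limit returned above (all numbers hypothetical): with
// max_dirty_bytes = 8 GiB, a global total of 10 GiB dirty bytes and 40 dirty layers,
// publish_size returns Some(256 MiB), i.e. the current average layer size. Every layer
// larger than that average becomes eligible for freezing the next time it publishes
// its size.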
325 :
326 : // Call publish_size if the input size differs from last published size by more than
327 : // the drift limit
328 2402454 : pub(crate) fn maybe_publish_size(&mut self, size: u64) {
329 2402454 : let publish = match size.cmp(&self.dirty_bytes) {
330 0 : Ordering::Equal => false,
331 2402454 : Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
332 0 : Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
333 : };
334 :
335 2402454 : if publish {
336 4 : self.publish_size(size);
337 2402450 : }
338 2402454 : }
339 : }
340 :
341 : impl Drop for GlobalResourceUnits {
342 602 : fn drop(&mut self) {
343 602 : GLOBAL_RESOURCES
344 602 : .dirty_layers
345 602 : .fetch_sub(1, AtomicOrdering::Relaxed);
346 :
347 : // Subtract our contribution to the global total dirty bytes
348 602 : self.publish_size(0);
349 602 : }
350 : }
351 :
352 : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
353 : max_dirty_bytes: AtomicU64::new(0),
354 : dirty_bytes: AtomicU64::new(0),
355 : dirty_layers: AtomicUsize::new(0),
356 : };
357 :
358 : impl InMemoryLayer {
359 311161 : pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
360 311161 : self.file_id
361 311161 : }
362 :
363 601 : pub(crate) fn get_timeline_id(&self) -> TimelineId {
364 601 : self.timeline_id
365 601 : }
366 :
367 1197 : pub(crate) fn info(&self) -> InMemoryLayerInfo {
368 1197 : let lsn_start = self.start_lsn;
369 :
370 1197 : if let Some(&lsn_end) = self.end_lsn.get() {
371 1196 : InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
372 : } else {
373 1 : InMemoryLayerInfo::Open { lsn_start }
374 : }
375 1197 : }
376 :
377 1853 : pub(crate) fn len(&self) -> u64 {
378 1853 : self.file.len()
379 1853 : }
380 :
381 2402132 : pub(crate) fn assert_writable(&self) {
382 2402132 : assert!(self.end_lsn.get().is_none());
383 2402132 : }
384 :
385 1212060 : pub(crate) fn end_lsn_or_max(&self) -> Lsn {
386 1212060 : self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
387 1212060 : }
388 :
389 1211463 : pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
390 1211463 : self.start_lsn..self.end_lsn_or_max()
391 1211463 : }
392 :
393 : /// debugging function to print out the contents of the layer
394 : ///
395 : /// this is likely completely unused
396 0 : pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
397 0 : let end_str = self.end_lsn_or_max();
398 :
399 0 : println!(
400 0 : "----- in-memory layer for tli {} LSNs {}-{} ----",
401 : self.timeline_id, self.start_lsn, end_str,
402 : );
403 :
404 0 : Ok(())
405 0 : }
406 :
407 : // Look up the keys in the provided keyspace and update
408 : // the reconstruct state with whatever is found.
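// The lookup happens in two phases:
// 1. Under the index read lock, plan a LogicalRead for each matching (key, lsn) entry,
// walking each key's versions newest-first and stopping at the first will_init value.
// 2. Release the lock and spawn the IO, completing each value's OnDiskValueIo with the
// bytes read from the append-only ephemeral file.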
409 307317 : pub async fn get_values_reconstruct_data(
410 307317 : self: &Arc<InMemoryLayer>,
411 307317 : keyspace: KeySpace,
412 307317 : lsn_range: Range<Lsn>,
413 307317 : reconstruct_state: &mut ValuesReconstructState,
414 307317 : ctx: &RequestContext,
415 307317 : ) -> Result<(), GetVectoredError> {
416 307317 : let ctx = RequestContextBuilder::from(ctx)
417 307317 : .page_content_kind(PageContentKind::InMemoryLayer)
418 307317 : .attached_child();
419 :
420 307317 : let index = self.index.read().await;
421 :
422 : struct ValueRead {
423 : entry_lsn: Lsn,
424 : read: vectored_dio_read::LogicalRead<Vec<u8>>,
425 : }
426 307317 : let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
427 307317 : let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
428 :
429 310966 : for range in keyspace.ranges.iter() {
430 310966 : for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
431 264074 : let key = Key::from_compact(*key);
432 264074 : let slice = vec_map.slice_range(lsn_range.clone());
433 :
434 956884 : for (entry_lsn, index_entry) in slice.iter().rev() {
435 : let IndexEntryUnpacked {
436 956884 : pos,
437 956884 : len,
438 956884 : will_init,
439 956884 : } = index_entry.unpack();
440 :
441 956884 : reads.entry(key).or_default().push(ValueRead {
442 956884 : entry_lsn: *entry_lsn,
443 956884 : read: vectored_dio_read::LogicalRead::new(
444 956884 : pos,
445 956884 : Vec::with_capacity(len as usize),
446 956884 : ),
447 956884 : });
448 :
449 956884 : let io = reconstruct_state.update_key(&key, *entry_lsn, will_init);
450 956884 : ios.insert((key, *entry_lsn), io);
451 :
452 956884 : if will_init {
453 253722 : break;
454 703162 : }
455 : }
456 : }
457 : }
458 307317 : drop(index); // release the lock before we spawn the IO
459 307317 : let read_from = Arc::clone(self);
460 307317 : let read_ctx = ctx.attached_child();
461 307317 : reconstruct_state
462 307317 : .spawn_io(async move {
463 307317 : let f = vectored_dio_read::execute(
464 307317 : &read_from.file,
465 307317 : reads
466 307317 : .iter()
467 307317 : .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
468 307317 : &read_ctx,
469 : );
470 307317 : send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
471 307317 : .await;
472 :
473 571389 : for (key, value_reads) in reads {
474 1220956 : for ValueRead { entry_lsn, read } in value_reads {
475 956884 : let io = ios.remove(&(key, entry_lsn)).expect("sender must exist");
476 956884 : match read.into_result().expect("we run execute() above") {
477 0 : Err(e) => {
478 0 : io.complete(Err(std::io::Error::new(
479 0 : e.kind(),
480 0 : "dio vec read failed",
481 0 : )));
482 0 : }
483 956884 : Ok(value_buf) => {
484 956884 : io.complete(Ok(OnDiskValue::WalRecordOrImage(value_buf.into())));
485 956884 : }
486 : }
487 : }
488 : }
489 :
490 307317 : assert!(ios.is_empty());
491 :
492 : // Keep layer existent until this IO is done;
493 : // This is kinda forced for InMemoryLayer because we need to inner.read() anyway,
494 : // but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
495 : // drop for consistency among all three layer types.
496 307317 : drop(read_from);
497 307317 : })
498 307317 : .await;
499 :
500 307317 : Ok(())
501 307317 : }
502 : }
503 :
504 1198 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
505 1198 : write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
506 1198 : }
507 :
508 601 : fn inmem_layer_log_display(
509 601 : mut f: impl Write,
510 601 : timeline: TimelineId,
511 601 : start_lsn: Lsn,
512 601 : end_lsn: Lsn,
513 601 : ) -> std::fmt::Result {
514 601 : write!(f, "timeline {timeline} in-memory ")?;
515 601 : inmem_layer_display(f, start_lsn, end_lsn)
516 601 : }
517 :
518 : impl std::fmt::Display for InMemoryLayer {
519 597 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520 597 : let end_lsn = self.end_lsn_or_max();
521 597 : inmem_layer_display(f, self.start_lsn, end_lsn)
522 597 : }
523 : }
524 :
525 : impl InMemoryLayer {
526 630 : pub fn estimated_in_mem_size(&self) -> u64 {
527 630 : self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
528 630 : }
529 :
530 : /// Create a new, empty, in-memory layer
531 665 : pub async fn create(
532 665 : conf: &'static PageServerConf,
533 665 : timeline_id: TimelineId,
534 665 : tenant_shard_id: TenantShardId,
535 665 : start_lsn: Lsn,
536 665 : gate: &utils::sync::gate::Gate,
537 665 : cancel: &CancellationToken,
538 665 : ctx: &RequestContext,
539 665 : ) -> Result<InMemoryLayer> {
540 665 : trace!(
541 0 : "initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"
542 : );
543 :
544 665 : let file =
545 665 : EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, cancel, ctx).await?;
546 665 : let key = InMemoryLayerFileId(file.page_cache_file_id());
547 :
548 665 : Ok(InMemoryLayer {
549 665 : file_id: key,
550 665 : frozen_local_path_str: OnceLock::new(),
551 665 : conf,
552 665 : timeline_id,
553 665 : tenant_shard_id,
554 665 : start_lsn,
555 665 : end_lsn: OnceLock::new(),
556 665 : opened_at: Instant::now(),
557 665 : index: RwLock::new(BTreeMap::new()),
558 665 : file,
559 665 : estimated_in_mem_size: AtomicU64::new(0),
560 665 : })
561 665 : }
562 :
563 : /// Write path.
564 : ///
565 : /// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
566 : /// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
567 : ///
568 : /// This method shall not be called concurrently. We enforce this property via [`crate::tenant::Timeline::write_lock`].
569 : ///
570 : /// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
571 2402132 : pub async fn put_batch(
572 2402132 : &self,
573 2402132 : serialized_batch: SerializedValueBatch,
574 2402132 : ctx: &RequestContext,
575 2402132 : ) -> anyhow::Result<()> {
576 2402132 : self.assert_writable();
577 :
578 2402132 : let base_offset = self.file.len();
579 :
580 : let SerializedValueBatch {
581 2402132 : raw,
582 2402132 : metadata,
583 : max_lsn: _,
584 : len: _,
585 2402132 : } = serialized_batch;
586 :
587 : // Write the batch to the file
588 2402132 : self.file.write_raw(&raw, ctx).await?;
589 2402132 : let new_size = self.file.len();
590 :
591 2402132 : let expected_new_len = base_offset
592 2402132 : .checked_add(raw.len().into_u64())
593 : // write_raw would error if we were to overflow u64.
594 : // also IndexEntry and higher levels in
595 : //the code don't allow the file to grow that large
596 2402132 : .unwrap();
597 2402132 : assert_eq!(new_size, expected_new_len);
598 :
599 : // Update the index with the new entries
600 2402132 : let mut index = self.index.write().await;
601 :
602 4967353 : for meta in metadata {
603 : let SerializedValueMeta {
604 2565221 : key,
605 2565221 : lsn,
606 2565221 : batch_offset,
607 2565221 : len,
608 2565221 : will_init,
609 2565221 : } = match meta {
610 2565221 : ValueMeta::Serialized(ser) => ser,
611 : ValueMeta::Observed(_) => {
612 0 : continue;
613 : }
614 : };
615 :
616 : // Add the base_offset to the batch's index entries which are relative to the batch start.
617 2565221 : let index_entry = IndexEntry::new(IndexEntryNewArgs {
618 2565221 : base_offset,
619 2565221 : batch_offset,
620 2565221 : len,
621 2565221 : will_init,
622 2565221 : })?;
623 :
624 2565221 : let vec_map = index.entry(key).or_default();
625 2565221 : let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
626 2565221 : if old.is_some() {
627 : // This should not break anything, but is unexpected: ingestion code aims to filter out
628 : // multiple writes to the same key at the same LSN. This happens in cases where our
629 : // ingestion code generates some write like an empty page, and we see a write from postgres
630 : // to the same key in the same wal record. If one such write makes it through, we
631 : // index the most recent write, implicitly ignoring the earlier write. We log a warning
632 : // because this case is unexpected, and we would like tests to fail if this happens.
633 0 : warn!("Key {} at {} written twice at same LSN", key, lsn);
634 2565221 : }
635 2565221 : self.estimated_in_mem_size.fetch_add(
636 2565221 : (std::mem::size_of::<CompactKey>()
637 2565221 : + std::mem::size_of::<Lsn>()
638 2565221 : + std::mem::size_of::<IndexEntry>()) as u64,
639 2565221 : AtomicOrdering::Relaxed,
640 : );
641 : }
642 :
643 2402132 : Ok(())
644 2402132 : }
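// Worked example of the offset arithmetic above (values hypothetical): if the file is
// 4096 bytes long when the batch arrives (base_offset = 4096) and the batch carries two
// values at batch_offset 0 (len 100) and batch_offset 100 (len 8192), their IndexEntry
// positions become 4096 and 4196 respectively, and the file grows to exactly
// base_offset + raw.len(), which the assert_eq above double-checks.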
645 :
646 2401506 : pub(crate) fn get_opened_at(&self) -> Instant {
647 2401506 : self.opened_at
648 2401506 : }
649 :
650 0 : pub(crate) fn tick(&self) -> Option<u64> {
651 0 : self.file.tick()
652 0 : }
653 :
654 1 : pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
655 : // TODO: Currently, we just leak the storage for any deleted keys
656 1 : Ok(())
657 1 : }
658 :
659 : /// Records the end_lsn for non-dropped layers.
660 : /// `end_lsn` is exclusive
661 : ///
662 : /// A note on locking:
663 : /// The current API of [`InMemoryLayer`] does not ensure that there's no ongoing
664 : /// writes while freezing the layer. This is enforced at a higher level via
665 : /// [`crate::tenant::Timeline::write_lock`]. Freeze might be called via two code paths:
666 : /// 1. Via the active [`crate::tenant::timeline::TimelineWriter`]. This holds the
667 : /// Timeline::write_lock for its lifetime. The rolling is handled in
668 : /// [`crate::tenant::timeline::TimelineWriter::put_batch`]. It's a &mut self function
669 : /// so can't be called from different threads.
670 : /// 2. In the background via [`crate::tenant::Timeline::maybe_freeze_ephemeral_layer`].
671 : /// This only proceeds if try_lock on Timeline::write_lock succeeds (i.e. there's no active writer),
672 : /// hence there can be no concurrent writes
673 601 : pub async fn freeze(&self, end_lsn: Lsn) {
674 601 : assert!(
675 601 : self.start_lsn < end_lsn,
676 0 : "{} >= {}",
677 : self.start_lsn,
678 : end_lsn
679 : );
680 601 : self.end_lsn.set(end_lsn).expect("end_lsn set only once");
681 :
682 601 : self.frozen_local_path_str
683 601 : .set({
684 601 : let mut buf = String::new();
685 601 : inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
686 601 : .unwrap();
687 601 : buf.into()
688 : })
689 601 : .expect("frozen_local_path_str set only once");
690 :
691 : #[cfg(debug_assertions)]
692 : {
693 601 : let index = self.index.read().await;
694 2128372 : for vec_map in index.values() {
695 2213338 : for (lsn, _) in vec_map.as_slice() {
696 2213338 : assert!(*lsn < end_lsn);
697 : }
698 : }
699 : }
700 601 : }
701 :
702 : /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
703 : /// layer will only contain the key range the user specifies, and may return `None`
704 : /// if there are no matching keys.
705 : ///
706 : /// Returns a new delta layer with all the same data as this in-memory layer
707 486 : pub async fn write_to_disk(
708 486 : &self,
709 486 : ctx: &RequestContext,
710 486 : key_range: Option<Range<Key>>,
711 486 : l0_flush_global_state: &l0_flush::Inner,
712 486 : gate: &utils::sync::gate::Gate,
713 486 : cancel: CancellationToken,
714 486 : ) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
715 486 : let index = self.index.read().await;
716 :
717 : use l0_flush::Inner;
718 486 : let _concurrency_permit = match l0_flush_global_state {
719 486 : Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
720 : };
721 :
722 486 : let end_lsn = *self.end_lsn.get().unwrap();
723 :
724 486 : let key_count = if let Some(key_range) = key_range {
725 0 : let key_range = key_range.start.to_compact()..key_range.end.to_compact();
726 :
727 0 : index.iter().filter(|(k, _)| key_range.contains(k)).count()
728 : } else {
729 486 : index.len()
730 : };
731 486 : if key_count == 0 {
732 0 : return Ok(None);
733 486 : }
734 :
735 486 : let mut delta_layer_writer = DeltaLayerWriter::new(
736 486 : self.conf,
737 486 : self.timeline_id,
738 486 : self.tenant_shard_id,
739 486 : Key::MIN,
740 486 : self.start_lsn..end_lsn,
741 486 : gate,
742 486 : cancel,
743 486 : ctx,
744 486 : )
745 486 : .await?;
746 :
747 486 : match l0_flush_global_state {
748 : l0_flush::Inner::Direct { .. } => {
749 486 : let file_contents = self.file.load_to_io_buf(ctx).await?;
750 486 : let file_contents = file_contents.freeze();
751 :
752 2127396 : for (key, vec_map) in index.iter() {
753 : // Write all page versions
754 2192760 : for (lsn, entry) in vec_map
755 2127396 : .as_slice()
756 2127396 : .iter()
757 2192760 : .map(|(lsn, entry)| (lsn, entry.unpack()))
758 : {
759 : let IndexEntryUnpacked {
760 2192760 : pos,
761 2192760 : len,
762 2192760 : will_init,
763 2192760 : } = entry;
764 2192760 : let buf = file_contents.slice(pos as usize..(pos + len) as usize);
765 2192760 : let (_buf, res) = delta_layer_writer
766 2192760 : .put_value_bytes(
767 2192760 : Key::from_compact(*key),
768 2192760 : *lsn,
769 2192760 : buf.slice_len(),
770 2192760 : will_init,
771 2192760 : ctx,
772 : )
773 2192760 : .await;
774 2192760 : res?;
775 : }
776 : }
777 : }
778 : }
779 :
780 : // MAX is used here because we identify L0 layers by full key range
781 486 : let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
782 :
783 : // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()`.
784 : //
785 : // If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
786 : // the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
787 : // Thus, we'd have more concurrent `Vec<u8>` in existence than the semaphore allows.
788 : //
789 : // We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
790 : // we dirtied when writing to the filesystem have been flushed and marked !dirty.
791 486 : drop(_concurrency_permit);
792 :
793 486 : Ok(Some((desc, path)))
794 486 : }
795 : }
796 :
797 : #[cfg(test)]
798 : mod tests {
799 : use super::*;
800 :
801 : #[test]
802 1 : fn test_index_entry() {
803 : const MAX_SUPPORTED_POS: usize = IndexEntry::MAX_SUPPORTED_POS;
804 : use {IndexEntryNewArgs as Args, IndexEntryUnpacked as Unpacked};
805 :
806 20 : let roundtrip = |args, expect: Unpacked| {
807 20 : let res = IndexEntry::new(args).expect("this tests expects no errors");
808 : let IndexEntryUnpacked {
809 20 : will_init,
810 20 : len,
811 20 : pos,
812 20 : } = res.unpack();
813 20 : assert_eq!(will_init, expect.will_init);
814 20 : assert_eq!(len, expect.len);
815 20 : assert_eq!(pos, expect.pos);
816 20 : };
817 :
818 : // basic roundtrip
819 3 : for pos in [0, MAX_SUPPORTED_POS] {
820 6 : for len in [0, MAX_SUPPORTED_BLOB_LEN] {
821 12 : for will_init in [true, false] {
822 8 : let expect = Unpacked {
823 8 : will_init,
824 8 : len: len.into_u64(),
825 8 : pos: pos.into_u64(),
826 8 : };
827 8 : roundtrip(
828 8 : Args {
829 8 : will_init,
830 8 : base_offset: pos.into_u64(),
831 8 : batch_offset: 0,
832 8 : len,
833 8 : },
834 8 : expect,
835 8 : );
836 8 : roundtrip(
837 8 : Args {
838 8 : will_init,
839 8 : base_offset: 0,
840 8 : batch_offset: pos.into_u64(),
841 8 : len,
842 8 : },
843 8 : expect,
844 8 : );
845 8 : }
846 : }
847 : }
848 :
849 : // too-large len
850 1 : let too_large = Args {
851 1 : will_init: false,
852 1 : len: MAX_SUPPORTED_BLOB_LEN + 1,
853 1 : base_offset: 0,
854 1 : batch_offset: 0,
855 1 : };
856 1 : assert!(IndexEntry::new(too_large).is_err());
857 :
858 : // too-large pos
859 : {
860 1 : let too_large = Args {
861 1 : will_init: false,
862 1 : len: 0,
863 1 : base_offset: MAX_SUPPORTED_POS.into_u64() + 1,
864 1 : batch_offset: 0,
865 1 : };
866 1 : assert!(IndexEntry::new(too_large).is_err());
867 1 : let too_large = Args {
868 1 : will_init: false,
869 1 : len: 0,
870 1 : base_offset: 0,
871 1 : batch_offset: MAX_SUPPORTED_POS.into_u64() + 1,
872 1 : };
873 1 : assert!(IndexEntry::new(too_large).is_err());
874 : }
875 :
876 : // too large (base_offset + batch_offset)
877 : {
878 1 : let too_large = Args {
879 1 : will_init: false,
880 1 : len: 0,
881 1 : base_offset: MAX_SUPPORTED_POS.into_u64(),
882 1 : batch_offset: 1,
883 1 : };
884 1 : assert!(IndexEntry::new(too_large).is_err());
885 1 : let too_large = Args {
886 1 : will_init: false,
887 1 : len: 0,
888 1 : base_offset: MAX_SUPPORTED_POS.into_u64() - 1,
889 1 : batch_offset: MAX_SUPPORTED_POS.into_u64() - 1,
890 1 : };
891 1 : assert!(IndexEntry::new(too_large).is_err());
892 : }
893 :
894 : // valid special cases
895 : // - area past the max supported pos that is accessible by len
896 3 : for len in [1, MAX_SUPPORTED_BLOB_LEN] {
897 2 : roundtrip(
898 2 : Args {
899 2 : will_init: false,
900 2 : len,
901 2 : base_offset: MAX_SUPPORTED_POS.into_u64(),
902 2 : batch_offset: 0,
903 2 : },
904 2 : Unpacked {
905 2 : will_init: false,
906 2 : len: len as u64,
907 2 : pos: MAX_SUPPORTED_POS.into_u64(),
908 2 : },
909 2 : );
910 2 : roundtrip(
911 2 : Args {
912 2 : will_init: false,
913 2 : len,
914 2 : base_offset: 0,
915 2 : batch_offset: MAX_SUPPORTED_POS.into_u64(),
916 2 : },
917 2 : Unpacked {
918 2 : will_init: false,
919 2 : len: len as u64,
920 2 : pos: MAX_SUPPORTED_POS.into_u64(),
921 2 : },
922 2 : );
923 2 : }
924 1 : }
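
// An additional, illustrative check (the 256 MiB distance is a hypothetical value,
// not taken from any config default): exercises the startup-time validation that the
// doc comment of `IndexEntry::new` asks callers to perform.
#[test]
fn test_validate_checkpoint_distance_example() {
// A plausible checkpoint distance is far below `IndexEntry::MAX_SUPPORTED_POS`
// and is accepted.
assert!(IndexEntry::validate_checkpoint_distance(256 * 1024 * 1024).is_ok());

// A distance beyond the addressable position range is rejected.
assert!(
IndexEntry::validate_checkpoint_distance(IndexEntry::MAX_SUPPORTED_POS as u64 + 1)
.is_err()
);
}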
925 : }
|