Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN.
3 : //!
4 : //! It contains an image of all key-value pairs in its key-range. Any key
5 : //! that falls into the image layer's range but is not present in the layer
6 : //! does not exist at that LSN.
7 : //!
8 : //! An image layer is stored in a file on disk. The file is stored in
9 : //! timelines/<timeline_id> directory. Currently, there are no
10 : //! subdirectories, and each image layer file is named like this:
11 : //!
12 : //! ```text
13 : //! <key start>-<key end>__<LSN>
14 : //! ```
15 : //!
16 : //! For example:
17 : //!
18 : //! ```text
19 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
20 : //! ```
21 : //!
22 : //! Every image layer file consists of three parts: "summary",
23 : //! "index", and "values". The summary is a fixed size header at the
24 : //! beginning of the file, and it contains basic information about the
25 : //! layer, and offsets to the other parts. The "index" is a B-tree,
26 : //! mapping from Key to an offset in the "values" part. The
27 : //! actual page images are stored in the "values" part.
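As a rough illustration of the naming scheme, a layer file name can be split back into its parts as below. This is a minimal sketch: `parse_image_layer_name` is a hypothetical helper, and the real parsing lives in `ImageLayerName`'s `FromStr` implementation.

```rust
/// Split "<key start>-<key end>__<LSN>" into its components (sketch only).
fn parse_image_layer_name(name: &str) -> Option<(&str, &str, u64)> {
    // The LSN suffix is separated from the key range by a double underscore.
    let (key_range, lsn_hex) = name.split_once("__")?;
    // The key range bounds are hex-encoded keys separated by a dash.
    let (key_start, key_end) = key_range.split_once('-')?;
    // The LSN is hex-encoded as well.
    let lsn = u64::from_str_radix(lsn_hex, 16).ok()?;
    Some((key_start, key_end, lsn))
}
```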
28 : use std::collections::{HashMap, VecDeque};
29 : use std::fs::File;
30 : use std::ops::Range;
31 : use std::os::unix::prelude::FileExt;
32 : use std::str::FromStr;
33 : use std::sync::Arc;
34 : use std::sync::atomic::AtomicU64;
35 :
36 : use anyhow::{Context, Result, bail, ensure};
37 : use bytes::Bytes;
38 : use camino::{Utf8Path, Utf8PathBuf};
39 : use hex;
40 : use itertools::Itertools;
41 : use pageserver_api::config::MaxVectoredReadBytes;
42 : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
43 : use pageserver_api::keyspace::KeySpace;
44 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
45 : use pageserver_api::value::Value;
46 : use serde::{Deserialize, Serialize};
47 : use tokio::sync::OnceCell;
48 : use tokio_stream::StreamExt;
49 : use tokio_util::sync::CancellationToken;
50 : use tracing::*;
51 : use utils::bin_ser::BeSer;
52 : use utils::bin_ser::SerializeError;
53 : use utils::id::{TenantId, TimelineId};
54 : use utils::lsn::Lsn;
55 :
56 : use super::errors::PutError;
57 : use super::layer_name::ImageLayerName;
58 : use super::{
59 : AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
60 : ValuesReconstructState,
61 : };
62 : use crate::config::PageServerConf;
63 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
64 : use crate::page_cache::{self, FileId, PAGE_SZ};
65 : use crate::tenant::blob_io::BlobWriter;
66 : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
67 : use crate::tenant::disk_btree::{
68 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
69 : };
70 : use crate::tenant::timeline::GetVectoredError;
71 : use crate::tenant::vectored_blob_io::{
72 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
73 : VectoredReadPlanner,
74 : };
75 : use crate::virtual_file::TempVirtualFile;
76 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
77 : use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
78 : use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
79 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
80 :
81 : ///
82 : /// Header stored at the beginning of the file
83 : ///
84 : /// After this comes the 'values' part, starting on block 1. After that,
85 : /// the 'index' starts at the block indicated by 'index_start_blk'
86 : ///
87 0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
88 : pub struct Summary {
89 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
90 : pub magic: u16,
91 : pub format_version: u16,
92 :
93 : pub tenant_id: TenantId,
94 : pub timeline_id: TimelineId,
95 : pub key_range: Range<Key>,
96 : pub lsn: Lsn,
97 :
98 : /// Block number where the 'index' part of the file begins.
99 : pub index_start_blk: u32,
100 : /// Block within the 'index', where the B-tree root page is stored
101 : pub index_root_blk: u32,
102 : // the 'values' part starts after the summary header, on block 1.
103 : }
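All three parts are addressed in whole blocks, with the summary occupying block 0, so byte offsets follow directly from the block numbers recorded in the summary. A minimal sketch, assuming the 8 KiB `PAGE_SZ` from `crate::page_cache`:

```rust
const PAGE_SZ: u64 = 8192; // assumed to match crate::page_cache::PAGE_SZ

/// Block 0 holds the summary, so the 'values' part starts on block 1.
fn values_start_offset() -> u64 {
    PAGE_SZ
}

/// The 'index' part starts at the block recorded in the summary.
fn index_start_offset(index_start_blk: u32) -> u64 {
    index_start_blk as u64 * PAGE_SZ
}
```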
104 :
105 : impl From<&ImageLayer> for Summary {
106 0 : fn from(layer: &ImageLayer) -> Self {
107 0 : Self::expected(
108 0 : layer.desc.tenant_shard_id.tenant_id,
109 0 : layer.desc.timeline_id,
110 0 : layer.desc.key_range.clone(),
111 0 : layer.lsn,
112 0 : )
113 0 : }
114 : }
115 :
116 : impl Summary {
117 : /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
118 191 : pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
119 191 : let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
120 191 : Self::ser_into(self, &mut buf)?;
121 : // Pad zeroes to the buffer so the length is a multiple of the alignment.
122 191 : buf.extend_with(0, buf.capacity() - buf.len());
123 191 : Ok(buf.freeze())
124 191 : }
125 :
126 75 : pub(super) fn expected(
127 75 : tenant_id: TenantId,
128 75 : timeline_id: TimelineId,
129 75 : key_range: Range<Key>,
130 75 : lsn: Lsn,
131 75 : ) -> Self {
132 75 : Self {
133 75 : magic: IMAGE_FILE_MAGIC,
134 75 : format_version: STORAGE_FORMAT_VERSION,
135 75 : tenant_id,
136 75 : timeline_id,
137 75 : key_range,
138 75 : lsn,
139 75 :
140 75 : index_start_blk: 0,
141 75 : index_root_blk: 0,
142 75 : }
143 75 : }
144 : }
145 :
146 : /// This is used only from `pagectl`. Within pageserver, all layers are
147 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
148 : pub struct ImageLayer {
149 : path: Utf8PathBuf,
150 : pub desc: PersistentLayerDesc,
151 : // This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn
152 : pub lsn: Lsn,
153 : inner: OnceCell<ImageLayerInner>,
154 : }
155 :
156 : impl std::fmt::Debug for ImageLayer {
157 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 : use super::RangeDisplayDebug;
159 :
160 0 : f.debug_struct("ImageLayer")
161 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
162 0 : .field("file_size", &self.desc.file_size)
163 0 : .field("lsn", &self.lsn)
164 0 : .field("inner", &self.inner)
165 0 : .finish()
166 0 : }
167 : }
168 :
169 : /// ImageLayer is the in-memory data structure associated with an on-disk image
170 : /// file.
171 : pub struct ImageLayerInner {
172 : // values copied from summary
173 : index_start_blk: u32,
174 : index_root_blk: u32,
175 :
176 : key_range: Range<Key>,
177 : lsn: Lsn,
178 :
179 : file: Arc<VirtualFile>,
180 : file_id: FileId,
181 :
182 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
183 : }
184 :
185 : impl ImageLayerInner {
186 0 : pub(crate) fn layer_dbg_info(&self) -> String {
187 0 : format!(
188 0 : "image {}..{} {}",
189 0 : self.key_range().start,
190 0 : self.key_range().end,
191 0 : self.lsn()
192 0 : )
193 0 : }
194 : }
195 :
196 : impl std::fmt::Debug for ImageLayerInner {
197 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 0 : f.debug_struct("ImageLayerInner")
199 0 : .field("index_start_blk", &self.index_start_blk)
200 0 : .field("index_root_blk", &self.index_root_blk)
201 0 : .finish()
202 0 : }
203 : }
204 :
205 : impl ImageLayerInner {
206 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
207 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
208 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
209 0 : self.index_start_blk,
210 0 : self.index_root_blk,
211 0 : block_reader,
212 0 : );
213 0 :
214 0 : tree_reader.dump(ctx).await?;
215 :
216 0 : tree_reader
217 0 : .visit(
218 0 : &[0u8; KEY_SIZE],
219 0 : VisitDirection::Forwards,
220 0 : |key, value| {
221 0 : println!("key: {} offset {}", hex::encode(key), value);
222 0 : true
223 0 : },
224 0 : ctx,
225 0 : )
226 0 : .await?;
227 :
228 0 : Ok(())
229 0 : }
230 : }
231 :
232 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
233 : impl std::fmt::Display for ImageLayer {
234 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 0 : write!(f, "{}", self.layer_desc().short_id())
236 0 : }
237 : }
238 :
239 : impl AsLayerDesc for ImageLayer {
240 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
241 0 : &self.desc
242 0 : }
243 : }
244 :
245 : impl ImageLayer {
246 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
247 0 : self.desc.dump();
248 0 :
249 0 : if !verbose {
250 0 : return Ok(());
251 0 : }
252 :
253 0 : let inner = self.load(ctx).await?;
254 :
255 0 : inner.dump(ctx).await?;
256 :
257 0 : Ok(())
258 0 : }
259 :
260 314 : fn temp_path_for(
261 314 : conf: &PageServerConf,
262 314 : timeline_id: TimelineId,
263 314 : tenant_shard_id: TenantShardId,
264 314 : fname: &ImageLayerName,
265 314 : ) -> Utf8PathBuf {
266 : // TempVirtualFile requires us to never reuse a filename while an old
267 : // instance of TempVirtualFile created with that filename is not done dropping yet.
268 : // So, we use a monotonic counter to disambiguate the filenames.
269 : static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
270 314 : let filename_disambiguator =
271 314 : NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
272 314 :
273 314 : conf.timeline_path(&tenant_shard_id, &timeline_id)
274 314 : .join(format!(
275 314 : "{fname}.{:x}.{TEMP_FILE_SUFFIX}",
276 314 : filename_disambiguator
277 314 : ))
278 314 : }
279 :
280 : ///
281 : /// Open the underlying file and read the metadata into memory, if it's
282 : /// not loaded already.
283 : ///
284 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
285 0 : self.inner
286 0 : .get_or_try_init(|| self.load_inner(ctx))
287 0 : .await
288 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
289 0 : }
290 :
291 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
292 0 : let path = self.path();
293 :
294 0 : let loaded =
295 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
296 :
297 : // not production code
298 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
299 0 : let expected_layer_name = self.layer_desc().layer_name();
300 0 :
301 0 : if actual_layer_name != expected_layer_name {
302 0 : println!("warning: filename does not match what is expected from in-file summary");
303 0 : println!("actual: {:?}", actual_layer_name.to_string());
304 0 : println!("expected: {:?}", expected_layer_name.to_string());
305 0 : }
306 :
307 0 : Ok(loaded)
308 0 : }
309 :
310 : /// Create an ImageLayer struct representing an existing file on disk.
311 : ///
312 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
313 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
314 0 : let mut summary_buf = vec![0; PAGE_SZ];
315 0 : file.read_exact_at(&mut summary_buf, 0)?;
316 0 : let summary = Summary::des_prefix(&summary_buf)?;
317 0 : let metadata = file
318 0 : .metadata()
319 0 : .context("get file metadata to determine size")?;
320 :
321 : // This function is never used for constructing layers in a running pageserver,
322 : // so it does not need an accurate TenantShardId.
323 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
324 0 :
325 0 : Ok(ImageLayer {
326 0 : path: path.to_path_buf(),
327 0 : desc: PersistentLayerDesc::new_img(
328 0 : tenant_shard_id,
329 0 : summary.timeline_id,
330 0 : summary.key_range,
331 0 : summary.lsn,
332 0 : metadata.len(),
333 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
334 0 : lsn: summary.lsn,
335 0 : inner: OnceCell::new(),
336 0 : })
337 0 : }
338 :
339 0 : fn path(&self) -> Utf8PathBuf {
340 0 : self.path.clone()
341 0 : }
342 : }
343 :
344 : #[derive(thiserror::Error, Debug)]
345 : pub enum RewriteSummaryError {
346 : #[error("magic mismatch")]
347 : MagicMismatch,
348 : #[error(transparent)]
349 : Other(#[from] anyhow::Error),
350 : }
351 :
352 : impl From<std::io::Error> for RewriteSummaryError {
353 0 : fn from(e: std::io::Error) -> Self {
354 0 : Self::Other(anyhow::anyhow!(e))
355 0 : }
356 : }
357 :
358 : impl ImageLayer {
359 0 : pub async fn rewrite_summary<F>(
360 0 : path: &Utf8Path,
361 0 : rewrite: F,
362 0 : ctx: &RequestContext,
363 0 : ) -> Result<(), RewriteSummaryError>
364 0 : where
365 0 : F: Fn(Summary) -> Summary,
366 0 : {
367 0 : let file = VirtualFile::open_with_options_v2(
368 0 : path,
369 0 : virtual_file::OpenOptions::new().read(true).write(true),
370 0 : ctx,
371 0 : )
372 0 : .await
373 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
374 0 : let file_id = page_cache::next_file_id();
375 0 : let block_reader = FileBlockReader::new(&file, file_id);
376 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
377 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
378 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
379 0 : return Err(RewriteSummaryError::MagicMismatch);
380 0 : }
381 0 :
382 0 : let new_summary = rewrite(actual_summary);
383 :
384 0 : let buf = new_summary.ser_into_page().context("serialize")?;
385 0 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
386 0 : res?;
387 0 : Ok(())
388 0 : }
389 : }
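A usage sketch for `rewrite_summary`, e.g. from `pagectl`-style tooling that needs to patch the tenant id recorded in a layer file. `path`, `new_tenant_id`, and `ctx` are assumed to be in scope; all other summary fields are kept as found on disk.

```rust
ImageLayer::rewrite_summary(
    path,
    |summary| Summary {
        tenant_id: new_tenant_id,
        ..summary
    },
    ctx,
)
.await?;
```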
390 :
391 : impl ImageLayerInner {
392 70 : pub(crate) fn key_range(&self) -> &Range<Key> {
393 70 : &self.key_range
394 70 : }
395 :
396 70 : pub(crate) fn lsn(&self) -> Lsn {
397 70 : self.lsn
398 70 : }
399 :
400 75 : pub(super) async fn load(
401 75 : path: &Utf8Path,
402 75 : lsn: Lsn,
403 75 : summary: Option<Summary>,
404 75 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
405 75 : ctx: &RequestContext,
406 75 : ) -> anyhow::Result<Self> {
407 75 : let file = Arc::new(
408 75 : VirtualFile::open_v2(path, ctx)
409 75 : .await
410 75 : .context("open layer file")?,
411 : );
412 75 : let file_id = page_cache::next_file_id();
413 75 : let block_reader = FileBlockReader::new(&file, file_id);
414 75 : let summary_blk = block_reader
415 75 : .read_blk(0, ctx)
416 75 : .await
417 75 : .context("read first block")?;
418 :
419 : // A wrong block length is the only way this could fail, so it's not actually likely
420 : // at all unless read_blk returns a wrong-sized block.
421 : //
422 : // TODO: confirm and make this into assertion
423 75 : let actual_summary =
424 75 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
425 :
426 75 : if let Some(mut expected_summary) = summary {
427 : // production code path
428 75 : expected_summary.index_start_blk = actual_summary.index_start_blk;
429 75 : expected_summary.index_root_blk = actual_summary.index_root_blk;
430 75 : // mask out the timeline_id, but still require the layers to be from the same tenant
431 75 : expected_summary.timeline_id = actual_summary.timeline_id;
432 75 :
433 75 : if actual_summary != expected_summary {
434 0 : bail!(
435 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
436 0 : actual_summary,
437 0 : expected_summary
438 0 : );
439 75 : }
440 0 : }
441 :
442 75 : Ok(ImageLayerInner {
443 75 : index_start_blk: actual_summary.index_start_blk,
444 75 : index_root_blk: actual_summary.index_root_blk,
445 75 : lsn,
446 75 : file,
447 75 : file_id,
448 75 : max_vectored_read_bytes,
449 75 : key_range: actual_summary.key_range,
450 75 : })
451 75 : }
452 :
453 : // Look up the keys in the provided keyspace and update
454 : // the reconstruct state with whatever is found.
455 15126 : pub(super) async fn get_values_reconstruct_data(
456 15126 : &self,
457 15126 : this: ResidentLayer,
458 15126 : keyspace: KeySpace,
459 15126 : reconstruct_state: &mut ValuesReconstructState,
460 15126 : ctx: &RequestContext,
461 15126 : ) -> Result<(), GetVectoredError> {
462 15126 : let reads = self
463 15126 : .plan_reads(keyspace, None, ctx)
464 15126 : .await
465 15126 : .map_err(GetVectoredError::Other)?;
466 :
467 15126 : self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
468 15126 : .await;
469 :
470 15126 : reconstruct_state.on_image_layer_visited(&self.key_range);
471 15126 :
472 15126 : Ok(())
473 15126 : }
474 :
475 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
476 : /// and the keys in this layer.
477 : ///
478 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
479 : /// this shard.
480 15130 : async fn plan_reads(
481 15130 : &self,
482 15130 : keyspace: KeySpace,
483 15130 : shard_identity: Option<&ShardIdentity>,
484 15130 : ctx: &RequestContext,
485 15130 : ) -> anyhow::Result<Vec<VectoredRead>> {
486 15130 : let mut planner = VectoredReadPlanner::new(
487 15130 : self.max_vectored_read_bytes
488 15130 : .expect("Layer is loaded with max vectored bytes config")
489 15130 : .0
490 15130 : .into(),
491 15130 : );
492 15130 :
493 15130 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
494 15130 : let tree_reader =
495 15130 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
496 15130 :
497 15130 : let ctx = RequestContextBuilder::from(ctx)
498 15130 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
499 15130 : .attached_child();
500 :
501 22429 : for range in keyspace.ranges.iter() {
502 22429 : let mut range_end_handled = false;
503 22429 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
504 22429 : range.start.write_to_byte_slice(&mut search_key);
505 22429 :
506 22429 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
507 22429 : let mut index_stream = std::pin::pin!(index_stream);
508 :
509 100795 : while let Some(index_entry) = index_stream.next().await {
510 100412 : let (raw_key, offset) = index_entry?;
511 :
512 100412 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
513 100412 : assert!(key >= range.start);
514 :
515 100412 : let flag = if let Some(shard_identity) = shard_identity {
516 32768 : if shard_identity.is_key_disposable(&key) {
517 24576 : BlobFlag::Ignore
518 : } else {
519 8192 : BlobFlag::None
520 : }
521 : } else {
522 67644 : BlobFlag::None
523 : };
524 :
525 100412 : if key >= range.end {
526 22046 : planner.handle_range_end(offset);
527 22046 : range_end_handled = true;
528 22046 : break;
529 78366 : } else {
530 78366 : planner.handle(key, self.lsn, offset, flag);
531 78366 : }
532 : }
533 :
534 22429 : if !range_end_handled {
535 383 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
536 383 : planner.handle_range_end(payload_end);
537 22046 : }
538 : }
539 :
540 15130 : Ok(planner.finish())
541 15130 : }
542 :
543 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
544 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
545 4 : pub(super) async fn filter(
546 4 : &self,
547 4 : shard_identity: &ShardIdentity,
548 4 : writer: &mut ImageLayerWriter,
549 4 : ctx: &RequestContext,
550 4 : ) -> anyhow::Result<usize> {
551 : // Fragment the range into the regions owned by this ShardIdentity
552 4 : let plan = self
553 4 : .plan_reads(
554 4 : KeySpace {
555 4 : // If asked for the total key space, plan_reads will give us all the keys in the layer
556 4 : ranges: vec![Key::MIN..Key::MAX],
557 4 : },
558 4 : Some(shard_identity),
559 4 : ctx,
560 4 : )
561 4 : .await?;
562 :
563 4 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
564 4 : let mut key_count = 0;
565 4 : for read in plan.into_iter() {
566 4 : let buf_size = read.size();
567 4 :
568 4 : let buf = IoBufferMut::with_capacity(buf_size);
569 4 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
570 :
571 4 : let view = BufView::new_slice(&blobs_buf.buf);
572 :
573 8192 : for meta in blobs_buf.blobs.iter() {
574 : // Just read the raw header+data and pass it through to the target layer, without
575 : // decoding and recompressing it.
576 8192 : let raw = meta.raw_with_header(&view);
577 8192 : key_count += 1;
578 8192 : writer
579 8192 : .put_image_raw(meta.meta.key, raw.into_bytes(), ctx)
580 8192 : .await
581 8192 : .context(format!("Storing key {}", meta.meta.key))?;
582 : }
583 : }
584 :
585 4 : Ok(key_count)
586 4 : }
587 :
588 15126 : async fn do_reads_and_update_state(
589 15126 : &self,
590 15126 : this: ResidentLayer,
591 15126 : reads: Vec<VectoredRead>,
592 15126 : reconstruct_state: &mut ValuesReconstructState,
593 15126 : ctx: &RequestContext,
594 15126 : ) {
595 15126 : let max_vectored_read_bytes = self
596 15126 : .max_vectored_read_bytes
597 15126 : .expect("Layer is loaded with max vectored bytes config")
598 15126 : .0
599 15126 : .into();
600 :
601 17272 : for read in reads.into_iter() {
602 17272 : let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
603 45598 : for (_, blob_meta) in read.blobs_at.as_slice() {
604 45598 : let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
605 45598 : ios.insert((blob_meta.key, blob_meta.lsn), io);
606 45598 : }
607 :
608 17272 : let buf_size = read.size();
609 17272 :
610 17272 : if buf_size > max_vectored_read_bytes {
611 : // If the read is oversized, it should only contain one key.
612 0 : let offenders = read
613 0 : .blobs_at
614 0 : .as_slice()
615 0 : .iter()
616 0 : .filter_map(|(_, blob_meta)| {
617 0 : if blob_meta.key.is_rel_dir_key()
618 0 : || blob_meta.key == DBDIR_KEY
619 0 : || blob_meta.key.is_aux_file_key()
620 : {
621 : // The size of values for these keys is unbounded and can
622 : // grow very large in pathological cases.
623 0 : None
624 : } else {
625 0 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
626 : }
627 0 : })
628 0 : .join(", ");
629 0 :
630 0 : if !offenders.is_empty() {
631 0 : tracing::warn!(
632 0 : "Oversized vectored read ({} > {}) for keys {}",
633 : buf_size,
634 : max_vectored_read_bytes,
635 : offenders
636 : );
637 0 : }
638 17272 : }
639 :
640 17272 : let read_extend_residency = this.clone();
641 17272 : let read_from = self.file.clone();
642 17272 : let read_ctx = ctx.attached_child();
643 17272 : reconstruct_state
644 17272 : .spawn_io(async move {
645 17272 : let buf = IoBufferMut::with_capacity(buf_size);
646 17272 : let vectored_blob_reader = VectoredBlobReader::new(&read_from);
647 17272 : let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
648 :
649 17272 : match res {
650 17272 : Ok(blobs_buf) => {
651 17272 : let view = BufView::new_slice(&blobs_buf.buf);
652 45598 : for meta in blobs_buf.blobs.iter() {
653 45598 : let io: OnDiskValueIo =
654 45598 : ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
655 45598 : let img_buf = meta.read(&view).await;
656 :
657 45598 : let img_buf = match img_buf {
658 45598 : Ok(img_buf) => img_buf,
659 0 : Err(e) => {
660 0 : io.complete(Err(e));
661 0 : continue;
662 : }
663 : };
664 :
665 45598 : io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
666 : }
667 :
668 17272 : assert!(ios.is_empty());
669 : }
670 0 : Err(err) => {
671 0 : for (_, io) in ios {
672 0 : io.complete(Err(std::io::Error::new(
673 0 : err.kind(),
674 0 : "vec read failed",
675 0 : )));
676 0 : }
677 : }
678 : }
679 :
680 : // keep layer resident until this IO is done; this spawned IO future generally outlives the
681 : // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
682 17272 : drop(read_extend_residency);
683 17272 : })
684 17272 : .await;
685 : }
686 15126 : }
687 :
688 63 : pub(crate) fn iter_with_options<'a>(
689 63 : &'a self,
690 63 : ctx: &'a RequestContext,
691 63 : max_read_size: u64,
692 63 : max_batch_size: usize,
693 63 : ) -> ImageLayerIterator<'a> {
694 63 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
695 63 : let tree_reader =
696 63 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
697 63 : ImageLayerIterator {
698 63 : image_layer: self,
699 63 : ctx,
700 63 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
701 63 : key_values_batch: VecDeque::new(),
702 63 : is_end: false,
703 63 : planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
704 63 : }
705 63 : }
706 :
707 : /// NB: not super efficient, but not terrible either. Should probably be an iterator.
708 : //
709 : // We're reusing the index traversal logic in plan_reads; would be nice to
710 : // factor that out.
711 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
712 0 : let plan = self
713 0 : .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
714 0 : .await?;
715 0 : Ok(plan
716 0 : .into_iter()
717 0 : .flat_map(|read| read.blobs_at)
718 0 : .map(|(_, blob_meta)| blob_meta.key)
719 0 : .collect())
720 0 : }
721 : }
722 :
723 : /// A builder object for constructing a new image layer.
724 : ///
725 : /// Usage:
726 : ///
727 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
728 : ///
729 : /// 2. Write the contents by calling `put_image` for every key-value
730 : /// pair in the key range.
731 : ///
732 : /// 3. Call `finish`.
733 : ///
734 : struct ImageLayerWriterInner {
735 : conf: &'static PageServerConf,
736 : path: Utf8PathBuf,
737 : timeline_id: TimelineId,
738 : tenant_shard_id: TenantShardId,
739 : key_range: Range<Key>,
740 : lsn: Lsn,
741 :
742 : // Total uncompressed bytes passed into put_image
743 : uncompressed_bytes: u64,
744 :
745 : // Like `uncompressed_bytes`,
746 : // but only of images we might consider for compression
747 : uncompressed_bytes_eligible: u64,
748 :
749 : // Like `uncompressed_bytes`, but only of images
750 : // where we have chosen their compressed form
751 : uncompressed_bytes_chosen: u64,
752 :
753 : // Number of keys in the layer.
754 : num_keys: usize,
755 :
756 : blob_writer: BlobWriter<TempVirtualFile>,
757 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
758 :
759 : #[cfg(feature = "testing")]
760 : last_written_key: Key,
761 : }
762 :
763 : impl ImageLayerWriterInner {
764 : ///
765 : /// Start building a new image layer.
766 : ///
767 : #[allow(clippy::too_many_arguments)]
768 314 : async fn new(
769 314 : conf: &'static PageServerConf,
770 314 : timeline_id: TimelineId,
771 314 : tenant_shard_id: TenantShardId,
772 314 : key_range: &Range<Key>,
773 314 : lsn: Lsn,
774 314 : gate: &utils::sync::gate::Gate,
775 314 : cancel: CancellationToken,
776 314 : ctx: &RequestContext,
777 314 : ) -> anyhow::Result<Self> {
778 314 : // Create the file initially with a temporary filename.
779 314 : // We'll atomically rename it to the final name when we're done.
780 314 : let path = ImageLayer::temp_path_for(
781 314 : conf,
782 314 : timeline_id,
783 314 : tenant_shard_id,
784 314 : &ImageLayerName {
785 314 : key_range: key_range.clone(),
786 314 : lsn,
787 314 : },
788 314 : );
789 314 : trace!("creating image layer {}", path);
790 314 : let file = TempVirtualFile::new(
791 314 : VirtualFile::open_with_options_v2(
792 314 : &path,
793 314 : virtual_file::OpenOptions::new()
794 314 : .create_new(true)
795 314 : .write(true),
796 314 : ctx,
797 314 : )
798 314 : .await?,
799 314 : gate.enter()?,
800 : );
801 :
802 : // Start at `PAGE_SZ` to make room for the header block.
803 314 : let blob_writer = BlobWriter::new(
804 314 : file,
805 314 : PAGE_SZ as u64,
806 314 : gate,
807 314 : cancel,
808 314 : ctx,
809 314 : info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
810 0 : )?;
811 :
812 : // Initialize the b-tree index builder
813 314 : let block_buf = BlockBuf::new();
814 314 : let tree_builder = DiskBtreeBuilder::new(block_buf);
815 314 :
816 314 : let writer = Self {
817 314 : conf,
818 314 : path,
819 314 : timeline_id,
820 314 : tenant_shard_id,
821 314 : key_range: key_range.clone(),
822 314 : lsn,
823 314 : tree: tree_builder,
824 314 : blob_writer,
825 314 : uncompressed_bytes: 0,
826 314 : uncompressed_bytes_eligible: 0,
827 314 : uncompressed_bytes_chosen: 0,
828 314 : num_keys: 0,
829 314 : #[cfg(feature = "testing")]
830 314 : last_written_key: Key::MIN,
831 314 : };
832 314 :
833 314 : Ok(writer)
834 314 : }
835 :
836 : ///
837 : /// Write next value to the file.
838 : ///
839 : /// The page versions must be appended in blknum order.
840 : ///
841 19576 : async fn put_image(
842 19576 : &mut self,
843 19576 : key: Key,
844 19576 : img: Bytes,
845 19576 : ctx: &RequestContext,
846 19576 : ) -> Result<(), PutError> {
847 19576 : if !self.key_range.contains(&key) {
848 0 : return Err(PutError::Other(anyhow::anyhow!(
849 0 : "key {:?} not in range {:?}",
850 0 : key,
851 0 : self.key_range
852 0 : )));
853 19576 : }
854 19576 : let compression = self.conf.image_compression;
855 19576 : let uncompressed_len = img.len() as u64;
856 19576 : self.uncompressed_bytes += uncompressed_len;
857 19576 : self.num_keys += 1;
858 19576 : let (_img, res) = self
859 19576 : .blob_writer
860 19576 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
861 19576 : .await;
862 : // TODO: re-use the buffer for `img` further upstack
863 19576 : let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
864 19576 : if compression_info.compressed_size.is_some() {
865 4001 : // The image has been considered for compression at least
866 4001 : self.uncompressed_bytes_eligible += uncompressed_len;
867 15575 : }
868 19576 : if compression_info.written_compressed {
869 0 : // The image has been compressed
870 0 : self.uncompressed_bytes_chosen += uncompressed_len;
871 19576 : }
872 :
873 19576 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
874 19576 : key.write_to_byte_slice(&mut keybuf);
875 19576 : self.tree
876 19576 : .append(&keybuf, off)
877 19576 : .map_err(anyhow::Error::new)
878 19576 : .map_err(PutError::Other)?;
879 :
880 : #[cfg(feature = "testing")]
881 19576 : {
882 19576 : self.last_written_key = key;
883 19576 : }
884 19576 :
885 19576 : Ok(())
886 19576 : }
887 :
888 : ///
889 : /// Write the next image to the file, as a raw blob header and data.
890 : ///
891 : /// The page versions must be appended in blknum order.
892 : ///
893 8192 : async fn put_image_raw(
894 8192 : &mut self,
895 8192 : key: Key,
896 8192 : raw_with_header: Bytes,
897 8192 : ctx: &RequestContext,
898 8192 : ) -> anyhow::Result<()> {
899 8192 : ensure!(self.key_range.contains(&key));
900 :
901 : // NB: we don't update the (un)compressed metrics, since we can't determine them without
902 : // decompressing the image. This seems okay.
903 8192 : self.num_keys += 1;
904 :
905 8192 : let (_, res) = self
906 8192 : .blob_writer
907 8192 : .write_blob_raw(raw_with_header.slice_len(), ctx)
908 8192 : .await;
909 8192 : let offset = res?;
910 :
911 8192 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
912 8192 : key.write_to_byte_slice(&mut keybuf);
913 8192 : self.tree.append(&keybuf, offset)?;
914 :
915 : #[cfg(feature = "testing")]
916 8192 : {
917 8192 : self.last_written_key = key;
918 8192 : }
919 8192 :
920 8192 : Ok(())
921 8192 : }
922 :
923 : ///
924 : /// Finish writing the image layer.
925 : ///
926 191 : async fn finish(
927 191 : self,
928 191 : ctx: &RequestContext,
929 191 : end_key: Option<Key>,
930 191 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
931 191 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
932 191 :
933 191 : // Calculate compression ratio
934 191 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
935 191 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
936 191 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
937 191 : .inc_by(self.uncompressed_bytes_eligible);
938 191 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
939 191 :
940 191 : // NB: filter() may pass through raw pages from a different layer, without looking at
941 191 : // whether these are compressed or not. We don't track metrics for these, so avoid
942 191 : // increasing `COMPRESSION_IMAGE_OUTPUT_BYTES` in this case too.
943 191 : if self.uncompressed_bytes > 0 {
944 188 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
945 188 : };
946 :
947 191 : let file = self
948 191 : .blob_writer
949 191 : .shutdown(
950 191 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
951 191 : ctx,
952 191 : )
953 191 : .await?;
954 :
955 : // Write out the index
956 191 : let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
957 191 : let (index_root_blk, block_buf) = self.tree.finish()?;
958 :
959 : // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
960 : // Should we just replace BlockBuf::blocks with one big buffer?
961 398 : for buf in block_buf.blocks {
962 207 : let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
963 207 : res?;
964 207 : offset += PAGE_SZ as u64;
965 : }
966 :
967 191 : let final_key_range = if let Some(end_key) = end_key {
968 150 : self.key_range.start..end_key
969 : } else {
970 41 : self.key_range.clone()
971 : };
972 :
973 : // Fill in the summary on blk 0
974 191 : let summary = Summary {
975 191 : magic: IMAGE_FILE_MAGIC,
976 191 : format_version: STORAGE_FORMAT_VERSION,
977 191 : tenant_id: self.tenant_shard_id.tenant_id,
978 191 : timeline_id: self.timeline_id,
979 191 : key_range: final_key_range.clone(),
980 191 : lsn: self.lsn,
981 191 : index_start_blk,
982 191 : index_root_blk,
983 191 : };
984 :
985 : // Writes summary at the first block (offset 0).
986 191 : let buf = summary.ser_into_page()?;
987 191 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
988 191 : res?;
989 :
990 191 : let metadata = file
991 191 : .metadata()
992 191 : .await
993 191 : .context("get metadata to determine file size")?;
994 :
995 191 : let desc = PersistentLayerDesc::new_img(
996 191 : self.tenant_shard_id,
997 191 : self.timeline_id,
998 191 : final_key_range,
999 191 : self.lsn,
1000 191 : metadata.len(),
1001 191 : );
1002 :
1003 : #[cfg(feature = "testing")]
1004 191 : if let Some(end_key) = end_key {
1005 150 : assert!(
1006 150 : self.last_written_key < end_key,
1007 0 : "written key violates end_key range"
1008 : );
1009 41 : }
1010 :
1011 : // Note: Because we open the file in write-only mode, we cannot
1012 : // reuse the same VirtualFile for reading later. That's why we don't
1013 : // set inner.file here. The first read will have to re-open it.
1014 :
1015 : // fsync the file
1016 191 : file.sync_all()
1017 191 : .await
1018 191 : .maybe_fatal_err("image_layer sync_all")?;
1019 :
1020 191 : trace!("created image layer {}", self.path);
1021 :
1022 : // The gate guard stored in `destination_file` is dropped. Callers (e.g. flush loop or compaction)
1023 : // keep the gate open also, so that it's safe for them to rename the file to its final destination.
1024 191 : file.disarm_into_inner();
1025 191 :
1026 191 : Ok((desc, self.path))
1027 191 : }
1028 : }
1029 :
1030 : /// A builder object for constructing a new image layer.
1031 : ///
1032 : /// Usage:
1033 : ///
1034 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
1035 : ///
1036 : /// 2. Write the contents by calling `put_image` for every key-value
1037 : /// pair in the key range.
1038 : ///
1039 : /// 3. Call `finish`.
1040 : ///
1041 : /// # Note
1042 : ///
1043 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
1044 : /// possible for the writer to be dropped before `finish` is actually called. This
1045 : /// could leave stray temporary files in the directory, eventually exhausting the
1046 : /// file system. This structure wraps `ImageLayerWriterInner` and also contains a `Drop`
1047 : /// implementation that cleans up the temporary file on failure. It's not
1048 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
1049 : /// out some fields, making it impossible to implement `Drop`.
1050 : ///
1051 : #[must_use]
1052 : pub struct ImageLayerWriter {
1053 : inner: Option<ImageLayerWriterInner>,
1054 : }
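A minimal end-to-end sketch of the writer lifecycle, modeled on `produce_image_layer` in the tests at the bottom of this file. The arguments mirror `ImageLayerWriter::new`, and the images are assumed to be pre-sorted by key.

```rust
async fn build_image_layer(
    conf: &'static PageServerConf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    key_range: std::ops::Range<Key>,
    lsn: Lsn,
    images: Vec<(Key, Bytes)>,
    gate: &utils::sync::gate::Gate,
    cancel: CancellationToken,
    ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    // 1. Create the writer; this opens a temp file in the timeline directory.
    let mut writer = ImageLayerWriter::new(
        conf,
        timeline_id,
        tenant_shard_id,
        &key_range,
        lsn,
        gate,
        cancel,
        ctx,
    )
    .await?;
    // 2. Feed every key-value pair, in ascending key order.
    for (key, img) in images {
        writer.put_image(key, img, ctx).await?;
    }
    // 3. Finish: this writes the index and summary, fsyncs the file, and returns
    //    the layer descriptor plus the temp path for the caller to rename into place.
    writer.finish(ctx).await
}
```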
1055 :
1056 : impl ImageLayerWriter {
1057 : ///
1058 : /// Start building a new image layer.
1059 : ///
1060 : #[allow(clippy::too_many_arguments)]
1061 314 : pub async fn new(
1062 314 : conf: &'static PageServerConf,
1063 314 : timeline_id: TimelineId,
1064 314 : tenant_shard_id: TenantShardId,
1065 314 : key_range: &Range<Key>,
1066 314 : lsn: Lsn,
1067 314 : gate: &utils::sync::gate::Gate,
1068 314 : cancel: CancellationToken,
1069 314 : ctx: &RequestContext,
1070 314 : ) -> anyhow::Result<ImageLayerWriter> {
1071 314 : Ok(Self {
1072 314 : inner: Some(
1073 314 : ImageLayerWriterInner::new(
1074 314 : conf,
1075 314 : timeline_id,
1076 314 : tenant_shard_id,
1077 314 : key_range,
1078 314 : lsn,
1079 314 : gate,
1080 314 : cancel,
1081 314 : ctx,
1082 314 : )
1083 314 : .await?,
1084 : ),
1085 : })
1086 314 : }
1087 :
1088 : ///
1089 : /// Write next value to the file.
1090 : ///
1091 : /// The page versions must be appended in blknum order.
1092 : ///
1093 19576 : pub async fn put_image(
1094 19576 : &mut self,
1095 19576 : key: Key,
1096 19576 : img: Bytes,
1097 19576 : ctx: &RequestContext,
1098 19576 : ) -> Result<(), PutError> {
1099 19576 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
1100 19576 : }
1101 :
1102 : ///
1103 : /// Write the next value to the file, as a raw header and data. This allows passing through a
1104 : /// raw, potentially compressed image from a different layer file without recompressing it.
1105 : ///
1106 : /// The page versions must be appended in blknum order.
1107 : ///
1108 8192 : pub async fn put_image_raw(
1109 8192 : &mut self,
1110 8192 : key: Key,
1111 8192 : raw_with_header: Bytes,
1112 8192 : ctx: &RequestContext,
1113 8192 : ) -> anyhow::Result<()> {
1114 8192 : self.inner
1115 8192 : .as_mut()
1116 8192 : .unwrap()
1117 8192 : .put_image_raw(key, raw_with_header, ctx)
1118 8192 : .await
1119 8192 : }
1120 :
1121 : /// Estimated size of the image layer.
1122 4277 : pub(crate) fn estimated_size(&self) -> u64 {
1123 4277 : let inner = self.inner.as_ref().unwrap();
1124 4277 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
1125 4277 : }
1126 :
1127 4326 : pub(crate) fn num_keys(&self) -> usize {
1128 4326 : self.inner.as_ref().unwrap().num_keys
1129 4326 : }
1130 :
1131 : ///
1132 : /// Finish writing the image layer.
1133 : ///
1134 41 : pub(crate) async fn finish(
1135 41 : mut self,
1136 41 : ctx: &RequestContext,
1137 41 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1138 41 : self.inner.take().unwrap().finish(ctx, None).await
1139 41 : }
1140 :
1141 : /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
1142 150 : pub(super) async fn finish_with_end_key(
1143 150 : mut self,
1144 150 : end_key: Key,
1145 150 : ctx: &RequestContext,
1146 150 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1147 150 : self.inner.take().unwrap().finish(ctx, Some(end_key)).await
1148 150 : }
1149 : }
1150 :
1151 : pub struct ImageLayerIterator<'a> {
1152 : image_layer: &'a ImageLayerInner,
1153 : ctx: &'a RequestContext,
1154 : planner: StreamingVectoredReadPlanner,
1155 : index_iter: DiskBtreeIterator<'a>,
1156 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1157 : is_end: bool,
1158 : }
1159 :
1160 : impl ImageLayerIterator<'_> {
1161 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1162 0 : self.image_layer.layer_dbg_info()
1163 0 : }
1164 :
1165 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1166 9565 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1167 9565 : assert!(self.key_values_batch.is_empty());
1168 9565 : assert!(!self.is_end);
1169 :
1170 9565 : let plan = loop {
1171 14537 : if let Some(res) = self.index_iter.next().await {
1172 14488 : let (raw_key, offset) = res?;
1173 14488 : if let Some(batch_plan) = self.planner.handle(
1174 14488 : Key::from_slice(&raw_key[..KEY_SIZE]),
1175 14488 : self.image_layer.lsn,
1176 14488 : offset,
1177 14488 : true,
1178 14488 : ) {
1179 9516 : break batch_plan;
1180 4972 : }
1181 : } else {
1182 49 : self.is_end = true;
1183 49 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1184 49 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1185 49 : break item;
1186 : } else {
1187 0 : return Ok(()); // TODO: a test case on empty iterator
1188 : }
1189 : }
1190 : };
1191 9565 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1192 9565 : let mut next_batch = std::collections::VecDeque::new();
1193 9565 : let buf_size = plan.size();
1194 9565 : let buf = IoBufferMut::with_capacity(buf_size);
1195 9565 : let blobs_buf = vectored_blob_reader
1196 9565 : .read_blobs(&plan, buf, self.ctx)
1197 9565 : .await?;
1198 9565 : let view = BufView::new_slice(&blobs_buf.buf);
1199 14474 : for meta in blobs_buf.blobs.iter() {
1200 14474 : let img_buf = meta.read(&view).await?;
1201 14474 : next_batch.push_back((
1202 14474 : meta.meta.key,
1203 14474 : self.image_layer.lsn,
1204 14474 : Value::Image(img_buf.into_bytes()),
1205 14474 : ));
1206 : }
1207 9565 : self.key_values_batch = next_batch;
1208 9565 : Ok(())
1209 9565 : }
1210 :
1211 14414 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1212 14414 : if self.key_values_batch.is_empty() {
1213 9604 : if self.is_end {
1214 81 : return Ok(None);
1215 9523 : }
1216 9523 : self.next_batch().await?;
1217 4810 : }
1218 14333 : Ok(Some(
1219 14333 : self.key_values_batch
1220 14333 : .pop_front()
1221 14333 : .expect("should not be empty"),
1222 14333 : ))
1223 14414 : }
1224 : }
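Reading it back: a sketch of draining an `ImageLayerIterator` in key order, as the tests below do. `inner` is an already-loaded `ImageLayerInner`; the read and batch limits are illustrative.

```rust
async fn dump_image_layer(inner: &ImageLayerInner, ctx: &RequestContext) -> anyhow::Result<()> {
    // Up to 8 MiB per vectored read, at most 1024 values per batch.
    let mut iter = inner.iter_with_options(ctx, 8 * 1024 * 1024, 1024);
    while let Some((key, lsn, value)) = iter.next().await? {
        // Image layers only store full page images.
        if let Value::Image(img) = value {
            println!("{key} @ {lsn}: {} bytes", img.len());
        }
    }
    Ok(())
}
```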
1225 :
1226 : #[cfg(test)]
1227 : mod test {
1228 : use std::sync::Arc;
1229 : use std::time::Duration;
1230 :
1231 : use bytes::Bytes;
1232 : use itertools::Itertools;
1233 : use pageserver_api::key::Key;
1234 : use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
1235 : use pageserver_api::value::Value;
1236 : use utils::generation::Generation;
1237 : use utils::id::{TenantId, TimelineId};
1238 : use utils::lsn::Lsn;
1239 :
1240 : use super::{ImageLayerIterator, ImageLayerWriter};
1241 : use crate::DEFAULT_PG_VERSION;
1242 : use crate::context::RequestContext;
1243 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
1244 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1245 : use crate::tenant::{TenantShard, Timeline};
1246 :
1247 : #[tokio::test]
1248 1 : async fn image_layer_rewrite() {
1249 1 : let tenant_conf = pageserver_api::models::TenantConfig {
1250 1 : gc_period: Some(Duration::ZERO),
1251 1 : compaction_period: Some(Duration::ZERO),
1252 1 : ..Default::default()
1253 1 : };
1254 1 : let tenant_id = TenantId::generate();
1255 1 : let mut gen_ = Generation::new(0xdead0001);
1256 5 : let mut get_next_gen = || {
1257 5 : let ret = gen_;
1258 5 : gen_ = gen_.next();
1259 5 : ret
1260 5 : };
1261 1 : // The LSN at which we will create an image layer to filter
1262 1 : let lsn = Lsn(0xdeadbeef0000);
1263 1 : let timeline_id = TimelineId::generate();
1264 1 :
1265 1 : //
1266 1 : // Create an unsharded parent with a layer.
1267 1 : //
1268 1 :
1269 1 : let harness = TenantHarness::create_custom(
1270 1 : "test_image_layer_rewrite--parent",
1271 1 : tenant_conf.clone(),
1272 1 : tenant_id,
1273 1 : ShardIdentity::unsharded(),
1274 1 : get_next_gen(),
1275 1 : )
1276 1 : .await
1277 1 : .unwrap();
1278 1 : let (tenant, ctx) = harness.load().await;
1279 1 : let timeline = tenant
1280 1 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1281 1 : .await
1282 1 : .unwrap();
1283 1 :
1284 1 : // This key range contains several 0x800 page stripes, only one of which belongs to shard zero
1285 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1286 1 : let input_end = Key::from_hex("000000067f00000001000000ae0000002000").unwrap();
1287 1 : let range = input_start..input_end;
1288 1 :
1289 1 : // Build an image layer to filter
1290 1 : let resident = {
1291 1 : let mut writer = ImageLayerWriter::new(
1292 1 : harness.conf,
1293 1 : timeline_id,
1294 1 : harness.tenant_shard_id,
1295 1 : &range,
1296 1 : lsn,
1297 1 : &timeline.gate,
1298 1 : timeline.cancel.clone(),
1299 1 : &ctx,
1300 1 : )
1301 1 : .await
1302 1 : .unwrap();
1303 1 :
1304 1 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1305 1 : let mut key = range.start;
1306 8193 : while key < range.end {
1307 8192 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1308 8192 :
1309 8192 : key = key.next();
1310 1 : }
1311 1 : let (desc, path) = writer.finish(&ctx).await.unwrap();
1312 1 : Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
1313 1 : };
1314 1 : let original_size = resident.metadata().file_size;
1315 1 :
1316 1 : //
1317 1 : // Create child shards and do the rewrite, exercising filter().
1318 1 : // TODO: abstraction in TenantHarness for splits.
1319 1 : //
1320 1 :
1321 1 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1322 1 : // range, middle of key range.
1323 1 : let shard_count = ShardCount::new(4);
1324 4 : for shard_number in 0..shard_count.count() {
1325 1 : //
1326 1 : // mimic the shard split
1327 1 : //
1328 4 : let shard_identity = ShardIdentity::new(
1329 4 : ShardNumber(shard_number),
1330 4 : shard_count,
1331 4 : ShardStripeSize(0x800),
1332 4 : )
1333 4 : .unwrap();
1334 4 : let harness = TenantHarness::create_custom(
1335 4 : Box::leak(Box::new(format!(
1336 4 : "test_image_layer_rewrite--child{}",
1337 4 : shard_identity.shard_slug()
1338 4 : ))),
1339 4 : tenant_conf.clone(),
1340 4 : tenant_id,
1341 4 : shard_identity,
1342 4 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1343 4 : // But here, all we care about is that the gen number is unique.
1344 4 : get_next_gen(),
1345 4 : )
1346 4 : .await
1347 4 : .unwrap();
1348 4 : let (tenant, ctx) = harness.load().await;
1349 4 : let timeline = tenant
1350 4 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1351 4 : .await
1352 4 : .unwrap();
1353 1 :
1354 1 : //
1355 1 : // use filter() and make assertions
1356 1 : //
1357 1 :
1358 4 : let mut filtered_writer = ImageLayerWriter::new(
1359 4 : harness.conf,
1360 4 : timeline_id,
1361 4 : harness.tenant_shard_id,
1362 4 : &range,
1363 4 : lsn,
1364 4 : &timeline.gate,
1365 4 : timeline.cancel.clone(),
1366 4 : &ctx,
1367 4 : )
1368 4 : .await
1369 4 : .unwrap();
1370 1 :
1371 4 : let wrote_keys = resident
1372 4 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1373 4 : .await
1374 4 : .unwrap();
1375 4 : let replacement = if wrote_keys > 0 {
1376 3 : let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
1377 3 : let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
1378 3 : Some(resident)
1379 1 : } else {
1380 1 : None
1381 1 : };
1382 1 :
1383 1 : // This exact size and those below will need updating as/when the layer encoding changes, but
1384 1 : // should be deterministic for a given version of the format, as we used no randomness generating the input.
1385 4 : assert_eq!(original_size, 122880);
1386 1 :
1387 4 : match shard_number {
1388 1 : 0 => {
1389 1 : // We should have written out just one stripe for our shard identity
1390 1 : assert_eq!(wrote_keys, 0x800);
1391 1 : let replacement = replacement.unwrap();
1392 1 :
1393 1 : // We should have dropped some of the data
1394 1 : assert!(replacement.metadata().file_size < original_size);
1395 1 : assert!(replacement.metadata().file_size > 0);
1396 1 :
1397 1 : // Assert that we dropped ~3/4 of the data.
1398 1 : assert_eq!(replacement.metadata().file_size, 49152);
1399 1 : }
1400 1 : 1 => {
1401 1 : // Shard 1 has no keys in our input range
1402 1 : assert_eq!(wrote_keys, 0x0);
1403 1 : assert!(replacement.is_none());
1404 1 : }
1405 1 : 2 => {
1406 1 : // Shard 2 has one stripe in the input range
1407 1 : assert_eq!(wrote_keys, 0x800);
1408 1 : let replacement = replacement.unwrap();
1409 1 : assert!(replacement.metadata().file_size < original_size);
1410 1 : assert!(replacement.metadata().file_size > 0);
1411 1 : assert_eq!(replacement.metadata().file_size, 49152);
1412 1 : }
1413 1 : 3 => {
1414 1 : // Shard 3 has two stripes in the input range
1415 1 : assert_eq!(wrote_keys, 0x1000);
1416 1 : let replacement = replacement.unwrap();
1417 1 : assert!(replacement.metadata().file_size < original_size);
1418 1 : assert!(replacement.metadata().file_size > 0);
1419 1 : assert_eq!(replacement.metadata().file_size, 73728);
1420 1 : }
1421 1 : _ => unreachable!(),
1422 1 : }
1423 1 : }
1424 1 : }
1425 :
1426 1 : async fn produce_image_layer(
1427 1 : tenant: &TenantShard,
1428 1 : tline: &Arc<Timeline>,
1429 1 : mut images: Vec<(Key, Bytes)>,
1430 1 : lsn: Lsn,
1431 1 : ctx: &RequestContext,
1432 1 : ) -> anyhow::Result<ResidentLayer> {
1433 1 : images.sort();
1434 1 : let (key_start, _) = images.first().unwrap();
1435 1 : let (key_last, _) = images.last().unwrap();
1436 1 : let key_end = key_last.next();
1437 1 : let key_range = *key_start..key_end;
1438 1 : let mut writer = ImageLayerWriter::new(
1439 1 : tenant.conf,
1440 1 : tline.timeline_id,
1441 1 : tenant.tenant_shard_id,
1442 1 : &key_range,
1443 1 : lsn,
1444 1 : &tline.gate,
1445 1 : tline.cancel.clone(),
1446 1 : ctx,
1447 1 : )
1448 1 : .await?;
1449 :
1450 1001 : for (key, img) in images {
1451 1000 : writer.put_image(key, img, ctx).await?;
1452 : }
1453 1 : let (desc, path) = writer.finish(ctx).await?;
1454 1 : let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
1455 :
1456 1 : Ok::<_, anyhow::Error>(img_layer)
1457 1 : }
1458 :
1459 14 : async fn assert_img_iter_equal(
1460 14 : img_iter: &mut ImageLayerIterator<'_>,
1461 14 : expect: &[(Key, Bytes)],
1462 14 : expect_lsn: Lsn,
1463 14 : ) {
1464 14 : let mut expect_iter = expect.iter();
1465 : loop {
1466 14014 : let o1 = img_iter.next().await.unwrap();
1467 14014 : let o2 = expect_iter.next();
1468 14014 : match (o1, o2) {
1469 14 : (None, None) => break,
1470 14000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1471 14000 : let Value::Image(i1) = v1 else {
1472 0 : panic!("expect Value::Image")
1473 : };
1474 14000 : assert_eq!(&k1, k2);
1475 14000 : assert_eq!(l1, expect_lsn);
1476 14000 : assert_eq!(&i1, i2);
1477 : }
1478 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1479 : }
1480 : }
1481 14 : }
1482 :
1483 : #[tokio::test]
1484 1 : async fn image_layer_iterator() {
1485 1 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1486 1 : let (tenant, ctx) = harness.load().await;
1487 1 :
1488 1 : let tline = tenant
1489 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1490 1 : .await
1491 1 : .unwrap();
1492 1 :
1493 1000 : fn get_key(id: u32) -> Key {
1494 1000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1495 1000 : key.field6 = id;
1496 1000 : key
1497 1000 : }
1498 1 : const N: usize = 1000;
1499 1 : let test_imgs = (0..N)
1500 1000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1501 1 : .collect_vec();
1502 1 : let resident_layer =
1503 1 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1504 1 : .await
1505 1 : .unwrap();
1506 1 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1507 3 : for max_read_size in [1, 1024] {
1508 16 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1509 14 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1510 14 : // Test if the batch size is correctly determined
1511 14 : let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
1512 14 : let mut num_items = 0;
1513 56 : for _ in 0..3 {
1514 42 : iter.next_batch().await.unwrap();
1515 42 : num_items += iter.key_values_batch.len();
1516 42 : if max_read_size == 1 {
1517 1 : // every key should be its own batch because the value is larger than max_read_size
1518 21 : assert_eq!(iter.key_values_batch.len(), 1);
1519 1 : } else {
1520 21 : assert!(iter.key_values_batch.len() <= batch_size);
1521 1 : }
1522 42 : if num_items >= N {
1523 1 : break;
1524 42 : }
1525 42 : iter.key_values_batch.clear();
1526 1 : }
1527 1 : // Test if the result is correct
1528 14 : let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
1529 14 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1530 1 : }
1531 1 : }
1532 1 : }
1533 : }