Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN.
3 : //!
4 : //! It contains an image of all key-value pairs in its key-range. Any key
5 : //! that falls within the image layer's range but is not present in the
6 : //! layer does not exist at the layer's LSN.
7 : //!
8 : //! An image layer is stored in a file on disk. The file is stored in the
9 : //! timelines/<timeline_id> directory. Currently, there are no
10 : //! subdirectories, and each image layer file is named like this:
11 : //!
12 : //! ```text
13 : //! <key start>-<key end>__<LSN>
14 : //! ```
15 : //!
16 : //! For example:
17 : //!
18 : //! ```text
19 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
20 : //! ```
21 : //!
22 : //! Every image layer file consists of three parts: "summary",
23 : //! "index", and "values". The summary is a fixed-size header at the
24 : //! beginning of the file, and it contains basic information about the
25 : //! layer, and offsets to the other parts. The "index" is a B-tree,
26 : //! mapping from Key to an offset in the "values" part. The
27 : //! actual page images are stored in the "values" part.
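//!
//! As a rough sketch of the on-disk layout described above (block numbers are
//! in units of PAGE_SZ; see the `Summary` struct below):
//!
//! ```text
//! block 0                      : summary header, zero-padded to PAGE_SZ
//! blocks 1 .. index_start_blk  : "values" (the page images)
//! blocks index_start_blk ..    : "index" B-tree, root at index_root_blk
//! ```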
28 : use std::collections::{HashMap, VecDeque};
29 : use std::fs::File;
30 : use std::ops::Range;
31 : use std::os::unix::prelude::FileExt;
32 : use std::str::FromStr;
33 : use std::sync::Arc;
34 : use std::sync::atomic::AtomicU64;
35 :
36 : use anyhow::{Context, Result, bail, ensure};
37 : use bytes::Bytes;
38 : use camino::{Utf8Path, Utf8PathBuf};
39 : use hex;
40 : use itertools::Itertools;
41 : use pageserver_api::config::MaxVectoredReadBytes;
42 : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
43 : use pageserver_api::keyspace::KeySpace;
44 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
45 : use pageserver_api::value::Value;
46 : use serde::{Deserialize, Serialize};
47 : use tokio::sync::OnceCell;
48 : use tokio_stream::StreamExt;
49 : use tokio_util::sync::CancellationToken;
50 : use tracing::*;
51 : use utils::bin_ser::BeSer;
52 : use utils::bin_ser::SerializeError;
53 : use utils::id::{TenantId, TimelineId};
54 : use utils::lsn::Lsn;
55 :
56 : use super::layer_name::ImageLayerName;
57 : use super::{
58 : AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
59 : ValuesReconstructState,
60 : };
61 : use crate::config::PageServerConf;
62 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
63 : use crate::page_cache::{self, FileId, PAGE_SZ};
64 : use crate::tenant::blob_io::BlobWriter;
65 : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
66 : use crate::tenant::disk_btree::{
67 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
68 : };
69 : use crate::tenant::timeline::GetVectoredError;
70 : use crate::tenant::vectored_blob_io::{
71 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
72 : VectoredReadPlanner,
73 : };
74 : use crate::virtual_file::TempVirtualFile;
75 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
76 : use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
77 : use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
78 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
79 :
80 : ///
81 : /// Header stored at the beginning of the file.
82 : ///
83 : /// After this comes the 'values' part, starting on block 1. After that,
84 : /// the 'index' starts at the block indicated by 'index_start_blk'.
85 : ///
86 0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
87 : pub struct Summary {
88 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
89 : pub magic: u16,
90 : pub format_version: u16,
91 :
92 : pub tenant_id: TenantId,
93 : pub timeline_id: TimelineId,
94 : pub key_range: Range<Key>,
95 : pub lsn: Lsn,
96 :
97 : /// Block number where the 'index' part of the file begins.
98 : pub index_start_blk: u32,
99 : /// Block within the 'index', where the B-tree root page is stored
100 : pub index_root_blk: u32,
101 : // the 'values' part starts after the summary header, on block 1.
102 : }
103 :
104 : impl From<&ImageLayer> for Summary {
105 0 : fn from(layer: &ImageLayer) -> Self {
106 0 : Self::expected(
107 0 : layer.desc.tenant_shard_id.tenant_id,
108 0 : layer.desc.timeline_id,
109 0 : layer.desc.key_range.clone(),
110 0 : layer.lsn,
111 0 : )
112 0 : }
113 : }
114 :
115 : impl Summary {
116 : /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
117 2292 : pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
118 2292 : let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
119 2292 : Self::ser_into(self, &mut buf)?;
120 : // Pad zeroes to the buffer so the length is a multiple of the alignment.
121 2292 : buf.extend_with(0, buf.capacity() - buf.len());
122 2292 : Ok(buf.freeze())
123 2292 : }
124 :
125 900 : pub(super) fn expected(
126 900 : tenant_id: TenantId,
127 900 : timeline_id: TimelineId,
128 900 : key_range: Range<Key>,
129 900 : lsn: Lsn,
130 900 : ) -> Self {
131 900 : Self {
132 900 : magic: IMAGE_FILE_MAGIC,
133 900 : format_version: STORAGE_FORMAT_VERSION,
134 900 : tenant_id,
135 900 : timeline_id,
136 900 : key_range,
137 900 : lsn,
138 900 :
139 900 : index_start_blk: 0,
140 900 : index_root_blk: 0,
141 900 : }
142 900 : }
143 : }
144 :
145 : /// This is used only from `pagectl`. Within pageserver, all layers are
146 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
147 : pub struct ImageLayer {
148 : path: Utf8PathBuf,
149 : pub desc: PersistentLayerDesc,
150 : // This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn.
151 : pub lsn: Lsn,
152 : inner: OnceCell<ImageLayerInner>,
153 : }
154 :
155 : impl std::fmt::Debug for ImageLayer {
156 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 : use super::RangeDisplayDebug;
158 :
159 0 : f.debug_struct("ImageLayer")
160 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
161 0 : .field("file_size", &self.desc.file_size)
162 0 : .field("lsn", &self.lsn)
163 0 : .field("inner", &self.inner)
164 0 : .finish()
165 0 : }
166 : }
167 :
168 : /// ImageLayer is the in-memory data structure associated with an on-disk image
169 : /// file.
170 : pub struct ImageLayerInner {
171 : // values copied from summary
172 : index_start_blk: u32,
173 : index_root_blk: u32,
174 :
175 : key_range: Range<Key>,
176 : lsn: Lsn,
177 :
178 : file: Arc<VirtualFile>,
179 : file_id: FileId,
180 :
181 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
182 : }
183 :
184 : impl ImageLayerInner {
185 0 : pub(crate) fn layer_dbg_info(&self) -> String {
186 0 : format!(
187 0 : "image {}..{} {}",
188 0 : self.key_range().start,
189 0 : self.key_range().end,
190 0 : self.lsn()
191 0 : )
192 0 : }
193 : }
194 :
195 : impl std::fmt::Debug for ImageLayerInner {
196 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 0 : f.debug_struct("ImageLayerInner")
198 0 : .field("index_start_blk", &self.index_start_blk)
199 0 : .field("index_root_blk", &self.index_root_blk)
200 0 : .finish()
201 0 : }
202 : }
203 :
204 : impl ImageLayerInner {
205 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
206 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
207 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
208 0 : self.index_start_blk,
209 0 : self.index_root_blk,
210 0 : block_reader,
211 0 : );
212 0 :
213 0 : tree_reader.dump(ctx).await?;
214 :
215 0 : tree_reader
216 0 : .visit(
217 0 : &[0u8; KEY_SIZE],
218 0 : VisitDirection::Forwards,
219 0 : |key, value| {
220 0 : println!("key: {} offset {}", hex::encode(key), value);
221 0 : true
222 0 : },
223 0 : ctx,
224 0 : )
225 0 : .await?;
226 :
227 0 : Ok(())
228 0 : }
229 : }
230 :
231 : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
232 : impl std::fmt::Display for ImageLayer {
233 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 0 : write!(f, "{}", self.layer_desc().short_id())
235 0 : }
236 : }
237 :
238 : impl AsLayerDesc for ImageLayer {
239 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
240 0 : &self.desc
241 0 : }
242 : }
243 :
244 : impl ImageLayer {
245 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
246 0 : self.desc.dump();
247 0 :
248 0 : if !verbose {
249 0 : return Ok(());
250 0 : }
251 :
252 0 : let inner = self.load(ctx).await?;
253 :
254 0 : inner.dump(ctx).await?;
255 :
256 0 : Ok(())
257 0 : }
258 :
259 3768 : fn temp_path_for(
260 3768 : conf: &PageServerConf,
261 3768 : timeline_id: TimelineId,
262 3768 : tenant_shard_id: TenantShardId,
263 3768 : fname: &ImageLayerName,
264 3768 : ) -> Utf8PathBuf {
265 : // TempVirtualFile requires us to never reuse a filename while an old
266 : // instance of TempVirtualFile created with that filename is not done dropping yet.
267 : // So, we use a monotonic counter to disambiguate the filenames.
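        // The resulting name has the shape "<layer file name>.<counter in hex>.<TEMP_FILE_SUFFIX>"
        // (see the format! call below), so concurrent writers of the same layer get distinct paths.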
268 : static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
269 3768 : let filename_disambiguator =
270 3768 : NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
271 3768 :
272 3768 : conf.timeline_path(&tenant_shard_id, &timeline_id)
273 3768 : .join(format!(
274 3768 : "{fname}.{:x}.{TEMP_FILE_SUFFIX}",
275 3768 : filename_disambiguator
276 3768 : ))
277 3768 : }
278 :
279 : ///
280 : /// Open the underlying file and read the metadata into memory, if it's
281 : /// not loaded already.
282 : ///
283 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
284 0 : self.inner
285 0 : .get_or_try_init(|| self.load_inner(ctx))
286 0 : .await
287 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
288 0 : }
289 :
290 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
291 0 : let path = self.path();
292 :
293 0 : let loaded =
294 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
295 :
296 : // not production code
297 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
298 0 : let expected_layer_name = self.layer_desc().layer_name();
299 0 :
300 0 : if actual_layer_name != expected_layer_name {
301 0 : println!("warning: filename does not match what is expected from in-file summary");
302 0 : println!("actual: {:?}", actual_layer_name.to_string());
303 0 : println!("expected: {:?}", expected_layer_name.to_string());
304 0 : }
305 :
306 0 : Ok(loaded)
307 0 : }
308 :
309 : /// Create an ImageLayer struct representing an existing file on disk.
310 : ///
311 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
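    ///
    /// A minimal sketch of the call shape (the path binding is illustrative):
    ///
    /// ```ignore
    /// let file = std::fs::File::open(path.as_std_path())?;
    /// let layer = ImageLayer::new_for_path(path, file)?;
    /// layer.dump(true, &ctx).await?;
    /// ```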
312 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
313 0 : let mut summary_buf = vec![0; PAGE_SZ];
314 0 : file.read_exact_at(&mut summary_buf, 0)?;
315 0 : let summary = Summary::des_prefix(&summary_buf)?;
316 0 : let metadata = file
317 0 : .metadata()
318 0 : .context("get file metadata to determine size")?;
319 :
320 : // This function is never used for constructing layers in a running pageserver,
321 : // so it does not need an accurate TenantShardId.
322 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
323 0 :
324 0 : Ok(ImageLayer {
325 0 : path: path.to_path_buf(),
326 0 : desc: PersistentLayerDesc::new_img(
327 0 : tenant_shard_id,
328 0 : summary.timeline_id,
329 0 : summary.key_range,
330 0 : summary.lsn,
331 0 : metadata.len(),
332 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
333 0 : lsn: summary.lsn,
334 0 : inner: OnceCell::new(),
335 0 : })
336 0 : }
337 :
338 0 : fn path(&self) -> Utf8PathBuf {
339 0 : self.path.clone()
340 0 : }
341 : }
342 :
343 : #[derive(thiserror::Error, Debug)]
344 : pub enum RewriteSummaryError {
345 : #[error("magic mismatch")]
346 : MagicMismatch,
347 : #[error(transparent)]
348 : Other(#[from] anyhow::Error),
349 : }
350 :
351 : impl From<std::io::Error> for RewriteSummaryError {
352 0 : fn from(e: std::io::Error) -> Self {
353 0 : Self::Other(anyhow::anyhow!(e))
354 0 : }
355 : }
356 :
357 : impl ImageLayer {
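    /// Rewrite the summary block (block 0) of an image layer file in place,
    /// leaving the rest of the file untouched. A minimal sketch of the call
    /// shape, where `new_timeline_id` is a hypothetical replacement value:
    ///
    /// ```ignore
    /// ImageLayer::rewrite_summary(
    ///     path,
    ///     |summary| Summary { timeline_id: new_timeline_id, ..summary },
    ///     ctx,
    /// )
    /// .await?;
    /// ```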
358 0 : pub async fn rewrite_summary<F>(
359 0 : path: &Utf8Path,
360 0 : rewrite: F,
361 0 : ctx: &RequestContext,
362 0 : ) -> Result<(), RewriteSummaryError>
363 0 : where
364 0 : F: Fn(Summary) -> Summary,
365 0 : {
366 0 : let file = VirtualFile::open_with_options_v2(
367 0 : path,
368 0 : virtual_file::OpenOptions::new().read(true).write(true),
369 0 : ctx,
370 0 : )
371 0 : .await
372 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
373 0 : let file_id = page_cache::next_file_id();
374 0 : let block_reader = FileBlockReader::new(&file, file_id);
375 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
376 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
377 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
378 0 : return Err(RewriteSummaryError::MagicMismatch);
379 0 : }
380 0 :
381 0 : let new_summary = rewrite(actual_summary);
382 :
383 0 : let buf = new_summary.ser_into_page().context("serialize")?;
384 0 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
385 0 : res?;
386 0 : Ok(())
387 0 : }
388 : }
389 :
390 : impl ImageLayerInner {
391 840 : pub(crate) fn key_range(&self) -> &Range<Key> {
392 840 : &self.key_range
393 840 : }
394 :
395 840 : pub(crate) fn lsn(&self) -> Lsn {
396 840 : self.lsn
397 840 : }
398 :
399 900 : pub(super) async fn load(
400 900 : path: &Utf8Path,
401 900 : lsn: Lsn,
402 900 : summary: Option<Summary>,
403 900 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
404 900 : ctx: &RequestContext,
405 900 : ) -> anyhow::Result<Self> {
406 900 : let file = Arc::new(
407 900 : VirtualFile::open_v2(path, ctx)
408 900 : .await
409 900 : .context("open layer file")?,
410 : );
411 900 : let file_id = page_cache::next_file_id();
412 900 : let block_reader = FileBlockReader::new(&file, file_id);
413 900 : let summary_blk = block_reader
414 900 : .read_blk(0, ctx)
415 900 : .await
416 900 : .context("read first block")?;
417 :
418 : // A wrong length is the only way this could fail, so it's not actually
419 : // likely at all unless read_blk returns a wrong-sized block.
420 : //
421 : // TODO: confirm and make this into an assertion
422 900 : let actual_summary =
423 900 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
424 :
425 900 : if let Some(mut expected_summary) = summary {
426 : // production code path
427 900 : expected_summary.index_start_blk = actual_summary.index_start_blk;
428 900 : expected_summary.index_root_blk = actual_summary.index_root_blk;
429 900 : // mask out the timeline_id, but still require the layers to be from the same tenant
430 900 : expected_summary.timeline_id = actual_summary.timeline_id;
431 900 :
432 900 : if actual_summary != expected_summary {
433 0 : bail!(
434 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
435 0 : actual_summary,
436 0 : expected_summary
437 0 : );
438 900 : }
439 0 : }
440 :
441 900 : Ok(ImageLayerInner {
442 900 : index_start_blk: actual_summary.index_start_blk,
443 900 : index_root_blk: actual_summary.index_root_blk,
444 900 : lsn,
445 900 : file,
446 900 : file_id,
447 900 : max_vectored_read_bytes,
448 900 : key_range: actual_summary.key_range,
449 900 : })
450 900 : }
451 :
452 : // Look up the keys in the provided keyspace and update
453 : // the reconstruct state with whatever is found.
454 181512 : pub(super) async fn get_values_reconstruct_data(
455 181512 : &self,
456 181512 : this: ResidentLayer,
457 181512 : keyspace: KeySpace,
458 181512 : reconstruct_state: &mut ValuesReconstructState,
459 181512 : ctx: &RequestContext,
460 181512 : ) -> Result<(), GetVectoredError> {
461 181512 : let reads = self
462 181512 : .plan_reads(keyspace, None, ctx)
463 181512 : .await
464 181512 : .map_err(GetVectoredError::Other)?;
465 :
466 181512 : self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
467 181512 : .await;
468 :
469 181512 : reconstruct_state.on_image_layer_visited(&self.key_range);
470 181512 :
471 181512 : Ok(())
472 181512 : }
473 :
474 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
475 : /// and the keys in this layer.
476 : ///
477 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
478 : /// this shard.
479 181560 : async fn plan_reads(
480 181560 : &self,
481 181560 : keyspace: KeySpace,
482 181560 : shard_identity: Option<&ShardIdentity>,
483 181560 : ctx: &RequestContext,
484 181560 : ) -> anyhow::Result<Vec<VectoredRead>> {
485 181560 : let mut planner = VectoredReadPlanner::new(
486 181560 : self.max_vectored_read_bytes
487 181560 : .expect("Layer is loaded with max vectored bytes config")
488 181560 : .0
489 181560 : .into(),
490 181560 : );
491 181560 :
492 181560 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
493 181560 : let tree_reader =
494 181560 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
495 181560 :
496 181560 : let ctx = RequestContextBuilder::from(ctx)
497 181560 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
498 181560 : .attached_child();
499 :
500 269148 : for range in keyspace.ranges.iter() {
501 269148 : let mut range_end_handled = false;
502 269148 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
503 269148 : range.start.write_to_byte_slice(&mut search_key);
504 269148 :
505 269148 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
506 269148 : let mut index_stream = std::pin::pin!(index_stream);
507 :
508 1209540 : while let Some(index_entry) = index_stream.next().await {
509 1204944 : let (raw_key, offset) = index_entry?;
510 :
511 1204944 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
512 1204944 : assert!(key >= range.start);
513 :
514 1204944 : let flag = if let Some(shard_identity) = shard_identity {
515 393216 : if shard_identity.is_key_disposable(&key) {
516 294912 : BlobFlag::Ignore
517 : } else {
518 98304 : BlobFlag::None
519 : }
520 : } else {
521 811728 : BlobFlag::None
522 : };
523 :
524 1204944 : if key >= range.end {
525 264552 : planner.handle_range_end(offset);
526 264552 : range_end_handled = true;
527 264552 : break;
528 940392 : } else {
529 940392 : planner.handle(key, self.lsn, offset, flag);
530 940392 : }
531 : }
532 :
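            // The index ran out before we saw a key at or past range.end: close the
            // final read at the end of the "values" section, which ends where the
            // index begins (index_start_blk).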
533 269148 : if !range_end_handled {
534 4596 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
535 4596 : planner.handle_range_end(payload_end);
536 264552 : }
537 : }
538 :
539 181560 : Ok(planner.finish())
540 181560 : }
541 :
542 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
543 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
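    ///
    /// A sketch of the call shape, with illustrative bindings (compare the
    /// `image_layer_rewrite` test at the bottom of this file):
    ///
    /// ```ignore
    /// let key_count = layer.filter(&shard_identity, &mut writer, &ctx).await?;
    /// ```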
544 48 : pub(super) async fn filter(
545 48 : &self,
546 48 : shard_identity: &ShardIdentity,
547 48 : writer: &mut ImageLayerWriter,
548 48 : ctx: &RequestContext,
549 48 : ) -> anyhow::Result<usize> {
550 : // Fragment the range into the regions owned by this ShardIdentity
551 48 : let plan = self
552 48 : .plan_reads(
553 48 : KeySpace {
554 48 : // If asked for the total key space, plan_reads will give us all the keys in the layer
555 48 : ranges: vec![Key::MIN..Key::MAX],
556 48 : },
557 48 : Some(shard_identity),
558 48 : ctx,
559 48 : )
560 48 : .await?;
561 :
562 48 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
563 48 : let mut key_count = 0;
564 48 : for read in plan.into_iter() {
565 48 : let buf_size = read.size();
566 48 :
567 48 : let buf = IoBufferMut::with_capacity(buf_size);
568 48 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
569 :
570 48 : let view = BufView::new_slice(&blobs_buf.buf);
571 :
572 98304 : for meta in blobs_buf.blobs.iter() {
573 : // Just read the raw header+data and pass it through to the target layer, without
574 : // decoding and recompressing it.
575 98304 : let raw = meta.raw_with_header(&view);
576 98304 : key_count += 1;
577 98304 : writer
578 98304 : .put_image_raw(meta.meta.key, raw.into_bytes(), ctx)
579 98304 : .await
580 98304 : .context(format!("Storing key {}", meta.meta.key))?;
581 : }
582 : }
583 :
584 48 : Ok(key_count)
585 48 : }
586 :
587 181512 : async fn do_reads_and_update_state(
588 181512 : &self,
589 181512 : this: ResidentLayer,
590 181512 : reads: Vec<VectoredRead>,
591 181512 : reconstruct_state: &mut ValuesReconstructState,
592 181512 : ctx: &RequestContext,
593 181512 : ) {
594 181512 : let max_vectored_read_bytes = self
595 181512 : .max_vectored_read_bytes
596 181512 : .expect("Layer is loaded with max vectored bytes config")
597 181512 : .0
598 181512 : .into();
599 :
600 207264 : for read in reads.into_iter() {
601 207264 : let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
602 547176 : for (_, blob_meta) in read.blobs_at.as_slice() {
603 547176 : let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
604 547176 : ios.insert((blob_meta.key, blob_meta.lsn), io);
605 547176 : }
606 :
607 207264 : let buf_size = read.size();
608 207264 :
609 207264 : if buf_size > max_vectored_read_bytes {
610 : // If the read is oversized, it should only contain one key.
611 0 : let offenders = read
612 0 : .blobs_at
613 0 : .as_slice()
614 0 : .iter()
615 0 : .filter_map(|(_, blob_meta)| {
616 0 : if blob_meta.key.is_rel_dir_key()
617 0 : || blob_meta.key == DBDIR_KEY
618 0 : || blob_meta.key.is_aux_file_key()
619 : {
620 : // The size of values for these keys is unbounded and can
621 : // grow very large in pathological cases.
622 0 : None
623 : } else {
624 0 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
625 : }
626 0 : })
627 0 : .join(", ");
628 0 :
629 0 : if !offenders.is_empty() {
630 0 : tracing::warn!(
631 0 : "Oversized vectored read ({} > {}) for keys {}",
632 : buf_size,
633 : max_vectored_read_bytes,
634 : offenders
635 : );
636 0 : }
637 207264 : }
638 :
639 207264 : let read_extend_residency = this.clone();
640 207264 : let read_from = self.file.clone();
641 207264 : let read_ctx = ctx.attached_child();
642 207264 : reconstruct_state
643 207264 : .spawn_io(async move {
644 207264 : let buf = IoBufferMut::with_capacity(buf_size);
645 207264 : let vectored_blob_reader = VectoredBlobReader::new(&read_from);
646 207264 : let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
647 :
648 207264 : match res {
649 207264 : Ok(blobs_buf) => {
650 207264 : let view = BufView::new_slice(&blobs_buf.buf);
651 547176 : for meta in blobs_buf.blobs.iter() {
652 547176 : let io: OnDiskValueIo =
653 547176 : ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
654 547176 : let img_buf = meta.read(&view).await;
655 :
656 547176 : let img_buf = match img_buf {
657 547176 : Ok(img_buf) => img_buf,
658 0 : Err(e) => {
659 0 : io.complete(Err(e));
660 0 : continue;
661 : }
662 : };
663 :
664 547176 : io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
665 : }
666 :
667 207264 : assert!(ios.is_empty());
668 : }
669 0 : Err(err) => {
670 0 : for (_, io) in ios {
671 0 : io.complete(Err(std::io::Error::new(
672 0 : err.kind(),
673 0 : "vec read failed",
674 0 : )));
675 0 : }
676 : }
677 : }
678 :
679 : // keep layer resident until this IO is done; this spawned IO future generally outlives the
680 : // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
681 207264 : drop(read_extend_residency);
682 207264 : })
683 207264 : .await;
684 : }
685 181512 : }
686 :
687 336 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
688 336 : self.iter_with_options(
689 336 : ctx,
690 336 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
691 336 : 1024, // The default value. Unit tests might use a different value
692 336 : )
693 336 : }
694 :
695 756 : pub(crate) fn iter_with_options<'a>(
696 756 : &'a self,
697 756 : ctx: &'a RequestContext,
698 756 : max_read_size: u64,
699 756 : max_batch_size: usize,
700 756 : ) -> ImageLayerIterator<'a> {
701 756 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
702 756 : let tree_reader =
703 756 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
704 756 : ImageLayerIterator {
705 756 : image_layer: self,
706 756 : ctx,
707 756 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
708 756 : key_values_batch: VecDeque::new(),
709 756 : is_end: false,
710 756 : planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
711 756 : }
712 756 : }
713 :
714 : /// NB: not super efficient, but not terrible either. Should prob be an iterator.
715 : //
716 : // We're reusing the index traversal logical in plan_reads; would be nice to
717 : // factor that out.
718 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
719 0 : let plan = self
720 0 : .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
721 0 : .await?;
722 0 : Ok(plan
723 0 : .into_iter()
724 0 : .flat_map(|read| read.blobs_at)
725 0 : .map(|(_, blob_meta)| blob_meta.key)
726 0 : .collect())
727 0 : }
728 : }
729 :
730 : /// A builder object for constructing a new image layer.
731 : ///
732 : /// Usage:
733 : ///
734 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
735 : ///
736 : /// 2. Write the contents by calling `put_image` for every key-value
737 : /// pair in the key range.
738 : ///
739 : /// 3. Call `finish`.
740 : ///
741 : struct ImageLayerWriterInner {
742 : conf: &'static PageServerConf,
743 : path: Utf8PathBuf,
744 : timeline_id: TimelineId,
745 : tenant_shard_id: TenantShardId,
746 : key_range: Range<Key>,
747 : lsn: Lsn,
748 :
749 : // Total uncompressed bytes passed into put_image
750 : uncompressed_bytes: u64,
751 :
752 : // Like `uncompressed_bytes`,
753 : // but only of images we might consider for compression
754 : uncompressed_bytes_eligible: u64,
755 :
756 : // Like `uncompressed_bytes`, but only of images
757 : // where we have chosen their compressed form
758 : uncompressed_bytes_chosen: u64,
759 :
760 : // Number of keys in the layer.
761 : num_keys: usize,
762 :
763 : blob_writer: BlobWriter<TempVirtualFile>,
764 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
765 :
766 : #[cfg(feature = "testing")]
767 : last_written_key: Key,
768 : }
769 :
770 : impl ImageLayerWriterInner {
771 : ///
772 : /// Start building a new image layer.
773 : ///
774 : #[allow(clippy::too_many_arguments)]
775 3768 : async fn new(
776 3768 : conf: &'static PageServerConf,
777 3768 : timeline_id: TimelineId,
778 3768 : tenant_shard_id: TenantShardId,
779 3768 : key_range: &Range<Key>,
780 3768 : lsn: Lsn,
781 3768 : gate: &utils::sync::gate::Gate,
782 3768 : cancel: CancellationToken,
783 3768 : ctx: &RequestContext,
784 3768 : ) -> anyhow::Result<Self> {
785 3768 : // Create the file initially with a temporary filename.
786 3768 : // We'll atomically rename it to the final name when we're done.
787 3768 : let path = ImageLayer::temp_path_for(
788 3768 : conf,
789 3768 : timeline_id,
790 3768 : tenant_shard_id,
791 3768 : &ImageLayerName {
792 3768 : key_range: key_range.clone(),
793 3768 : lsn,
794 3768 : },
795 3768 : );
796 3768 : trace!("creating image layer {}", path);
797 3768 : let file = TempVirtualFile::new(
798 3768 : VirtualFile::open_with_options_v2(
799 3768 : &path,
800 3768 : virtual_file::OpenOptions::new()
801 3768 : .create_new(true)
802 3768 : .write(true),
803 3768 : ctx,
804 3768 : )
805 3768 : .await?,
806 3768 : gate.enter()?,
807 : );
808 :
809 : // Start at `PAGE_SZ` to make room for the header block.
810 3768 : let blob_writer = BlobWriter::new(
811 3768 : file,
812 3768 : PAGE_SZ as u64,
813 3768 : gate,
814 3768 : cancel,
815 3768 : ctx,
816 3768 : info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
817 0 : )?;
818 :
819 : // Initialize the b-tree index builder
820 3768 : let block_buf = BlockBuf::new();
821 3768 : let tree_builder = DiskBtreeBuilder::new(block_buf);
822 3768 :
823 3768 : let writer = Self {
824 3768 : conf,
825 3768 : path,
826 3768 : timeline_id,
827 3768 : tenant_shard_id,
828 3768 : key_range: key_range.clone(),
829 3768 : lsn,
830 3768 : tree: tree_builder,
831 3768 : blob_writer,
832 3768 : uncompressed_bytes: 0,
833 3768 : uncompressed_bytes_eligible: 0,
834 3768 : uncompressed_bytes_chosen: 0,
835 3768 : num_keys: 0,
836 3768 : #[cfg(feature = "testing")]
837 3768 : last_written_key: Key::MIN,
838 3768 : };
839 3768 :
840 3768 : Ok(writer)
841 3768 : }
842 :
843 : ///
844 : /// Write the next value to the file.
845 : ///
846 : /// The page versions must be appended in blknum order.
847 : ///
848 234912 : async fn put_image(
849 234912 : &mut self,
850 234912 : key: Key,
851 234912 : img: Bytes,
852 234912 : ctx: &RequestContext,
853 234912 : ) -> anyhow::Result<()> {
854 234912 : ensure!(self.key_range.contains(&key));
855 234912 : let compression = self.conf.image_compression;
856 234912 : let uncompressed_len = img.len() as u64;
857 234912 : self.uncompressed_bytes += uncompressed_len;
858 234912 : self.num_keys += 1;
859 234912 : let (_img, res) = self
860 234912 : .blob_writer
861 234912 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
862 234912 : .await;
863 : // TODO: re-use the buffer for `img` further upstack
864 234912 : let (off, compression_info) = res?;
865 234912 : if compression_info.compressed_size.is_some() {
866 48012 : // The image has been considered for compression at least
867 48012 : self.uncompressed_bytes_eligible += uncompressed_len;
868 186900 : }
869 234912 : if compression_info.written_compressed {
870 0 : // The image has been compressed
871 0 : self.uncompressed_bytes_chosen += uncompressed_len;
872 234912 : }
873 :
874 234912 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
875 234912 : key.write_to_byte_slice(&mut keybuf);
876 234912 : self.tree.append(&keybuf, off)?;
877 :
878 : #[cfg(feature = "testing")]
879 234912 : {
880 234912 : self.last_written_key = key;
881 234912 : }
882 234912 :
883 234912 : Ok(())
884 234912 : }
885 :
886 : ///
887 : /// Write the next image to the file, as a raw blob header and data.
888 : ///
889 : /// The page versions must be appended in blknum order.
890 : ///
891 98304 : async fn put_image_raw(
892 98304 : &mut self,
893 98304 : key: Key,
894 98304 : raw_with_header: Bytes,
895 98304 : ctx: &RequestContext,
896 98304 : ) -> anyhow::Result<()> {
897 98304 : ensure!(self.key_range.contains(&key));
898 :
899 : // NB: we don't update the (un)compressed metrics, since we can't determine them without
900 : // decompressing the image. This seems okay.
901 98304 : self.num_keys += 1;
902 :
903 98304 : let (_, res) = self
904 98304 : .blob_writer
905 98304 : .write_blob_raw(raw_with_header.slice_len(), ctx)
906 98304 : .await;
907 98304 : let offset = res?;
908 :
909 98304 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
910 98304 : key.write_to_byte_slice(&mut keybuf);
911 98304 : self.tree.append(&keybuf, offset)?;
912 :
913 : #[cfg(feature = "testing")]
914 98304 : {
915 98304 : self.last_written_key = key;
916 98304 : }
917 98304 :
918 98304 : Ok(())
919 98304 : }
920 :
921 : ///
922 : /// Finish writing the image layer.
923 : ///
924 2292 : async fn finish(
925 2292 : self,
926 2292 : ctx: &RequestContext,
927 2292 : end_key: Option<Key>,
928 2292 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
929 2292 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
930 2292 :
931 2292 : // Calculate compression ratio
932 2292 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
933 2292 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
934 2292 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
935 2292 : .inc_by(self.uncompressed_bytes_eligible);
936 2292 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
937 2292 :
938 2292 : // NB: filter() may pass through raw pages from a different layer, without looking at
939 2292 : // whether these are compressed or not. We don't track metrics for these, so avoid
940 2292 : // increasing `COMPRESSION_IMAGE_OUTPUT_BYTES` in this case too.
941 2292 : if self.uncompressed_bytes > 0 {
942 2256 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
943 2256 : };
944 :
945 2292 : let file = self
946 2292 : .blob_writer
947 2292 : .shutdown(
948 2292 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
949 2292 : ctx,
950 2292 : )
951 2292 : .await?;
952 :
953 : // Write out the index
954 2292 : let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
955 2292 : let (index_root_blk, block_buf) = self.tree.finish()?;
956 :
957 : // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
958 : // Should we just replace BlockBuf::blocks with one big buffer?
959 4776 : for buf in block_buf.blocks {
960 2484 : let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
961 2484 : res?;
962 2484 : offset += PAGE_SZ as u64;
963 : }
964 :
965 2292 : let final_key_range = if let Some(end_key) = end_key {
966 1800 : self.key_range.start..end_key
967 : } else {
968 492 : self.key_range.clone()
969 : };
970 :
971 : // Fill in the summary on blk 0
972 2292 : let summary = Summary {
973 2292 : magic: IMAGE_FILE_MAGIC,
974 2292 : format_version: STORAGE_FORMAT_VERSION,
975 2292 : tenant_id: self.tenant_shard_id.tenant_id,
976 2292 : timeline_id: self.timeline_id,
977 2292 : key_range: final_key_range.clone(),
978 2292 : lsn: self.lsn,
979 2292 : index_start_blk,
980 2292 : index_root_blk,
981 2292 : };
982 :
983 : // Writes summary at the first block (offset 0).
984 2292 : let buf = summary.ser_into_page()?;
985 2292 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
986 2292 : res?;
987 :
988 2292 : let metadata = file
989 2292 : .metadata()
990 2292 : .await
991 2292 : .context("get metadata to determine file size")?;
992 :
993 2292 : let desc = PersistentLayerDesc::new_img(
994 2292 : self.tenant_shard_id,
995 2292 : self.timeline_id,
996 2292 : final_key_range,
997 2292 : self.lsn,
998 2292 : metadata.len(),
999 2292 : );
1000 :
1001 : #[cfg(feature = "testing")]
1002 2292 : if let Some(end_key) = end_key {
1003 1800 : assert!(
1004 1800 : self.last_written_key < end_key,
1005 0 : "written key violates end_key range"
1006 : );
1007 492 : }
1008 :
1009 : // Note: Because we open the file in write-only mode, we cannot
1010 : // reuse the same VirtualFile for reading later. That's why we don't
1011 : // set inner.file here. The first read will have to re-open it.
1012 :
1013 : // fsync the file
1014 2292 : file.sync_all()
1015 2292 : .await
1016 2292 : .maybe_fatal_err("image_layer sync_all")?;
1017 :
1018 2292 : trace!("created image layer {}", self.path);
1019 :
1020 : // The gate guard stored in `file` is dropped. Callers (e.g., the flush loop or compaction)
1021 : // also keep the gate open, so that it's safe for them to rename the file to its final destination.
1022 2292 : file.disarm_into_inner();
1023 2292 :
1024 2292 : Ok((desc, self.path))
1025 2292 : }
1026 : }
1027 :
1028 : /// A builder object for constructing a new image layer.
1029 : ///
1030 : /// Usage:
1031 : ///
1032 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
1033 : ///
1034 : /// 2. Write the contents by calling `put_image` for every key-value
1035 : /// pair in the key range.
1036 : ///
1037 : /// 3. Call `finish`.
1038 : ///
1039 : /// # Note
1040 : ///
1041 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
1042 : /// possible for the writer to drop before `finish` is actually called. So this
1043 : /// could lead to odd temporary files in the directory, exhausting file system.
1044 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
1045 : /// implementation that cleans up the temporary file in failure. It's not
1046 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
1047 : /// out some fields, making it impossible to implement `Drop`.
1048 : ///
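/// # Example
///
/// A minimal sketch of the write path, assuming the bindings passed to `new`
/// are in scope (compare `produce_image_layer` in the tests below):
///
/// ```ignore
/// let mut writer = ImageLayerWriter::new(
///     conf, timeline_id, tenant_shard_id, &key_range, lsn, &gate, cancel, ctx,
/// )
/// .await?;
/// for (key, img) in images {
///     writer.put_image(key, img, ctx).await?; // keys must come in order
/// }
/// let (desc, path) = writer.finish(ctx).await?;
/// ```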
1049 : #[must_use]
1050 : pub struct ImageLayerWriter {
1051 : inner: Option<ImageLayerWriterInner>,
1052 : }
1053 :
1054 : impl ImageLayerWriter {
1055 : ///
1056 : /// Start building a new image layer.
1057 : ///
1058 : #[allow(clippy::too_many_arguments)]
1059 3768 : pub async fn new(
1060 3768 : conf: &'static PageServerConf,
1061 3768 : timeline_id: TimelineId,
1062 3768 : tenant_shard_id: TenantShardId,
1063 3768 : key_range: &Range<Key>,
1064 3768 : lsn: Lsn,
1065 3768 : gate: &utils::sync::gate::Gate,
1066 3768 : cancel: CancellationToken,
1067 3768 : ctx: &RequestContext,
1068 3768 : ) -> anyhow::Result<ImageLayerWriter> {
1069 3768 : Ok(Self {
1070 3768 : inner: Some(
1071 3768 : ImageLayerWriterInner::new(
1072 3768 : conf,
1073 3768 : timeline_id,
1074 3768 : tenant_shard_id,
1075 3768 : key_range,
1076 3768 : lsn,
1077 3768 : gate,
1078 3768 : cancel,
1079 3768 : ctx,
1080 3768 : )
1081 3768 : .await?,
1082 : ),
1083 : })
1084 3768 : }
1085 :
1086 : ///
1087 : /// Write the next value to the file.
1088 : ///
1089 : /// The page versions must be appended in blknum order.
1090 : ///
1091 234912 : pub async fn put_image(
1092 234912 : &mut self,
1093 234912 : key: Key,
1094 234912 : img: Bytes,
1095 234912 : ctx: &RequestContext,
1096 234912 : ) -> anyhow::Result<()> {
1097 234912 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
1098 234912 : }
1099 :
1100 : ///
1101 : /// Write the next value to the file, as a raw header and data. This allows passing through a
1102 : /// raw, potentially compressed image from a different layer file without recompressing it.
1103 : ///
1104 : /// The page versions must be appended in blknum order.
1105 : ///
1106 98304 : pub async fn put_image_raw(
1107 98304 : &mut self,
1108 98304 : key: Key,
1109 98304 : raw_with_header: Bytes,
1110 98304 : ctx: &RequestContext,
1111 98304 : ) -> anyhow::Result<()> {
1112 98304 : self.inner
1113 98304 : .as_mut()
1114 98304 : .unwrap()
1115 98304 : .put_image_raw(key, raw_with_header, ctx)
1116 98304 : .await
1117 98304 : }
1118 :
1119 : /// Estimated size of the final image layer file: the blob bytes written so far, plus the index and the summary page.
1120 51324 : pub(crate) fn estimated_size(&self) -> u64 {
1121 51324 : let inner = self.inner.as_ref().unwrap();
1122 51324 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
1123 51324 : }
1124 :
1125 51912 : pub(crate) fn num_keys(&self) -> usize {
1126 51912 : self.inner.as_ref().unwrap().num_keys
1127 51912 : }
1128 :
1129 : ///
1130 : /// Finish writing the image layer.
1131 : ///
1132 492 : pub(crate) async fn finish(
1133 492 : mut self,
1134 492 : ctx: &RequestContext,
1135 492 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1136 492 : self.inner.take().unwrap().finish(ctx, None).await
1137 492 : }
1138 :
1139 : /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
1140 1800 : pub(super) async fn finish_with_end_key(
1141 1800 : mut self,
1142 1800 : end_key: Key,
1143 1800 : ctx: &RequestContext,
1144 1800 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1145 1800 : self.inner.take().unwrap().finish(ctx, Some(end_key)).await
1146 1800 : }
1147 : }
1148 :
1149 : pub struct ImageLayerIterator<'a> {
1150 : image_layer: &'a ImageLayerInner,
1151 : ctx: &'a RequestContext,
1152 : planner: StreamingVectoredReadPlanner,
1153 : index_iter: DiskBtreeIterator<'a>,
1154 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1155 : is_end: bool,
1156 : }
1157 :
1158 : impl ImageLayerIterator<'_> {
1159 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1160 0 : self.image_layer.layer_dbg_info()
1161 0 : }
1162 :
1163 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1164 114780 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1165 114780 : assert!(self.key_values_batch.is_empty());
1166 114780 : assert!(!self.is_end);
1167 :
1168 114780 : let plan = loop {
1169 174444 : if let Some(res) = self.index_iter.next().await {
1170 173856 : let (raw_key, offset) = res?;
1171 173856 : if let Some(batch_plan) = self.planner.handle(
1172 173856 : Key::from_slice(&raw_key[..KEY_SIZE]),
1173 173856 : self.image_layer.lsn,
1174 173856 : offset,
1175 173856 : true,
1176 173856 : ) {
1177 114192 : break batch_plan;
1178 59664 : }
1179 : } else {
1180 588 : self.is_end = true;
1181 588 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1182 588 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1183 588 : break item;
1184 : } else {
1185 0 : return Ok(()); // TODO: a test case on empty iterator
1186 : }
1187 : }
1188 : };
1189 114780 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1190 114780 : let mut next_batch = std::collections::VecDeque::new();
1191 114780 : let buf_size = plan.size();
1192 114780 : let buf = IoBufferMut::with_capacity(buf_size);
1193 114780 : let blobs_buf = vectored_blob_reader
1194 114780 : .read_blobs(&plan, buf, self.ctx)
1195 114780 : .await?;
1196 114780 : let view = BufView::new_slice(&blobs_buf.buf);
1197 173688 : for meta in blobs_buf.blobs.iter() {
1198 173688 : let img_buf = meta.read(&view).await?;
1199 173688 : next_batch.push_back((
1200 173688 : meta.meta.key,
1201 173688 : self.image_layer.lsn,
1202 173688 : Value::Image(img_buf.into_bytes()),
1203 173688 : ));
1204 : }
1205 114780 : self.key_values_batch = next_batch;
1206 114780 : Ok(())
1207 114780 : }
1208 :
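    /// Returns the next (key, LSN, value) tuple in key order, fetching the next
    /// batch from disk once the buffered batch is exhausted; `None` means the
    /// iterator has reached the end of the layer.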
1209 172968 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1210 172968 : if self.key_values_batch.is_empty() {
1211 115248 : if self.is_end {
1212 972 : return Ok(None);
1213 114276 : }
1214 114276 : self.next_batch().await?;
1215 57720 : }
1216 171996 : Ok(Some(
1217 171996 : self.key_values_batch
1218 171996 : .pop_front()
1219 171996 : .expect("should not be empty"),
1220 171996 : ))
1221 172968 : }
1222 : }
1223 :
1224 : #[cfg(test)]
1225 : mod test {
1226 : use std::sync::Arc;
1227 : use std::time::Duration;
1228 :
1229 : use bytes::Bytes;
1230 : use itertools::Itertools;
1231 : use pageserver_api::key::Key;
1232 : use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
1233 : use pageserver_api::value::Value;
1234 : use utils::generation::Generation;
1235 : use utils::id::{TenantId, TimelineId};
1236 : use utils::lsn::Lsn;
1237 :
1238 : use super::{ImageLayerIterator, ImageLayerWriter};
1239 : use crate::DEFAULT_PG_VERSION;
1240 : use crate::context::RequestContext;
1241 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
1242 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1243 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1244 : use crate::tenant::{TenantShard, Timeline};
1245 :
1246 : #[tokio::test]
1247 12 : async fn image_layer_rewrite() {
1248 12 : let tenant_conf = pageserver_api::models::TenantConfig {
1249 12 : gc_period: Some(Duration::ZERO),
1250 12 : compaction_period: Some(Duration::ZERO),
1251 12 : ..Default::default()
1252 12 : };
1253 12 : let tenant_id = TenantId::generate();
1254 12 : let mut gen_ = Generation::new(0xdead0001);
1255 60 : let mut get_next_gen = || {
1256 60 : let ret = gen_;
1257 60 : gen_ = gen_.next();
1258 60 : ret
1259 60 : };
1260 12 : // The LSN at which we will create an image layer to filter
1261 12 : let lsn = Lsn(0xdeadbeef0000);
1262 12 : let timeline_id = TimelineId::generate();
1263 12 :
1264 12 : //
1265 12 : // Create an unsharded parent with a layer.
1266 12 : //
1267 12 :
1268 12 : let harness = TenantHarness::create_custom(
1269 12 : "test_image_layer_rewrite--parent",
1270 12 : tenant_conf.clone(),
1271 12 : tenant_id,
1272 12 : ShardIdentity::unsharded(),
1273 12 : get_next_gen(),
1274 12 : )
1275 12 : .await
1276 12 : .unwrap();
1277 12 : let (tenant, ctx) = harness.load().await;
1278 12 : let timeline = tenant
1279 12 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1280 12 : .await
1281 12 : .unwrap();
1282 12 :
1283 12 : // This key range contains several 0x800 page stripes, only one of which belongs to shard zero
1284 12 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1285 12 : let input_end = Key::from_hex("000000067f00000001000000ae0000002000").unwrap();
1286 12 : let range = input_start..input_end;
1287 12 :
1288 12 : // Build an image layer to filter
1289 12 : let resident = {
1290 12 : let mut writer = ImageLayerWriter::new(
1291 12 : harness.conf,
1292 12 : timeline_id,
1293 12 : harness.tenant_shard_id,
1294 12 : &range,
1295 12 : lsn,
1296 12 : &timeline.gate,
1297 12 : timeline.cancel.clone(),
1298 12 : &ctx,
1299 12 : )
1300 12 : .await
1301 12 : .unwrap();
1302 12 :
1303 12 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1304 12 : let mut key = range.start;
1305 98316 : while key < range.end {
1306 98304 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1307 98304 :
1308 98304 : key = key.next();
1309 12 : }
1310 12 : let (desc, path) = writer.finish(&ctx).await.unwrap();
1311 12 : Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
1312 12 : };
1313 12 : let original_size = resident.metadata().file_size;
1314 12 :
1315 12 : //
1316 12 : // Create child shards and do the rewrite, exercising filter().
1317 12 : // TODO: abstraction in TenantHarness for splits.
1318 12 : //
1319 12 :
1320 12 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1321 12 : // range, middle of key range.
1322 12 : let shard_count = ShardCount::new(4);
1323 48 : for shard_number in 0..shard_count.count() {
1324 12 : //
1325 12 : // mimic the shard split
1326 12 : //
1327 48 : let shard_identity = ShardIdentity::new(
1328 48 : ShardNumber(shard_number),
1329 48 : shard_count,
1330 48 : ShardStripeSize(0x800),
1331 48 : )
1332 48 : .unwrap();
1333 48 : let harness = TenantHarness::create_custom(
1334 48 : Box::leak(Box::new(format!(
1335 48 : "test_image_layer_rewrite--child{}",
1336 48 : shard_identity.shard_slug()
1337 48 : ))),
1338 48 : tenant_conf.clone(),
1339 48 : tenant_id,
1340 48 : shard_identity,
1341 48 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1342 48 : // But here, all we care about is that the gen number is unique.
1343 48 : get_next_gen(),
1344 48 : )
1345 48 : .await
1346 48 : .unwrap();
1347 48 : let (tenant, ctx) = harness.load().await;
1348 48 : let timeline = tenant
1349 48 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1350 48 : .await
1351 48 : .unwrap();
1352 12 :
1353 12 : //
1354 12 : // use filter() and make assertions
1355 12 : //
1356 12 :
1357 48 : let mut filtered_writer = ImageLayerWriter::new(
1358 48 : harness.conf,
1359 48 : timeline_id,
1360 48 : harness.tenant_shard_id,
1361 48 : &range,
1362 48 : lsn,
1363 48 : &timeline.gate,
1364 48 : timeline.cancel.clone(),
1365 48 : &ctx,
1366 48 : )
1367 48 : .await
1368 48 : .unwrap();
1369 12 :
1370 48 : let wrote_keys = resident
1371 48 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1372 48 : .await
1373 48 : .unwrap();
1374 48 : let replacement = if wrote_keys > 0 {
1375 36 : let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
1376 36 : let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
1377 36 : Some(resident)
1378 12 : } else {
1379 12 : None
1380 12 : };
1381 12 :
1382 12 : // This exact size and those below will need updating as/when the layer encoding changes, but
1383 12 : // should be deterministic for a given version of the format, as we used no randomness in generating the input.
1384 48 : assert_eq!(original_size, 122880);
1385 12 :
1386 48 : match shard_number {
1387 12 : 0 => {
1388 12 : // We should have written out just one stripe for our shard identity
1389 12 : assert_eq!(wrote_keys, 0x800);
1390 12 : let replacement = replacement.unwrap();
1391 12 :
1392 12 : // We should have dropped some of the data
1393 12 : assert!(replacement.metadata().file_size < original_size);
1394 12 : assert!(replacement.metadata().file_size > 0);
1395 12 :
1396 12 : // Assert that we dropped ~3/4 of the data.
1397 12 : assert_eq!(replacement.metadata().file_size, 49152);
1398 12 : }
1399 12 : 1 => {
1400 12 : // Shard 1 has no keys in our input range
1401 12 : assert_eq!(wrote_keys, 0x0);
1402 12 : assert!(replacement.is_none());
1403 12 : }
1404 12 : 2 => {
1405 12 : // Shard 2 has one stripe in the input range
1406 12 : assert_eq!(wrote_keys, 0x800);
1407 12 : let replacement = replacement.unwrap();
1408 12 : assert!(replacement.metadata().file_size < original_size);
1409 12 : assert!(replacement.metadata().file_size > 0);
1410 12 : assert_eq!(replacement.metadata().file_size, 49152);
1411 12 : }
1412 12 : 3 => {
1413 12 : // Shard 3 has two stripes in the input range
1414 12 : assert_eq!(wrote_keys, 0x1000);
1415 12 : let replacement = replacement.unwrap();
1416 12 : assert!(replacement.metadata().file_size < original_size);
1417 12 : assert!(replacement.metadata().file_size > 0);
1418 12 : assert_eq!(replacement.metadata().file_size, 73728);
1419 12 : }
1420 12 : _ => unreachable!(),
1421 12 : }
1422 12 : }
1423 12 : }
1424 :
1425 12 : async fn produce_image_layer(
1426 12 : tenant: &TenantShard,
1427 12 : tline: &Arc<Timeline>,
1428 12 : mut images: Vec<(Key, Bytes)>,
1429 12 : lsn: Lsn,
1430 12 : ctx: &RequestContext,
1431 12 : ) -> anyhow::Result<ResidentLayer> {
1432 12 : images.sort();
1433 12 : let (key_start, _) = images.first().unwrap();
1434 12 : let (key_last, _) = images.last().unwrap();
1435 12 : let key_end = key_last.next();
1436 12 : let key_range = *key_start..key_end;
1437 12 : let mut writer = ImageLayerWriter::new(
1438 12 : tenant.conf,
1439 12 : tline.timeline_id,
1440 12 : tenant.tenant_shard_id,
1441 12 : &key_range,
1442 12 : lsn,
1443 12 : &tline.gate,
1444 12 : tline.cancel.clone(),
1445 12 : ctx,
1446 12 : )
1447 12 : .await?;
1448 :
1449 12012 : for (key, img) in images {
1450 12000 : writer.put_image(key, img, ctx).await?;
1451 : }
1452 12 : let (desc, path) = writer.finish(ctx).await?;
1453 12 : let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
1454 :
1455 12 : Ok::<_, anyhow::Error>(img_layer)
1456 12 : }
1457 :
1458 168 : async fn assert_img_iter_equal(
1459 168 : img_iter: &mut ImageLayerIterator<'_>,
1460 168 : expect: &[(Key, Bytes)],
1461 168 : expect_lsn: Lsn,
1462 168 : ) {
1463 168 : let mut expect_iter = expect.iter();
1464 : loop {
1465 168168 : let o1 = img_iter.next().await.unwrap();
1466 168168 : let o2 = expect_iter.next();
1467 168168 : match (o1, o2) {
1468 168 : (None, None) => break,
1469 168000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1470 168000 : let Value::Image(i1) = v1 else {
1471 0 : panic!("expect Value::Image")
1472 : };
1473 168000 : assert_eq!(&k1, k2);
1474 168000 : assert_eq!(l1, expect_lsn);
1475 168000 : assert_eq!(&i1, i2);
1476 : }
1477 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1478 : }
1479 : }
1480 168 : }
1481 :
1482 : #[tokio::test]
1483 12 : async fn image_layer_iterator() {
1484 12 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1485 12 : let (tenant, ctx) = harness.load().await;
1486 12 :
1487 12 : let tline = tenant
1488 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1489 12 : .await
1490 12 : .unwrap();
1491 12 :
1492 12000 : fn get_key(id: u32) -> Key {
1493 12000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1494 12000 : key.field6 = id;
1495 12000 : key
1496 12000 : }
1497 12 : const N: usize = 1000;
1498 12 : let test_imgs = (0..N)
1499 12000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1500 12 : .collect_vec();
1501 12 : let resident_layer =
1502 12 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1503 12 : .await
1504 12 : .unwrap();
1505 12 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1506 36 : for max_read_size in [1, 1024] {
1507 192 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1508 168 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1509 168 : // Test if the batch size is correctly determined
1510 168 : let mut iter = img_layer.iter(&ctx);
1511 168 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1512 168 : let mut num_items = 0;
1513 672 : for _ in 0..3 {
1514 504 : iter.next_batch().await.unwrap();
1515 504 : num_items += iter.key_values_batch.len();
1516 504 : if max_read_size == 1 {
1517 12 : // every key should be a batch b/c the value is larger than max_read_size
1518 252 : assert_eq!(iter.key_values_batch.len(), 1);
1519 12 : } else {
1520 252 : assert!(iter.key_values_batch.len() <= batch_size);
1521 12 : }
1522 504 : if num_items >= N {
1523 12 : break;
1524 504 : }
1525 504 : iter.key_values_batch.clear();
1526 12 : }
1527 12 : // Test if the result is correct
1528 168 : let mut iter = img_layer.iter(&ctx);
1529 168 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1530 168 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1531 12 : }
1532 12 : }
1533 12 : }
1534 : }
|