//! An ImageLayer represents an image or a snapshot of a key-range at
//! one particular LSN.
//!
//! It contains an image of all key-value pairs in its key-range. Any key
//! that falls within the image layer's key range but is not present in the
//! layer is considered not to exist.
//!
//! An image layer is stored in a file on disk. The file is stored in the
//! timelines/<timeline_id> directory. Currently, there are no
//! subdirectories, and each image layer file is named like this:
//!
//! ```text
//! <key start>-<key end>__<LSN>
//! ```
//!
//! For example:
//!
//! ```text
//! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
//! ```
//!
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed-size header at the
//! beginning of the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
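//!
//! On-disk layout, as a rough sketch (the exact boundaries come from the
//! `Summary` fields described below; the block size is `PAGE_SZ`):
//!
//! ```text
//! block 0                  : "summary" header
//! block 1..index_start_blk : "values" (the page images)
//! block index_start_blk..  : "index" B-tree, root at index_root_blk
//! ```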
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

use super::layer_name::ImageLayerName;
use super::{
    AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
    ValuesReconstructState,
};
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::{self, FileId, PAGE_SZ};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, FileBlockReader};
use crate::tenant::disk_btree::{
    DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
    BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
    VectoredReadPlanner,
};
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};

///
/// Header stored at the beginning of the file.
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'.
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
    pub magic: u16,
    pub format_version: u16,

    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub key_range: Range<Key>,
    pub lsn: Lsn,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index' where the B-tree root page is stored.
    pub index_root_blk: u32,
    // the 'values' part starts after the summary header, on block 1.
}

impl From<&ImageLayer> for Summary {
    fn from(layer: &ImageLayer) -> Self {
        Self::expected(
            layer.desc.tenant_shard_id.tenant_id,
            layer.desc.timeline_id,
            layer.desc.key_range.clone(),
            layer.lsn,
        )
    }
}

impl Summary {
    pub(super) fn expected(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        key_range: Range<Key>,
        lsn: Lsn,
    ) -> Self {
        Self {
            magic: IMAGE_FILE_MAGIC,
            format_version: STORAGE_FORMAT_VERSION,
            tenant_id,
            timeline_id,
            key_range,
            lsn,

            index_start_blk: 0,
            index_root_blk: 0,
        }
    }
}
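
// A minimal sketch (not part of the production code; the function name is
// hypothetical) showing how `Summary` round-trips through the `BeSer` helpers
// that `load` and `finish0` below rely on: `ser_into` serializes the header
// into a buffer, and `des_prefix` reads it back while tolerating trailing
// bytes, which is why the production code can deserialize a summary straight
// from a whole PAGE_SZ block.
#[cfg(test)]
#[allow(dead_code)]
fn summary_roundtrip_sketch(summary: &Summary) -> Result<Summary> {
    let mut buf = Vec::with_capacity(PAGE_SZ);
    Summary::ser_into(summary, &mut buf)?;
    Ok(Summary::des_prefix(&buf)?)
}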

/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
    path: Utf8PathBuf,
    pub desc: PersistentLayerDesc,
    // This entry contains an image of all pages as of this LSN. It should be
    // the same as desc.lsn.
    pub lsn: Lsn,
    inner: OnceCell<ImageLayerInner>,
}

impl std::fmt::Debug for ImageLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use super::RangeDisplayDebug;

        f.debug_struct("ImageLayer")
            .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
            .field("file_size", &self.desc.file_size)
            .field("lsn", &self.lsn)
            .field("inner", &self.inner)
            .finish()
    }
}

/// ImageLayerInner is the in-memory data structure associated with an on-disk
/// image file.
pub struct ImageLayerInner {
    // values copied from summary
    index_start_blk: u32,
    index_root_blk: u32,

    key_range: Range<Key>,
    lsn: Lsn,

    file: Arc<VirtualFile>,
    file_id: FileId,

    max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
}

impl ImageLayerInner {
    pub(crate) fn layer_dbg_info(&self) -> String {
        format!(
            "image {}..{} {}",
            self.key_range().start,
            self.key_range().end,
            self.lsn()
        )
    }
}

impl std::fmt::Debug for ImageLayerInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ImageLayerInner")
            .field("index_start_blk", &self.index_start_blk)
            .field("index_root_blk", &self.index_root_blk)
            .finish()
    }
}

impl ImageLayerInner {
    pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
        let block_reader = FileBlockReader::new(&self.file, self.file_id);
        let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
            self.index_start_blk,
            self.index_root_blk,
            block_reader,
        );

        tree_reader.dump(ctx).await?;

        tree_reader
            .visit(
                &[0u8; KEY_SIZE],
                VisitDirection::Forwards,
                |key, value| {
                    println!("key: {} offset {}", hex::encode(key), value);
                    true
                },
                ctx,
            )
            .await?;

        Ok(())
    }
}
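
// A minimal sketch (hypothetical helper, for illustration only) of walking the
// index as a stream, the same pattern `plan_reads` uses below: `into_stream`
// yields `(raw key, value offset)` pairs in key order starting from the given
// search key.
#[cfg(test)]
#[allow(dead_code)]
async fn collect_index_offsets_sketch(
    inner: &ImageLayerInner,
    ctx: &RequestContext,
) -> anyhow::Result<Vec<u64>> {
    let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
    let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
        inner.index_start_blk,
        inner.index_root_blk,
        block_reader,
    );
    let search_key = [0u8; KEY_SIZE];
    let index_stream = tree_reader.into_stream(&search_key, ctx);
    let mut index_stream = std::pin::pin!(index_stream);
    let mut offsets = Vec::new();
    while let Some(entry) = index_stream.next().await {
        let (_raw_key, offset) = entry?;
        offsets.push(offset);
    }
    Ok(offsets)
}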

/// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
impl std::fmt::Display for ImageLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.layer_desc().short_id())
    }
}

impl AsLayerDesc for ImageLayer {
    fn layer_desc(&self) -> &PersistentLayerDesc {
        &self.desc
    }
}

impl ImageLayer {
    pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
        self.desc.dump();

        if !verbose {
            return Ok(());
        }

        let inner = self.load(ctx).await?;

        inner.dump(ctx).await?;

        Ok(())
    }

    fn temp_path_for(
        conf: &PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        fname: &ImageLayerName,
    ) -> Utf8PathBuf {
        let rand_string: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(8)
            .map(char::from)
            .collect();

        conf.timeline_path(&tenant_shard_id, &timeline_id)
            .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
    }

    ///
    /// Open the underlying file and read the metadata into memory, if it's
    /// not loaded already.
    ///
    async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
        self.inner
            .get_or_try_init(|| self.load_inner(ctx))
            .await
            .with_context(|| format!("Failed to load image layer {}", self.path()))
    }

    async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
        let path = self.path();

        let loaded =
            ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;

        // not production code
        let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
        let expected_layer_name = self.layer_desc().layer_name();

        if actual_layer_name != expected_layer_name {
            println!("warning: filename does not match what is expected from in-file summary");
            println!("actual: {:?}", actual_layer_name.to_string());
            println!("expected: {:?}", expected_layer_name.to_string());
        }

        Ok(loaded)
    }

    /// Create an ImageLayer struct representing an existing file on disk.
    ///
    /// This variant is only used for debugging purposes, by the 'pagectl' binary.
    pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
        let mut summary_buf = vec![0; PAGE_SZ];
        file.read_exact_at(&mut summary_buf, 0)?;
        let summary = Summary::des_prefix(&summary_buf)?;
        let metadata = file
            .metadata()
            .context("get file metadata to determine size")?;

        // This function is never used for constructing layers in a running pageserver,
        // so it does not need an accurate TenantShardId.
        let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);

        Ok(ImageLayer {
            path: path.to_path_buf(),
            desc: PersistentLayerDesc::new_img(
                tenant_shard_id,
                summary.timeline_id,
                summary.key_range,
                summary.lsn,
                metadata.len(),
            ), // Currently we assume an image layer ALWAYS covers the full range. This may change in the future.
            lsn: summary.lsn,
            inner: OnceCell::new(),
        })
    }

    fn path(&self) -> Utf8PathBuf {
        self.path.clone()
    }
}
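
// A minimal sketch (hypothetical helper, not part of pagectl itself) of the
// debugging flow `new_for_path` enables: open the file, build the struct from
// the on-disk summary, then dump the index and keys.
#[cfg(test)]
#[allow(dead_code)]
async fn inspect_layer_sketch(path: &Utf8Path, ctx: &RequestContext) -> Result<()> {
    let file = File::open(path)?;
    let layer = ImageLayer::new_for_path(path, file)?;
    layer.dump(true, ctx).await
}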

#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    #[error("magic mismatch")]
    MagicMismatch,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl From<std::io::Error> for RewriteSummaryError {
    fn from(e: std::io::Error) -> Self {
        Self::Other(anyhow::anyhow!(e))
    }
}

impl ImageLayer {
    pub async fn rewrite_summary<F>(
        path: &Utf8Path,
        rewrite: F,
        ctx: &RequestContext,
    ) -> Result<(), RewriteSummaryError>
    where
        F: Fn(Summary) -> Summary,
    {
        let mut file = VirtualFile::open_with_options(
            path,
            virtual_file::OpenOptions::new().read(true).write(true),
            ctx,
        )
        .await
        .with_context(|| format!("Failed to open file '{}'", path))?;
        let file_id = page_cache::next_file_id();
        let block_reader = FileBlockReader::new(&file, file_id);
        let summary_blk = block_reader.read_blk(0, ctx).await?;
        let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
        if actual_summary.magic != IMAGE_FILE_MAGIC {
            return Err(RewriteSummaryError::MagicMismatch);
        }

        let new_summary = rewrite(actual_summary);

        let mut buf = Vec::with_capacity(PAGE_SZ);
        // TODO: could use smallvec here but it's a pain with Slice<T>
        Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
        file.seek(SeekFrom::Start(0)).await?;
        let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
        res?;
        Ok(())
    }
}
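
// A minimal usage sketch for `rewrite_summary` (the helper name is
// hypothetical): the closure receives the deserialized on-disk `Summary` and
// returns the version to write back, here re-stamping the tenant id.
#[cfg(test)]
#[allow(dead_code)]
async fn retag_tenant_sketch(
    path: &Utf8Path,
    new_tenant_id: TenantId,
    ctx: &RequestContext,
) -> Result<(), RewriteSummaryError> {
    ImageLayer::rewrite_summary(
        path,
        |mut summary| {
            summary.tenant_id = new_tenant_id;
            summary
        },
        ctx,
    )
    .await
}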

impl ImageLayerInner {
    pub(crate) fn key_range(&self) -> &Range<Key> {
        &self.key_range
    }

    pub(crate) fn lsn(&self) -> Lsn {
        self.lsn
    }

    pub(super) async fn load(
        path: &Utf8Path,
        lsn: Lsn,
        summary: Option<Summary>,
        max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Self> {
        let file = Arc::new(
            VirtualFile::open_v2(path, ctx)
                .await
                .context("open layer file")?,
        );
        let file_id = page_cache::next_file_id();
        let block_reader = FileBlockReader::new(&file, file_id);
        let summary_blk = block_reader
            .read_blk(0, ctx)
            .await
            .context("read first block")?;

        // A wrong length is the only way this could fail, so it's not actually
        // likely at all unless read_blk returns a wrongly sized block.
        //
        // TODO: confirm and make this into an assertion
        let actual_summary =
            Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;

        if let Some(mut expected_summary) = summary {
            // production code path
            expected_summary.index_start_blk = actual_summary.index_start_blk;
            expected_summary.index_root_blk = actual_summary.index_root_blk;
            // mask out the timeline_id, but still require the layers to be from the same tenant
            expected_summary.timeline_id = actual_summary.timeline_id;

            if actual_summary != expected_summary {
                bail!(
                    "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
                    actual_summary,
                    expected_summary
                );
            }
        }

        Ok(ImageLayerInner {
            index_start_blk: actual_summary.index_start_blk,
            index_root_blk: actual_summary.index_root_blk,
            lsn,
            file,
            file_id,
            max_vectored_read_bytes,
            key_range: actual_summary.key_range,
        })
    }

    // Look up the keys in the provided keyspace and update
    // the reconstruct state with whatever is found.
    pub(super) async fn get_values_reconstruct_data(
        &self,
        this: ResidentLayer,
        keyspace: KeySpace,
        reconstruct_state: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) -> Result<(), GetVectoredError> {
        let reads = self
            .plan_reads(keyspace, None, ctx)
            .await
            .map_err(GetVectoredError::Other)?;

        self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
            .await;

        reconstruct_state.on_image_layer_visited(&self.key_range);

        Ok(())
    }

    /// Traverse the layer's index to build read operations on the overlap of the input keyspace
    /// and the keys in this layer.
    ///
    /// If shard_identity is provided, it will be used to filter keys down to those stored on
    /// this shard.
    async fn plan_reads(
        &self,
        keyspace: KeySpace,
        shard_identity: Option<&ShardIdentity>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Vec<VectoredRead>> {
        let mut planner = VectoredReadPlanner::new(
            self.max_vectored_read_bytes
                .expect("Layer is loaded with max vectored bytes config")
                .0
                .into(),
        );

        let block_reader = FileBlockReader::new(&self.file, self.file_id);
        let tree_reader =
            DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);

        let ctx = RequestContextBuilder::from(ctx)
            .page_content_kind(PageContentKind::ImageLayerBtreeNode)
            .attached_child();

        for range in keyspace.ranges.iter() {
            let mut range_end_handled = false;
            let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
            range.start.write_to_byte_slice(&mut search_key);

            let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
            let mut index_stream = std::pin::pin!(index_stream);

            while let Some(index_entry) = index_stream.next().await {
                let (raw_key, offset) = index_entry?;

                let key = Key::from_slice(&raw_key[..KEY_SIZE]);
                assert!(key >= range.start);

                let flag = if let Some(shard_identity) = shard_identity {
                    if shard_identity.is_key_disposable(&key) {
                        BlobFlag::Ignore
                    } else {
                        BlobFlag::None
                    }
                } else {
                    BlobFlag::None
                };

                if key >= range.end {
                    planner.handle_range_end(offset);
                    range_end_handled = true;
                    break;
                } else {
                    planner.handle(key, self.lsn, offset, flag);
                }
            }

            if !range_end_handled {
                let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
                planner.handle_range_end(payload_end);
            }
        }

        Ok(planner.finish())
    }

    /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
    /// then execute vectored GET operations, passing the results of all read keys into the writer.
    pub(super) async fn filter(
        &self,
        shard_identity: &ShardIdentity,
        writer: &mut ImageLayerWriter,
        ctx: &RequestContext,
    ) -> anyhow::Result<usize> {
        // Fragment the range into the regions owned by this ShardIdentity
        let plan = self
            .plan_reads(
                KeySpace {
                    // If asked for the total key space, plan_reads will give us all the keys in the layer
                    ranges: vec![Key::MIN..Key::MAX],
                },
                Some(shard_identity),
                ctx,
            )
            .await?;

        let vectored_blob_reader = VectoredBlobReader::new(&self.file);
        let mut key_count = 0;
        for read in plan.into_iter() {
            let buf_size = read.size();

            let buf = IoBufferMut::with_capacity(buf_size);
            let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;

            let view = BufView::new_slice(&blobs_buf.buf);

            for meta in blobs_buf.blobs.iter() {
                let img_buf = meta.read(&view).await?;

                key_count += 1;
                writer
                    .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
                    .await
                    .context(format!("Storing key {}", meta.meta.key))?;
            }
        }

        Ok(key_count)
    }

    async fn do_reads_and_update_state(
        &self,
        this: ResidentLayer,
        reads: Vec<VectoredRead>,
        reconstruct_state: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) {
        let max_vectored_read_bytes = self
            .max_vectored_read_bytes
            .expect("Layer is loaded with max vectored bytes config")
            .0
            .into();

        for read in reads.into_iter() {
            let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
            for (_, blob_meta) in read.blobs_at.as_slice() {
                let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
                ios.insert((blob_meta.key, blob_meta.lsn), io);
            }

            let buf_size = read.size();

            if buf_size > max_vectored_read_bytes {
                // If the read is oversized, it should only contain one key.
                let offenders = read
                    .blobs_at
                    .as_slice()
                    .iter()
                    .filter_map(|(_, blob_meta)| {
                        if blob_meta.key.is_rel_dir_key()
                            || blob_meta.key == DBDIR_KEY
                            || blob_meta.key.is_aux_file_key()
                        {
                            // The size of values for these keys is unbounded and can
                            // grow very large in pathological cases.
                            None
                        } else {
                            Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
                        }
                    })
                    .join(", ");

                if !offenders.is_empty() {
                    tracing::warn!(
                        "Oversized vectored read ({} > {}) for keys {}",
                        buf_size,
                        max_vectored_read_bytes,
                        offenders
                    );
                }
            }

            let read_extend_residency = this.clone();
            let read_from = self.file.clone();
            let read_ctx = ctx.attached_child();
            reconstruct_state
                .spawn_io(async move {
                    let buf = IoBufferMut::with_capacity(buf_size);
                    let vectored_blob_reader = VectoredBlobReader::new(&read_from);
                    let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;

                    match res {
                        Ok(blobs_buf) => {
                            let view = BufView::new_slice(&blobs_buf.buf);
                            for meta in blobs_buf.blobs.iter() {
                                let io: OnDiskValueIo =
                                    ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
                                let img_buf = meta.read(&view).await;

                                let img_buf = match img_buf {
                                    Ok(img_buf) => img_buf,
                                    Err(e) => {
                                        io.complete(Err(e));
                                        continue;
                                    }
                                };

                                io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
                            }

                            assert!(ios.is_empty());
                        }
                        Err(err) => {
                            for (_, io) in ios {
                                io.complete(Err(std::io::Error::new(
                                    err.kind(),
                                    "vec read failed",
                                )));
                            }
                        }
                    }

                    // keep layer resident until this IO is done; this spawned IO future generally outlives the
                    // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
                    drop(read_extend_residency);
                })
                .await;
        }
    }

    pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
        let block_reader = FileBlockReader::new(&self.file, self.file_id);
        let tree_reader =
            DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
        ImageLayerIterator {
            image_layer: self,
            ctx,
            index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
            key_values_batch: VecDeque::new(),
            is_end: false,
            planner: StreamingVectoredReadPlanner::new(
                1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
                1024,        // The default value. Unit tests might use a different value.
            ),
        }
    }

    /// NB: not super efficient, but not terrible either. Should probably be an iterator.
    //
    // We're reusing the index traversal logic in plan_reads; it would be nice to
    // factor that out.
    pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
        let plan = self
            .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
            .await?;
        Ok(plan
            .into_iter()
            .flat_map(|read| read.blobs_at)
            .map(|(_, blob_meta)| blob_meta.key)
            .collect())
    }
}
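
// A minimal sketch (hypothetical helper) of consuming an `ImageLayerIterator`:
// `next` returns batched `(Key, Lsn, Value)` tuples until the index is
// exhausted, which is the access pattern the iterator tests below exercise.
#[cfg(test)]
#[allow(dead_code)]
async fn drain_iter_sketch(inner: &ImageLayerInner, ctx: &RequestContext) -> anyhow::Result<usize> {
    let mut iter = inner.iter(ctx);
    let mut num_values = 0;
    while let Some((_key, _lsn, _value)) = iter.next().await? {
        num_values += 1;
    }
    Ok(num_values)
}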

/// A builder object for constructing a new image layer.
///
/// Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` for every key-value
/// pair in the key range.
///
/// 3. Call `finish`.
///
struct ImageLayerWriterInner {
    conf: &'static PageServerConf,
    path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    key_range: Range<Key>,
    lsn: Lsn,

    // Total uncompressed bytes passed into put_image
    uncompressed_bytes: u64,

    // Like `uncompressed_bytes`,
    // but only of images we might consider for compression
    uncompressed_bytes_eligible: u64,

    // Like `uncompressed_bytes`, but only of images
    // where we have chosen their compressed form
    uncompressed_bytes_chosen: u64,

    // Number of keys in the layer.
    num_keys: usize,

    blob_writer: BlobWriter<false>,
    tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,

    #[cfg(feature = "testing")]
    last_written_key: Key,
}

impl ImageLayerWriterInner {
    ///
    /// Start building a new image layer.
    ///
    #[allow(clippy::too_many_arguments)]
    async fn new(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        key_range: &Range<Key>,
        lsn: Lsn,
        gate: &utils::sync::gate::Gate,
        cancel: CancellationToken,
        ctx: &RequestContext,
    ) -> anyhow::Result<Self> {
        // Create the file initially with a temporary filename.
        // We'll atomically rename it to the final name when we're done.
        let path = ImageLayer::temp_path_for(
            conf,
            timeline_id,
            tenant_shard_id,
            &ImageLayerName {
                key_range: key_range.clone(),
                lsn,
            },
        );
        trace!("creating image layer {}", path);
        let mut file = {
            VirtualFile::open_with_options(
                &path,
                virtual_file::OpenOptions::new()
                    .write(true)
                    .create_new(true),
                ctx,
            )
            .await?
        };
        // make room for the header block
        file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
        let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);

        // Initialize the b-tree index builder
        let block_buf = BlockBuf::new();
        let tree_builder = DiskBtreeBuilder::new(block_buf);

        let writer = Self {
            conf,
            path,
            timeline_id,
            tenant_shard_id,
            key_range: key_range.clone(),
            lsn,
            tree: tree_builder,
            blob_writer,
            uncompressed_bytes: 0,
            uncompressed_bytes_eligible: 0,
            uncompressed_bytes_chosen: 0,
            num_keys: 0,
            #[cfg(feature = "testing")]
            last_written_key: Key::MIN,
        };

        Ok(writer)
    }

    ///
    /// Write the next value to the file.
    ///
    /// The page versions must be appended in blknum order.
    ///
    async fn put_image(
        &mut self,
        key: Key,
        img: Bytes,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        ensure!(self.key_range.contains(&key));
        let compression = self.conf.image_compression;
        let uncompressed_len = img.len() as u64;
        self.uncompressed_bytes += uncompressed_len;
        self.num_keys += 1;
        let (_img, res) = self
            .blob_writer
            .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
            .await;
        // TODO: re-use the buffer for `img` further upstack
        let (off, compression_info) = res?;
        if compression_info.compressed_size.is_some() {
            // The image has at least been considered for compression
            self.uncompressed_bytes_eligible += uncompressed_len;
        }
        if compression_info.written_compressed {
            // The image has been compressed
            self.uncompressed_bytes_chosen += uncompressed_len;
        }

        let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
        key.write_to_byte_slice(&mut keybuf);
        self.tree.append(&keybuf, off)?;

        #[cfg(feature = "testing")]
        {
            self.last_written_key = key;
        }

        Ok(())
    }

    ///
    /// Finish writing the image layer.
    ///
    async fn finish(
        self,
        ctx: &RequestContext,
        end_key: Option<Key>,
    ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
        let temp_path = self.path.clone();
        let result = self.finish0(ctx, end_key).await;
        if let Err(ref e) = result {
            tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
            if let Err(e) = std::fs::remove_file(&temp_path) {
                tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
            }
        }
        result
    }

    ///
    /// Finish writing the image layer.
    ///
    async fn finish0(
        self,
        ctx: &RequestContext,
        end_key: Option<Key>,
    ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
        let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;

        // Calculate compression ratio
        let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
        crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
        crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
            .inc_by(self.uncompressed_bytes_eligible);
        crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
        crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);

        let mut file = self.blob_writer.into_inner();

        // Write out the index
        file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
            .await?;
        let (index_root_blk, block_buf) = self.tree.finish()?;
        for buf in block_buf.blocks {
            let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
            res?;
        }

        let final_key_range = if let Some(end_key) = end_key {
            self.key_range.start..end_key
        } else {
            self.key_range.clone()
        };

        // Fill in the summary on blk 0
        let summary = Summary {
            magic: IMAGE_FILE_MAGIC,
            format_version: STORAGE_FORMAT_VERSION,
            tenant_id: self.tenant_shard_id.tenant_id,
            timeline_id: self.timeline_id,
            key_range: final_key_range.clone(),
            lsn: self.lsn,
            index_start_blk,
            index_root_blk,
        };

        let mut buf = Vec::with_capacity(PAGE_SZ);
        // TODO: could use smallvec here but it's a pain with Slice<T>
        Summary::ser_into(&summary, &mut buf)?;
        file.seek(SeekFrom::Start(0)).await?;
        let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
        res?;

        let metadata = file
            .metadata()
            .await
            .context("get metadata to determine file size")?;

        let desc = PersistentLayerDesc::new_img(
            self.tenant_shard_id,
            self.timeline_id,
            final_key_range,
            self.lsn,
            metadata.len(),
        );

        #[cfg(feature = "testing")]
        if let Some(end_key) = end_key {
            assert!(
                self.last_written_key < end_key,
                "written key violates end_key range"
            );
        }

        // Note: Because we open the file in write-only mode, we cannot
        // reuse the same VirtualFile for reading later. That's why we don't
        // set inner.file here. The first read will have to re-open it.

        // fsync the file
        file.sync_all()
            .await
            .maybe_fatal_err("image_layer sync_all")?;

        trace!("created image layer {}", self.path);

        Ok((desc, self.path))
    }
}

/// A builder object for constructing a new image layer.
///
/// Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` for every key-value
/// pair in the key range.
///
/// 3. Call `finish`.
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// possible for the writer to be dropped before `finish` is actually called.
/// That would leave stale temporary files in the directory, potentially
/// exhausting file system space. This structure wraps `ImageLayerWriterInner`
/// and adds a `Drop` implementation that cleans up the temporary file on
/// failure. It's not possible to do this directly in `ImageLayerWriterInner`
/// since `finish` moves out some fields, making it impossible to implement
/// `Drop`.
///
#[must_use]
pub struct ImageLayerWriter {
    inner: Option<ImageLayerWriterInner>,
}

impl ImageLayerWriter {
    ///
    /// Start building a new image layer.
    ///
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_shard_id: TenantShardId,
        key_range: &Range<Key>,
        lsn: Lsn,
        gate: &utils::sync::gate::Gate,
        cancel: CancellationToken,
        ctx: &RequestContext,
    ) -> anyhow::Result<ImageLayerWriter> {
        Ok(Self {
            inner: Some(
                ImageLayerWriterInner::new(
                    conf,
                    timeline_id,
                    tenant_shard_id,
                    key_range,
                    lsn,
                    gate,
                    cancel,
                    ctx,
                )
                .await?,
            ),
        })
    }

    ///
    /// Write the next value to the file.
    ///
    /// The page versions must be appended in blknum order.
    ///
    pub async fn put_image(
        &mut self,
        key: Key,
        img: Bytes,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        self.inner.as_mut().unwrap().put_image(key, img, ctx).await
    }

    /// Estimated size of the image layer.
    pub(crate) fn estimated_size(&self) -> u64 {
        let inner = self.inner.as_ref().unwrap();
        inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
    }

    pub(crate) fn num_keys(&self) -> usize {
        self.inner.as_ref().unwrap().num_keys
    }

    ///
    /// Finish writing the image layer.
    ///
    pub(crate) async fn finish(
        mut self,
        ctx: &RequestContext,
    ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
        self.inner.take().unwrap().finish(ctx, None).await
    }

    /// Finish writing the image layer with an end key, used in
    /// [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines
    /// the end of the image layer's covered range and is exclusive.
    pub(super) async fn finish_with_end_key(
        mut self,
        end_key: Key,
        ctx: &RequestContext,
    ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
        self.inner.take().unwrap().finish(ctx, Some(end_key)).await
    }
}
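
// A minimal end-to-end sketch (hypothetical helper) of the write path the doc
// comment above describes: create the writer, append one image per key in
// order, then `finish` to produce the layer descriptor and its temporary path.
// `gate` and `cancel` would come from the owning timeline, and the payload
// bytes here are placeholders.
#[cfg(test)]
#[allow(dead_code)]
#[allow(clippy::too_many_arguments)]
async fn write_layer_sketch(
    conf: &'static PageServerConf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    key_range: Range<Key>,
    lsn: Lsn,
    gate: &utils::sync::gate::Gate,
    cancel: CancellationToken,
    ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    let mut writer = ImageLayerWriter::new(
        conf,
        timeline_id,
        tenant_shard_id,
        &key_range,
        lsn,
        gate,
        cancel,
        ctx,
    )
    .await?;
    let mut key = key_range.start;
    while key < key_range.end {
        writer
            .put_image(key, Bytes::from_static(b"page image"), ctx)
            .await?;
        key = key.next();
    }
    writer.finish(ctx).await
}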

impl Drop for ImageLayerWriter {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            inner.blob_writer.into_inner().remove();
        }
    }
}

pub struct ImageLayerIterator<'a> {
    image_layer: &'a ImageLayerInner,
    ctx: &'a RequestContext,
    planner: StreamingVectoredReadPlanner,
    index_iter: DiskBtreeIterator<'a>,
    key_values_batch: VecDeque<(Key, Lsn, Value)>,
    is_end: bool,
}

impl ImageLayerIterator<'_> {
    pub(crate) fn layer_dbg_info(&self) -> String {
        self.image_layer.layer_dbg_info()
    }

    /// Retrieve a batch of key-value pairs into the iterator buffer.
    async fn next_batch(&mut self) -> anyhow::Result<()> {
        assert!(self.key_values_batch.is_empty());
        assert!(!self.is_end);

        let plan = loop {
            if let Some(res) = self.index_iter.next().await {
                let (raw_key, offset) = res?;
                if let Some(batch_plan) = self.planner.handle(
                    Key::from_slice(&raw_key[..KEY_SIZE]),
                    self.image_layer.lsn,
                    offset,
                    true,
                ) {
                    break batch_plan;
                }
            } else {
                self.is_end = true;
                let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
                if let Some(item) = self.planner.handle_range_end(payload_end) {
                    break item;
                } else {
                    return Ok(()); // TODO: a test case on an empty iterator
                }
            }
        };
        let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
        let mut next_batch = std::collections::VecDeque::new();
        let buf_size = plan.size();
        let buf = IoBufferMut::with_capacity(buf_size);
        let blobs_buf = vectored_blob_reader
            .read_blobs(&plan, buf, self.ctx)
            .await?;
        let view = BufView::new_slice(&blobs_buf.buf);
        for meta in blobs_buf.blobs.iter() {
            let img_buf = meta.read(&view).await?;
            next_batch.push_back((
                meta.meta.key,
                self.image_layer.lsn,
                Value::Image(img_buf.into_bytes()),
            ));
        }
        self.key_values_batch = next_batch;
        Ok(())
    }

    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        if self.key_values_batch.is_empty() {
            if self.is_end {
                return Ok(None);
            }
            self.next_batch().await?;
        }
        Ok(Some(
            self.key_values_batch
                .pop_front()
                .expect("should not be empty"),
        ))
    }
}

#[cfg(test)]
mod test {
    use std::sync::Arc;
    use std::time::Duration;

    use bytes::Bytes;
    use itertools::Itertools;
    use pageserver_api::key::Key;
    use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
    use pageserver_api::value::Value;
    use utils::generation::Generation;
    use utils::id::{TenantId, TimelineId};
    use utils::lsn::Lsn;

    use super::{ImageLayerIterator, ImageLayerWriter};
    use crate::DEFAULT_PG_VERSION;
    use crate::context::RequestContext;
    use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    use crate::tenant::storage_layer::{Layer, ResidentLayer};
    use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
    use crate::tenant::{Tenant, Timeline};

    #[tokio::test]
    async fn image_layer_rewrite() {
        let tenant_conf = pageserver_api::models::TenantConfig {
            gc_period: Some(Duration::ZERO),
            compaction_period: Some(Duration::ZERO),
            ..Default::default()
        };
        let tenant_id = TenantId::generate();
        let mut gen_ = Generation::new(0xdead0001);
        let mut get_next_gen = || {
            let ret = gen_;
            gen_ = gen_.next();
            ret
        };
        // The LSN at which we will create an image layer to filter
        let lsn = Lsn(0xdeadbeef0000);
        let timeline_id = TimelineId::generate();

        //
        // Create an unsharded parent with a layer.
        //

        let harness = TenantHarness::create_custom(
            "test_image_layer_rewrite--parent",
            tenant_conf.clone(),
            tenant_id,
            ShardIdentity::unsharded(),
            get_next_gen(),
        )
        .await
        .unwrap();
        let (tenant, ctx) = harness.load().await;
        let timeline = tenant
            .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000002000").unwrap();
        let range = input_start..input_end;

        // Build an image layer to filter
        let resident = {
            let mut writer = ImageLayerWriter::new(
                harness.conf,
                timeline_id,
                harness.tenant_shard_id,
                &range,
                lsn,
                &timeline.gate,
                timeline.cancel.clone(),
                &ctx,
            )
            .await
            .unwrap();

            let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
            let mut key = range.start;
            while key < range.end {
                writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();

                key = key.next();
            }
            let (desc, path) = writer.finish(&ctx).await.unwrap();
            Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
        };
        let original_size = resident.metadata().file_size;

        //
        // Create child shards and do the rewrite, exercising filter().
        // TODO: abstraction in TenantHarness for splits.
        //

        // Filter for various shards: this exercises cases like values at the start of the key range,
        // at the end of the key range, and in the middle of the key range.
        let shard_count = ShardCount::new(4);
        for shard_number in 0..shard_count.count() {
            //
            // mimic the shard split
            //
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                shard_count,
                ShardStripeSize(0x800),
            )
            .unwrap();
            let harness = TenantHarness::create_custom(
                Box::leak(Box::new(format!(
                    "test_image_layer_rewrite--child{}",
                    shard_identity.shard_slug()
                ))),
                tenant_conf.clone(),
                tenant_id,
                shard_identity,
                // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
                // But here, all we care about is that the gen number is unique.
                get_next_gen(),
            )
            .await
            .unwrap();
            let (tenant, ctx) = harness.load().await;
            let timeline = tenant
                .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
                .await
                .unwrap();

            //
            // use filter() and make assertions
            //

            let mut filtered_writer = ImageLayerWriter::new(
                harness.conf,
                timeline_id,
                harness.tenant_shard_id,
                &range,
                lsn,
                &timeline.gate,
                timeline.cancel.clone(),
                &ctx,
            )
            .await
            .unwrap();

            let wrote_keys = resident
                .filter(&shard_identity, &mut filtered_writer, &ctx)
                .await
                .unwrap();
            let replacement = if wrote_keys > 0 {
                let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
                let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
                Some(resident)
            } else {
                None
            };

            // This exact size and those below will need updating as/when the layer encoding changes, but
            // should be deterministic for a given version of the format, as we used no randomness generating the input.
            assert_eq!(original_size, 122880);

            match shard_number {
                0 => {
                    // We should have written out just one stripe for our shard identity
                    assert_eq!(wrote_keys, 0x800);
                    let replacement = replacement.unwrap();

                    // We should have dropped some of the data
                    assert!(replacement.metadata().file_size < original_size);
                    assert!(replacement.metadata().file_size > 0);

                    // Assert that we dropped ~3/4 of the data.
                    assert_eq!(replacement.metadata().file_size, 49152);
                }
                1 => {
                    // Shard 1 has no keys in our input range
                    assert_eq!(wrote_keys, 0x0);
                    assert!(replacement.is_none());
                }
                2 => {
                    // Shard 2 has one stripe in the input range
                    assert_eq!(wrote_keys, 0x800);
                    let replacement = replacement.unwrap();
                    assert!(replacement.metadata().file_size < original_size);
                    assert!(replacement.metadata().file_size > 0);
                    assert_eq!(replacement.metadata().file_size, 49152);
                }
                3 => {
                    // Shard 3 has two stripes in the input range
                    assert_eq!(wrote_keys, 0x1000);
                    let replacement = replacement.unwrap();
                    assert!(replacement.metadata().file_size < original_size);
                    assert!(replacement.metadata().file_size > 0);
                    assert_eq!(replacement.metadata().file_size, 73728);
                }
                _ => unreachable!(),
            }
        }
    }

    async fn produce_image_layer(
        tenant: &Tenant,
        tline: &Arc<Timeline>,
        mut images: Vec<(Key, Bytes)>,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> anyhow::Result<ResidentLayer> {
        images.sort();
        let (key_start, _) = images.first().unwrap();
        let (key_last, _) = images.last().unwrap();
        let key_end = key_last.next();
        let key_range = *key_start..key_end;
        let mut writer = ImageLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            &key_range,
            lsn,
            &tline.gate,
            tline.cancel.clone(),
            ctx,
        )
        .await?;

        for (key, img) in images {
            writer.put_image(key, img, ctx).await?;
        }
        let (desc, path) = writer.finish(ctx).await?;
        let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;

        Ok::<_, anyhow::Error>(img_layer)
    }

    async fn assert_img_iter_equal(
        img_iter: &mut ImageLayerIterator<'_>,
        expect: &[(Key, Bytes)],
        expect_lsn: Lsn,
    ) {
        let mut expect_iter = expect.iter();
        loop {
            let o1 = img_iter.next().await.unwrap();
            let o2 = expect_iter.next();
            match (o1, o2) {
                (None, None) => break,
                (Some((k1, l1, v1)), Some((k2, i2))) => {
                    let Value::Image(i1) = v1 else {
                        panic!("expect Value::Image")
                    };
                    assert_eq!(&k1, k2);
                    assert_eq!(l1, expect_lsn);
                    assert_eq!(&i1, i2);
                }
                (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
            }
        }
    }

    #[tokio::test]
    async fn image_layer_iterator() {
        let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        const N: usize = 1000;
        let test_imgs = (0..N)
            .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
            .collect_vec();
        let resident_layer =
            produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
                .await
                .unwrap();
        let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
        for max_read_size in [1, 1024] {
            for batch_size in [1, 2, 4, 8, 3, 7, 13] {
                println!("running with batch_size={batch_size} max_read_size={max_read_size}");
                // Test if the batch size is correctly determined
                let mut iter = img_layer.iter(&ctx);
                iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
                let mut num_items = 0;
                for _ in 0..3 {
                    iter.next_batch().await.unwrap();
                    num_items += iter.key_values_batch.len();
                    if max_read_size == 1 {
                        // every key should be a batch b/c the value is larger than max_read_size
                        assert_eq!(iter.key_values_batch.len(), 1);
                    } else {
                        assert!(iter.key_values_batch.len() <= batch_size);
                    }
                    if num_items >= N {
                        break;
                    }
                    iter.key_values_batch.clear();
                }
                // Test if the result is correct
                let mut iter = img_layer.iter(&ctx);
                iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
                assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
            }
        }
    }
}
|