Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN.
3 : //!
4 : //! It contains an image of all key-value pairs in its key-range. Any key
5 : //! that falls into the image layer's range but does not exist in the layer,
6 : //! does not exist.
7 : //!
8 : //! An image layer is stored in a file on disk. The file is stored in
9 : //! timelines/<timeline_id> directory. Currently, there are no
10 : //! subdirectories, and each image layer file is named like this:
11 : //!
12 : //! ```text
13 : //! <key start>-<key end>__<LSN>
14 : //! ```
15 : //!
16 : //! For example:
17 : //!
18 : //! ```text
19 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
20 : //! ```
21 : //!
22 : //! Every image layer file consists of three parts: "summary",
23 : //! "index", and "values". The summary is a fixed size header at the
24 : //! beginning of the file, and it contains basic information about the
25 : //! layer, and offsets to the other parts. The "index" is a B-tree,
26 : //! mapping from Key to an offset in the "values" part. The
27 : //! actual page images are stored in the "values" part.
28 : use crate::config::PageServerConf;
29 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
30 : use crate::page_cache::{self, FileId, PAGE_SZ};
31 : use crate::repository::{Key, Value, KEY_SIZE};
32 : use crate::tenant::blob_io::BlobWriter;
33 : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
34 : use crate::tenant::disk_btree::{
35 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
36 : };
37 : use crate::tenant::timeline::GetVectoredError;
38 : use crate::tenant::vectored_blob_io::{
39 : BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
40 : };
41 : use crate::tenant::PageReconstructError;
42 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
43 : use crate::virtual_file::{self, VirtualFile};
44 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
45 : use anyhow::{anyhow, bail, ensure, Context, Result};
46 : use bytes::{Bytes, BytesMut};
47 : use camino::{Utf8Path, Utf8PathBuf};
48 : use hex;
49 : use itertools::Itertools;
50 : use pageserver_api::config::MaxVectoredReadBytes;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
53 : use rand::{distributions::Alphanumeric, Rng};
54 : use serde::{Deserialize, Serialize};
55 : use std::collections::VecDeque;
56 : use std::fs::File;
57 : use std::io::SeekFrom;
58 : use std::ops::Range;
59 : use std::os::unix::prelude::FileExt;
60 : use std::str::FromStr;
61 : use tokio::sync::OnceCell;
62 : use tokio_stream::StreamExt;
63 : use tracing::*;
64 :
65 : use utils::{
66 : bin_ser::BeSer,
67 : id::{TenantId, TimelineId},
68 : lsn::Lsn,
69 : };
70 :
71 : use super::layer_name::ImageLayerName;
72 : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
73 :
74 : ///
75 : /// Header stored in the beginning of the file
76 : ///
77 : /// After this comes the 'values' part, starting on block 1. After that,
78 : /// the 'index' starts at the block indicated by 'index_start_blk'
79 : ///
80 318 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
81 : pub struct Summary {
82 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
83 : pub magic: u16,
84 : pub format_version: u16,
85 :
86 : pub tenant_id: TenantId,
87 : pub timeline_id: TimelineId,
88 : pub key_range: Range<Key>,
89 : pub lsn: Lsn,
90 :
91 : /// Block number where the 'index' part of the file begins.
92 : pub index_start_blk: u32,
93 : /// Block within the 'index', where the B-tree root page is stored
94 : pub index_root_blk: u32,
95 : // the 'values' part starts after the summary header, on block 1.
96 : }
97 :
98 : impl From<&ImageLayer> for Summary {
99 0 : fn from(layer: &ImageLayer) -> Self {
100 0 : Self::expected(
101 0 : layer.desc.tenant_shard_id.tenant_id,
102 0 : layer.desc.timeline_id,
103 0 : layer.desc.key_range.clone(),
104 0 : layer.lsn,
105 0 : )
106 0 : }
107 : }
108 :
109 : impl Summary {
110 318 : pub(super) fn expected(
111 318 : tenant_id: TenantId,
112 318 : timeline_id: TimelineId,
113 318 : key_range: Range<Key>,
114 318 : lsn: Lsn,
115 318 : ) -> Self {
116 318 : Self {
117 318 : magic: IMAGE_FILE_MAGIC,
118 318 : format_version: STORAGE_FORMAT_VERSION,
119 318 : tenant_id,
120 318 : timeline_id,
121 318 : key_range,
122 318 : lsn,
123 318 :
124 318 : index_start_blk: 0,
125 318 : index_root_blk: 0,
126 318 : }
127 318 : }
128 : }
129 :
130 : /// This is used only from `pagectl`. Within pageserver, all layers are
131 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
132 : pub struct ImageLayer {
133 : path: Utf8PathBuf,
134 : pub desc: PersistentLayerDesc,
135 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
136 : pub lsn: Lsn,
137 : inner: OnceCell<ImageLayerInner>,
138 : }
139 :
140 : impl std::fmt::Debug for ImageLayer {
141 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 : use super::RangeDisplayDebug;
143 :
144 0 : f.debug_struct("ImageLayer")
145 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
146 0 : .field("file_size", &self.desc.file_size)
147 0 : .field("lsn", &self.lsn)
148 0 : .field("inner", &self.inner)
149 0 : .finish()
150 0 : }
151 : }
152 :
153 : /// ImageLayer is the in-memory data structure associated with an on-disk image
154 : /// file.
155 : pub struct ImageLayerInner {
156 : // values copied from summary
157 : index_start_blk: u32,
158 : index_root_blk: u32,
159 :
160 : key_range: Range<Key>,
161 : lsn: Lsn,
162 :
163 : file: VirtualFile,
164 : file_id: FileId,
165 :
166 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
167 : }
168 :
169 : impl ImageLayerInner {
170 0 : pub(crate) fn layer_dbg_info(&self) -> String {
171 0 : format!(
172 0 : "image {}..{} {}",
173 0 : self.key_range().start,
174 0 : self.key_range().end,
175 0 : self.lsn()
176 0 : )
177 0 : }
178 : }
179 :
180 : impl std::fmt::Debug for ImageLayerInner {
181 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 0 : f.debug_struct("ImageLayerInner")
183 0 : .field("index_start_blk", &self.index_start_blk)
184 0 : .field("index_root_blk", &self.index_root_blk)
185 0 : .finish()
186 0 : }
187 : }
188 :
189 : impl ImageLayerInner {
190 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
191 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
192 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
193 0 : self.index_start_blk,
194 0 : self.index_root_blk,
195 0 : block_reader,
196 0 : );
197 0 :
198 0 : tree_reader.dump().await?;
199 :
200 0 : tree_reader
201 0 : .visit(
202 0 : &[0u8; KEY_SIZE],
203 0 : VisitDirection::Forwards,
204 0 : |key, value| {
205 0 : println!("key: {} offset {}", hex::encode(key), value);
206 0 : true
207 0 : },
208 0 : ctx,
209 0 : )
210 0 : .await?;
211 :
212 0 : Ok(())
213 0 : }
214 : }
215 :
216 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
217 : impl std::fmt::Display for ImageLayer {
218 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 0 : write!(f, "{}", self.layer_desc().short_id())
220 0 : }
221 : }
222 :
223 : impl AsLayerDesc for ImageLayer {
224 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
225 0 : &self.desc
226 0 : }
227 : }
228 :
229 : impl ImageLayer {
230 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
231 0 : self.desc.dump();
232 0 :
233 0 : if !verbose {
234 0 : return Ok(());
235 0 : }
236 :
237 0 : let inner = self.load(ctx).await?;
238 :
239 0 : inner.dump(ctx).await?;
240 :
241 0 : Ok(())
242 0 : }
243 :
244 1506 : fn temp_path_for(
245 1506 : conf: &PageServerConf,
246 1506 : timeline_id: TimelineId,
247 1506 : tenant_shard_id: TenantShardId,
248 1506 : fname: &ImageLayerName,
249 1506 : ) -> Utf8PathBuf {
250 1506 : let rand_string: String = rand::thread_rng()
251 1506 : .sample_iter(&Alphanumeric)
252 1506 : .take(8)
253 1506 : .map(char::from)
254 1506 : .collect();
255 1506 :
256 1506 : conf.timeline_path(&tenant_shard_id, &timeline_id)
257 1506 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
258 1506 : }
259 :
260 : ///
261 : /// Open the underlying file and read the metadata into memory, if it's
262 : /// not loaded already.
263 : ///
264 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
265 0 : self.inner
266 0 : .get_or_try_init(|| self.load_inner(ctx))
267 0 : .await
268 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
269 0 : }
270 :
271 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
272 0 : let path = self.path();
273 :
274 0 : let loaded =
275 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
276 :
277 : // not production code
278 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
279 0 : let expected_layer_name = self.layer_desc().layer_name();
280 0 :
281 0 : if actual_layer_name != expected_layer_name {
282 0 : println!("warning: filename does not match what is expected from in-file summary");
283 0 : println!("actual: {:?}", actual_layer_name.to_string());
284 0 : println!("expected: {:?}", expected_layer_name.to_string());
285 0 : }
286 :
287 0 : Ok(loaded)
288 0 : }
289 :
290 : /// Create an ImageLayer struct representing an existing file on disk.
291 : ///
292 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
293 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
294 0 : let mut summary_buf = vec![0; PAGE_SZ];
295 0 : file.read_exact_at(&mut summary_buf, 0)?;
296 0 : let summary = Summary::des_prefix(&summary_buf)?;
297 0 : let metadata = file
298 0 : .metadata()
299 0 : .context("get file metadata to determine size")?;
300 :
301 : // This function is never used for constructing layers in a running pageserver,
302 : // so it does not need an accurate TenantShardId.
303 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
304 0 :
305 0 : Ok(ImageLayer {
306 0 : path: path.to_path_buf(),
307 0 : desc: PersistentLayerDesc::new_img(
308 0 : tenant_shard_id,
309 0 : summary.timeline_id,
310 0 : summary.key_range,
311 0 : summary.lsn,
312 0 : metadata.len(),
313 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
314 0 : lsn: summary.lsn,
315 0 : inner: OnceCell::new(),
316 0 : })
317 0 : }
318 :
319 0 : fn path(&self) -> Utf8PathBuf {
320 0 : self.path.clone()
321 0 : }
322 : }
323 :
324 0 : #[derive(thiserror::Error, Debug)]
325 : pub enum RewriteSummaryError {
326 : #[error("magic mismatch")]
327 : MagicMismatch,
328 : #[error(transparent)]
329 : Other(#[from] anyhow::Error),
330 : }
331 :
332 : impl From<std::io::Error> for RewriteSummaryError {
333 0 : fn from(e: std::io::Error) -> Self {
334 0 : Self::Other(anyhow::anyhow!(e))
335 0 : }
336 : }
337 :
338 : impl ImageLayer {
339 0 : pub async fn rewrite_summary<F>(
340 0 : path: &Utf8Path,
341 0 : rewrite: F,
342 0 : ctx: &RequestContext,
343 0 : ) -> Result<(), RewriteSummaryError>
344 0 : where
345 0 : F: Fn(Summary) -> Summary,
346 0 : {
347 0 : let mut file = VirtualFile::open_with_options(
348 0 : path,
349 0 : virtual_file::OpenOptions::new().read(true).write(true),
350 0 : ctx,
351 0 : )
352 0 : .await
353 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
354 0 : let file_id = page_cache::next_file_id();
355 0 : let block_reader = FileBlockReader::new(&file, file_id);
356 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
357 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
358 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
359 0 : return Err(RewriteSummaryError::MagicMismatch);
360 0 : }
361 0 :
362 0 : let new_summary = rewrite(actual_summary);
363 0 :
364 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
365 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
366 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
367 0 : file.seek(SeekFrom::Start(0)).await?;
368 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
369 0 : res?;
370 0 : Ok(())
371 0 : }
372 : }
373 :
374 : impl ImageLayerInner {
375 108 : pub(crate) fn key_range(&self) -> &Range<Key> {
376 108 : &self.key_range
377 108 : }
378 :
379 108 : pub(crate) fn lsn(&self) -> Lsn {
380 108 : self.lsn
381 108 : }
382 :
383 318 : pub(super) async fn load(
384 318 : path: &Utf8Path,
385 318 : lsn: Lsn,
386 318 : summary: Option<Summary>,
387 318 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
388 318 : ctx: &RequestContext,
389 318 : ) -> anyhow::Result<Self> {
390 318 : let file = VirtualFile::open(path, ctx)
391 159 : .await
392 318 : .context("open layer file")?;
393 318 : let file_id = page_cache::next_file_id();
394 318 : let block_reader = FileBlockReader::new(&file, file_id);
395 318 : let summary_blk = block_reader
396 318 : .read_blk(0, ctx)
397 162 : .await
398 318 : .context("read first block")?;
399 :
400 : // length is the only way how this could fail, so it's not actually likely at all unless
401 : // read_blk returns wrong sized block.
402 : //
403 : // TODO: confirm and make this into assertion
404 318 : let actual_summary =
405 318 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
406 :
407 318 : if let Some(mut expected_summary) = summary {
408 : // production code path
409 318 : expected_summary.index_start_blk = actual_summary.index_start_blk;
410 318 : expected_summary.index_root_blk = actual_summary.index_root_blk;
411 318 : // mask out the timeline_id, but still require the layers to be from the same tenant
412 318 : expected_summary.timeline_id = actual_summary.timeline_id;
413 318 :
414 318 : if actual_summary != expected_summary {
415 0 : bail!(
416 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
417 0 : actual_summary,
418 0 : expected_summary
419 0 : );
420 318 : }
421 0 : }
422 :
423 318 : Ok(ImageLayerInner {
424 318 : index_start_blk: actual_summary.index_start_blk,
425 318 : index_root_blk: actual_summary.index_root_blk,
426 318 : lsn,
427 318 : file,
428 318 : file_id,
429 318 : max_vectored_read_bytes,
430 318 : key_range: actual_summary.key_range,
431 318 : })
432 318 : }
433 :
434 : // Look up the keys in the provided keyspace and update
435 : // the reconstruct state with whatever is found.
436 24102 : pub(super) async fn get_values_reconstruct_data(
437 24102 : &self,
438 24102 : keyspace: KeySpace,
439 24102 : reconstruct_state: &mut ValuesReconstructState,
440 24102 : ctx: &RequestContext,
441 24102 : ) -> Result<(), GetVectoredError> {
442 24102 : let reads = self
443 24102 : .plan_reads(keyspace, None, ctx)
444 2141 : .await
445 24102 : .map_err(GetVectoredError::Other)?;
446 :
447 24102 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
448 14115 : .await;
449 :
450 24102 : reconstruct_state.on_image_layer_visited(&self.key_range);
451 24102 :
452 24102 : Ok(())
453 24102 : }
454 :
455 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
456 : /// and the keys in this layer.
457 : ///
458 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
459 : /// this shard.
460 24126 : async fn plan_reads(
461 24126 : &self,
462 24126 : keyspace: KeySpace,
463 24126 : shard_identity: Option<&ShardIdentity>,
464 24126 : ctx: &RequestContext,
465 24126 : ) -> anyhow::Result<Vec<VectoredRead>> {
466 24126 : let mut planner = VectoredReadPlanner::new(
467 24126 : self.max_vectored_read_bytes
468 24126 : .expect("Layer is loaded with max vectored bytes config")
469 24126 : .0
470 24126 : .into(),
471 24126 : );
472 24126 :
473 24126 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
474 24126 : let tree_reader =
475 24126 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
476 24126 :
477 24126 : let ctx = RequestContextBuilder::extend(ctx)
478 24126 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
479 24126 : .build();
480 :
481 89149 : for range in keyspace.ranges.iter() {
482 89149 : let mut range_end_handled = false;
483 89149 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
484 89149 : range.start.write_to_byte_slice(&mut search_key);
485 89149 :
486 89149 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
487 89149 : let mut index_stream = std::pin::pin!(index_stream);
488 :
489 3289975 : while let Some(index_entry) = index_stream.next().await {
490 3289362 : let (raw_key, offset) = index_entry?;
491 :
492 3289362 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
493 3289362 : assert!(key >= range.start);
494 :
495 3289362 : let flag = if let Some(shard_identity) = shard_identity {
496 3145728 : if shard_identity.is_key_disposable(&key) {
497 2359296 : BlobFlag::Ignore
498 : } else {
499 786432 : BlobFlag::None
500 : }
501 : } else {
502 143634 : BlobFlag::None
503 : };
504 :
505 3289362 : if key >= range.end {
506 88536 : planner.handle_range_end(offset);
507 88536 : range_end_handled = true;
508 88536 : break;
509 3200826 : } else {
510 3200826 : planner.handle(key, self.lsn, offset, flag);
511 3200826 : }
512 : }
513 :
514 89149 : if !range_end_handled {
515 613 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
516 613 : planner.handle_range_end(payload_end);
517 88536 : }
518 : }
519 :
520 24126 : Ok(planner.finish())
521 24126 : }
522 :
523 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
524 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
525 24 : pub(super) async fn filter(
526 24 : &self,
527 24 : shard_identity: &ShardIdentity,
528 24 : writer: &mut ImageLayerWriter,
529 24 : ctx: &RequestContext,
530 24 : ) -> anyhow::Result<usize> {
531 : // Fragment the range into the regions owned by this ShardIdentity
532 24 : let plan = self
533 24 : .plan_reads(
534 24 : KeySpace {
535 24 : // If asked for the total key space, plan_reads will give us all the keys in the layer
536 24 : ranges: vec![Key::MIN..Key::MAX],
537 24 : },
538 24 : Some(shard_identity),
539 24 : ctx,
540 24 : )
541 1407 : .await?;
542 :
543 24 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
544 24 : let mut key_count = 0;
545 48 : for read in plan.into_iter() {
546 48 : let buf_size = read.size();
547 48 :
548 48 : let buf = BytesMut::with_capacity(buf_size);
549 48 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
550 :
551 48 : let frozen_buf = blobs_buf.buf.freeze();
552 :
553 786432 : for meta in blobs_buf.blobs.iter() {
554 786432 : let img_buf = frozen_buf.slice(meta.start..meta.end);
555 786432 :
556 786432 : key_count += 1;
557 786432 : writer
558 786432 : .put_image(meta.meta.key, img_buf, ctx)
559 798720 : .await
560 786432 : .context(format!("Storing key {}", meta.meta.key))?;
561 : }
562 : }
563 :
564 24 : Ok(key_count)
565 24 : }
566 :
567 24102 : async fn do_reads_and_update_state(
568 24102 : &self,
569 24102 : reads: Vec<VectoredRead>,
570 24102 : reconstruct_state: &mut ValuesReconstructState,
571 24102 : ctx: &RequestContext,
572 24102 : ) {
573 24102 : let max_vectored_read_bytes = self
574 24102 : .max_vectored_read_bytes
575 24102 : .expect("Layer is loaded with max vectored bytes config")
576 24102 : .0
577 24102 : .into();
578 24102 :
579 24102 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
580 27655 : for read in reads.into_iter() {
581 27655 : let buf_size = read.size();
582 27655 :
583 27655 : if buf_size > max_vectored_read_bytes {
584 : // If the read is oversized, it should only contain one key.
585 0 : let offenders = read
586 0 : .blobs_at
587 0 : .as_slice()
588 0 : .iter()
589 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
590 0 : .join(", ");
591 0 : tracing::warn!(
592 0 : "Oversized vectored read ({} > {}) for keys {}",
593 : buf_size,
594 : max_vectored_read_bytes,
595 : offenders
596 : );
597 27655 : }
598 :
599 27655 : let buf = BytesMut::with_capacity(buf_size);
600 27655 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
601 :
602 27655 : match res {
603 27655 : Ok(blobs_buf) => {
604 27655 : let frozen_buf = blobs_buf.buf.freeze();
605 :
606 55098 : for meta in blobs_buf.blobs.iter() {
607 55098 : let img_buf = frozen_buf.slice(meta.start..meta.end);
608 55098 : reconstruct_state.update_key(
609 55098 : &meta.meta.key,
610 55098 : self.lsn,
611 55098 : Value::Image(img_buf),
612 55098 : );
613 55098 : }
614 : }
615 0 : Err(err) => {
616 0 : let kind = err.kind();
617 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
618 0 : reconstruct_state.on_key_error(
619 0 : blob_meta.key,
620 0 : PageReconstructError::from(anyhow!(
621 0 : "Failed to read blobs from virtual file {}: {}",
622 0 : self.file.path,
623 0 : kind
624 0 : )),
625 0 : );
626 0 : }
627 : }
628 : };
629 : }
630 24102 : }
631 :
632 276 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
633 276 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
634 276 : let tree_reader =
635 276 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
636 276 : ImageLayerIterator {
637 276 : image_layer: self,
638 276 : ctx,
639 276 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
640 276 : key_values_batch: VecDeque::new(),
641 276 : is_end: false,
642 276 : planner: StreamingVectoredReadPlanner::new(
643 276 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
644 276 : 1024, // The default value. Unit tests might use a different value
645 276 : ),
646 276 : }
647 276 : }
648 : }
649 :
650 : /// A builder object for constructing a new image layer.
651 : ///
652 : /// Usage:
653 : ///
654 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
655 : ///
656 : /// 2. Write the contents by calling `put_page_image` for every key-value
657 : /// pair in the key range.
658 : ///
659 : /// 3. Call `finish`.
660 : ///
661 : struct ImageLayerWriterInner {
662 : conf: &'static PageServerConf,
663 : path: Utf8PathBuf,
664 : timeline_id: TimelineId,
665 : tenant_shard_id: TenantShardId,
666 : key_range: Range<Key>,
667 : lsn: Lsn,
668 :
669 : // Total uncompressed bytes passed into put_image
670 : uncompressed_bytes: u64,
671 :
672 : // Like `uncompressed_bytes`,
673 : // but only of images we might consider for compression
674 : uncompressed_bytes_eligible: u64,
675 :
676 : // Like `uncompressed_bytes`, but only of images
677 : // where we have chosen their compressed form
678 : uncompressed_bytes_chosen: u64,
679 :
680 : // Number of keys in the layer.
681 : num_keys: usize,
682 :
683 : blob_writer: BlobWriter<false>,
684 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
685 :
686 : #[cfg(feature = "testing")]
687 : last_written_key: Key,
688 : }
689 :
690 : impl ImageLayerWriterInner {
691 : ///
692 : /// Start building a new image layer.
693 : ///
694 1506 : async fn new(
695 1506 : conf: &'static PageServerConf,
696 1506 : timeline_id: TimelineId,
697 1506 : tenant_shard_id: TenantShardId,
698 1506 : key_range: &Range<Key>,
699 1506 : lsn: Lsn,
700 1506 : ctx: &RequestContext,
701 1506 : ) -> anyhow::Result<Self> {
702 1506 : // Create the file initially with a temporary filename.
703 1506 : // We'll atomically rename it to the final name when we're done.
704 1506 : let path = ImageLayer::temp_path_for(
705 1506 : conf,
706 1506 : timeline_id,
707 1506 : tenant_shard_id,
708 1506 : &ImageLayerName {
709 1506 : key_range: key_range.clone(),
710 1506 : lsn,
711 1506 : },
712 1506 : );
713 1506 : trace!("creating image layer {}", path);
714 1506 : let mut file = {
715 1506 : VirtualFile::open_with_options(
716 1506 : &path,
717 1506 : virtual_file::OpenOptions::new()
718 1506 : .write(true)
719 1506 : .create_new(true),
720 1506 : ctx,
721 1506 : )
722 1008 : .await?
723 : };
724 : // make room for the header block
725 1506 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
726 1506 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
727 1506 :
728 1506 : // Initialize the b-tree index builder
729 1506 : let block_buf = BlockBuf::new();
730 1506 : let tree_builder = DiskBtreeBuilder::new(block_buf);
731 1506 :
732 1506 : let writer = Self {
733 1506 : conf,
734 1506 : path,
735 1506 : timeline_id,
736 1506 : tenant_shard_id,
737 1506 : key_range: key_range.clone(),
738 1506 : lsn,
739 1506 : tree: tree_builder,
740 1506 : blob_writer,
741 1506 : uncompressed_bytes: 0,
742 1506 : uncompressed_bytes_eligible: 0,
743 1506 : uncompressed_bytes_chosen: 0,
744 1506 : num_keys: 0,
745 1506 : #[cfg(feature = "testing")]
746 1506 : last_written_key: Key::MIN,
747 1506 : };
748 1506 :
749 1506 : Ok(writer)
750 1506 : }
751 :
752 : ///
753 : /// Write next value to the file.
754 : ///
755 : /// The page versions must be appended in blknum order.
756 : ///
757 1638594 : async fn put_image(
758 1638594 : &mut self,
759 1638594 : key: Key,
760 1638594 : img: Bytes,
761 1638594 : ctx: &RequestContext,
762 1638594 : ) -> anyhow::Result<()> {
763 1638594 : ensure!(self.key_range.contains(&key));
764 1638594 : let compression = self.conf.image_compression;
765 1638594 : let uncompressed_len = img.len() as u64;
766 1638594 : self.uncompressed_bytes += uncompressed_len;
767 1638594 : self.num_keys += 1;
768 1638594 : let (_img, res) = self
769 1638594 : .blob_writer
770 1638594 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
771 1664552 : .await;
772 : // TODO: re-use the buffer for `img` further upstack
773 1638594 : let (off, compression_info) = res?;
774 1638594 : if compression_info.compressed_size.is_some() {
775 24006 : // The image has been considered for compression at least
776 24006 : self.uncompressed_bytes_eligible += uncompressed_len;
777 1614588 : }
778 1638594 : if compression_info.written_compressed {
779 0 : // The image has been compressed
780 0 : self.uncompressed_bytes_chosen += uncompressed_len;
781 1638594 : }
782 :
783 1638594 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
784 1638594 : key.write_to_byte_slice(&mut keybuf);
785 1638594 : self.tree.append(&keybuf, off)?;
786 :
787 : #[cfg(feature = "testing")]
788 1638594 : {
789 1638594 : self.last_written_key = key;
790 1638594 : }
791 1638594 :
792 1638594 : Ok(())
793 1638594 : }
794 :
795 : ///
796 : /// Finish writing the image layer.
797 : ///
798 924 : async fn finish(
799 924 : self,
800 924 : ctx: &RequestContext,
801 924 : end_key: Option<Key>,
802 924 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
803 924 : let index_start_blk =
804 924 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
805 924 :
806 924 : // Calculate compression ratio
807 924 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
808 924 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
809 924 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
810 924 : .inc_by(self.uncompressed_bytes_eligible);
811 924 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
812 924 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
813 924 :
814 924 : let mut file = self.blob_writer.into_inner();
815 924 :
816 924 : // Write out the index
817 924 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
818 0 : .await?;
819 924 : let (index_root_blk, block_buf) = self.tree.finish()?;
820 3216 : for buf in block_buf.blocks {
821 2292 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
822 2292 : res?;
823 : }
824 :
825 924 : let final_key_range = if let Some(end_key) = end_key {
826 102 : self.key_range.start..end_key
827 : } else {
828 822 : self.key_range.clone()
829 : };
830 :
831 : // Fill in the summary on blk 0
832 924 : let summary = Summary {
833 924 : magic: IMAGE_FILE_MAGIC,
834 924 : format_version: STORAGE_FORMAT_VERSION,
835 924 : tenant_id: self.tenant_shard_id.tenant_id,
836 924 : timeline_id: self.timeline_id,
837 924 : key_range: final_key_range.clone(),
838 924 : lsn: self.lsn,
839 924 : index_start_blk,
840 924 : index_root_blk,
841 924 : };
842 924 :
843 924 : let mut buf = Vec::with_capacity(PAGE_SZ);
844 924 : // TODO: could use smallvec here but it's a pain with Slice<T>
845 924 : Summary::ser_into(&summary, &mut buf)?;
846 924 : file.seek(SeekFrom::Start(0)).await?;
847 924 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
848 924 : res?;
849 :
850 924 : let metadata = file
851 924 : .metadata()
852 467 : .await
853 924 : .context("get metadata to determine file size")?;
854 :
855 924 : let desc = PersistentLayerDesc::new_img(
856 924 : self.tenant_shard_id,
857 924 : self.timeline_id,
858 924 : final_key_range,
859 924 : self.lsn,
860 924 : metadata.len(),
861 924 : );
862 :
863 : #[cfg(feature = "testing")]
864 924 : if let Some(end_key) = end_key {
865 102 : assert!(
866 102 : self.last_written_key < end_key,
867 0 : "written key violates end_key range"
868 : );
869 822 : }
870 :
871 : // Note: Because we open the file in write-only mode, we cannot
872 : // reuse the same VirtualFile for reading later. That's why we don't
873 : // set inner.file here. The first read will have to re-open it.
874 :
875 : // fsync the file
876 924 : file.sync_all().await?;
877 :
878 924 : trace!("created image layer {}", self.path);
879 :
880 924 : Ok((desc, self.path))
881 924 : }
882 : }
883 :
884 : /// A builder object for constructing a new image layer.
885 : ///
886 : /// Usage:
887 : ///
888 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
889 : ///
890 : /// 2. Write the contents by calling `put_page_image` for every key-value
891 : /// pair in the key range.
892 : ///
893 : /// 3. Call `finish`.
894 : ///
895 : /// # Note
896 : ///
897 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
898 : /// possible for the writer to drop before `finish` is actually called. So this
899 : /// could lead to odd temporary files in the directory, exhausting file system.
900 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
901 : /// implementation that cleans up the temporary file in failure. It's not
902 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
903 : /// out some fields, making it impossible to implement `Drop`.
904 : ///
905 : #[must_use]
906 : pub struct ImageLayerWriter {
907 : inner: Option<ImageLayerWriterInner>,
908 : }
909 :
910 : impl ImageLayerWriter {
911 : ///
912 : /// Start building a new image layer.
913 : ///
914 1506 : pub async fn new(
915 1506 : conf: &'static PageServerConf,
916 1506 : timeline_id: TimelineId,
917 1506 : tenant_shard_id: TenantShardId,
918 1506 : key_range: &Range<Key>,
919 1506 : lsn: Lsn,
920 1506 : ctx: &RequestContext,
921 1506 : ) -> anyhow::Result<ImageLayerWriter> {
922 1506 : Ok(Self {
923 1506 : inner: Some(
924 1506 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
925 1008 : .await?,
926 : ),
927 : })
928 1506 : }
929 :
930 : ///
931 : /// Write next value to the file.
932 : ///
933 : /// The page versions must be appended in blknum order.
934 : ///
935 1638594 : pub async fn put_image(
936 1638594 : &mut self,
937 1638594 : key: Key,
938 1638594 : img: Bytes,
939 1638594 : ctx: &RequestContext,
940 1638594 : ) -> anyhow::Result<()> {
941 1664552 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
942 1638594 : }
943 :
944 : /// Estimated size of the image layer.
945 25146 : pub(crate) fn estimated_size(&self) -> u64 {
946 25146 : let inner = self.inner.as_ref().unwrap();
947 25146 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
948 25146 : }
949 :
950 25326 : pub(crate) fn num_keys(&self) -> usize {
951 25326 : self.inner.as_ref().unwrap().num_keys
952 25326 : }
953 :
954 : ///
955 : /// Finish writing the image layer.
956 : ///
957 822 : pub(crate) async fn finish(
958 822 : mut self,
959 822 : ctx: &RequestContext,
960 822 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
961 2359 : self.inner.take().unwrap().finish(ctx, None).await
962 822 : }
963 :
964 : /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
965 102 : pub(super) async fn finish_with_end_key(
966 102 : mut self,
967 102 : end_key: Key,
968 102 : ctx: &RequestContext,
969 102 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
970 210 : self.inner.take().unwrap().finish(ctx, Some(end_key)).await
971 102 : }
972 : }
973 :
974 : impl Drop for ImageLayerWriter {
975 1506 : fn drop(&mut self) {
976 1506 : if let Some(inner) = self.inner.take() {
977 582 : inner.blob_writer.into_inner().remove();
978 924 : }
979 1506 : }
980 : }
981 :
/// Streaming iterator over the key-value pairs stored in an image layer.
///
/// Values are fetched in batches: `next_batch` walks the index, lets the planner
/// turn the visited entries into a read plan, and buffers the images that were
/// read; `next` hands the buffered items out one at a time.
pub struct ImageLayerIterator<'a> {
    /// Layer being iterated; supplies the file, the LSN and the index start block.
    image_layer: &'a ImageLayerInner,
    /// Request context passed to the blob reads.
    ctx: &'a RequestContext,
    /// Turns the index entries fed to it into read plans.
    planner: StreamingVectoredReadPlanner,
    /// B-tree iterator yielding `(raw key, offset)` index entries.
    index_iter: DiskBtreeIterator<'a>,
    /// Items of the current batch that `next` has not returned yet.
    key_values_batch: VecDeque<(Key, Lsn, Value)>,
    /// Set once `index_iter` has been exhausted.
    is_end: bool,
}
990 :
991 : impl<'a> ImageLayerIterator<'a> {
992 0 : pub(crate) fn layer_dbg_info(&self) -> String {
993 0 : self.image_layer.layer_dbg_info()
994 0 : }
995 :
996 : /// Retrieve a batch of key-value pairs into the iterator buffer.
997 57052 : async fn next_batch(&mut self) -> anyhow::Result<()> {
998 57052 : assert!(self.key_values_batch.is_empty());
999 57052 : assert!(!self.is_end);
1000 :
1001 57052 : let plan = loop {
1002 86310 : if let Some(res) = self.index_iter.next().await {
1003 86118 : let (raw_key, offset) = res?;
1004 86118 : if let Some(batch_plan) = self.planner.handle(
1005 86118 : Key::from_slice(&raw_key[..KEY_SIZE]),
1006 86118 : self.image_layer.lsn,
1007 86118 : offset,
1008 86118 : ) {
1009 56860 : break batch_plan;
1010 29258 : }
1011 : } else {
1012 192 : self.is_end = true;
1013 192 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1014 192 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1015 192 : break item;
1016 : } else {
1017 0 : return Ok(()); // TODO: a test case on empty iterator
1018 : }
1019 : }
1020 : };
1021 57052 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1022 57052 : let mut next_batch = std::collections::VecDeque::new();
1023 57052 : let buf_size = plan.size();
1024 57052 : let buf = BytesMut::with_capacity(buf_size);
1025 57052 : let blobs_buf = vectored_blob_reader
1026 57052 : .read_blobs(&plan, buf, self.ctx)
1027 28970 : .await?;
1028 57052 : let frozen_buf: Bytes = blobs_buf.buf.freeze();
1029 86034 : for meta in blobs_buf.blobs.iter() {
1030 86034 : let img_buf = frozen_buf.slice(meta.start..meta.end);
1031 86034 : next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
1032 86034 : }
1033 57052 : self.key_values_batch = next_batch;
1034 57052 : Ok(())
1035 57052 : }
1036 :
1037 85524 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1038 85524 : if self.key_values_batch.is_empty() {
1039 57100 : if self.is_end {
1040 300 : return Ok(None);
1041 56800 : }
1042 56800 : self.next_batch().await?;
1043 28424 : }
1044 85224 : Ok(Some(
1045 85224 : self.key_values_batch
1046 85224 : .pop_front()
1047 85224 : .expect("should not be empty"),
1048 85224 : ))
1049 85524 : }
1050 : }
1051 :
1052 : #[cfg(test)]
1053 : mod test {
1054 : use std::{sync::Arc, time::Duration};
1055 :
1056 : use bytes::Bytes;
1057 : use itertools::Itertools;
1058 : use pageserver_api::{
1059 : key::Key,
1060 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1061 : };
1062 : use utils::{
1063 : generation::Generation,
1064 : id::{TenantId, TimelineId},
1065 : lsn::Lsn,
1066 : };
1067 :
1068 : use crate::{
1069 : context::RequestContext,
1070 : repository::Value,
1071 : tenant::{
1072 : config::TenantConf,
1073 : harness::{TenantHarness, TIMELINE_ID},
1074 : storage_layer::{Layer, ResidentLayer},
1075 : vectored_blob_io::StreamingVectoredReadPlanner,
1076 : Tenant, Timeline,
1077 : },
1078 : DEFAULT_PG_VERSION,
1079 : };
1080 :
1081 : use super::{ImageLayerIterator, ImageLayerWriter};
1082 :
1083 : #[tokio::test]
1084 6 : async fn image_layer_rewrite() {
1085 6 : let tenant_conf = TenantConf {
1086 6 : gc_period: Duration::ZERO,
1087 6 : compaction_period: Duration::ZERO,
1088 6 : ..TenantConf::default()
1089 6 : };
1090 6 : let tenant_id = TenantId::generate();
1091 6 : let mut gen = Generation::new(0xdead0001);
1092 30 : let mut get_next_gen = || {
1093 30 : let ret = gen;
1094 30 : gen = gen.next();
1095 30 : ret
1096 30 : };
1097 6 : // The LSN at which we will create an image layer to filter
1098 6 : let lsn = Lsn(0xdeadbeef0000);
1099 6 : let timeline_id = TimelineId::generate();
1100 6 :
1101 6 : //
1102 6 : // Create an unsharded parent with a layer.
1103 6 : //
1104 6 :
1105 6 : let harness = TenantHarness::create_custom(
1106 6 : "test_image_layer_rewrite--parent",
1107 6 : tenant_conf.clone(),
1108 6 : tenant_id,
1109 6 : ShardIdentity::unsharded(),
1110 6 : get_next_gen(),
1111 6 : )
1112 6 : .await
1113 6 : .unwrap();
1114 24 : let (tenant, ctx) = harness.load().await;
1115 6 : let timeline = tenant
1116 6 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1117 12 : .await
1118 6 : .unwrap();
1119 6 :
1120 6 : // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
1121 6 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1122 6 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1123 6 : let range = input_start..input_end;
1124 6 :
1125 6 : // Build an image layer to filter
1126 6 : let resident = {
1127 6 : let mut writer = ImageLayerWriter::new(
1128 6 : harness.conf,
1129 6 : timeline_id,
1130 6 : harness.tenant_shard_id,
1131 6 : &range,
1132 6 : lsn,
1133 6 : &ctx,
1134 6 : )
1135 6 : .await
1136 6 : .unwrap();
1137 6 :
1138 6 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1139 6 : let mut key = range.start;
1140 786438 : while key < range.end {
1141 798717 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1142 786432 :
1143 786432 : key = key.next();
1144 6 : }
1145 357 : let (desc, path) = writer.finish(&ctx).await.unwrap();
1146 6 : Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
1147 6 : };
1148 6 : let original_size = resident.metadata().file_size;
1149 6 :
1150 6 : //
1151 6 : // Create child shards and do the rewrite, exercising filter().
1152 6 : // TODO: abstraction in TenantHarness for splits.
1153 6 : //
1154 6 :
1155 6 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1156 6 : // range, middle of key range.
1157 6 : let shard_count = ShardCount::new(4);
1158 24 : for shard_number in 0..shard_count.count() {
1159 6 : //
1160 6 : // mimic the shard split
1161 6 : //
1162 24 : let shard_identity = ShardIdentity::new(
1163 24 : ShardNumber(shard_number),
1164 24 : shard_count,
1165 24 : ShardStripeSize(0x8000),
1166 24 : )
1167 24 : .unwrap();
1168 24 : let harness = TenantHarness::create_custom(
1169 24 : Box::leak(Box::new(format!(
1170 24 : "test_image_layer_rewrite--child{}",
1171 24 : shard_identity.shard_slug()
1172 24 : ))),
1173 24 : tenant_conf.clone(),
1174 24 : tenant_id,
1175 24 : shard_identity,
1176 24 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1177 24 : // But here, all we care about is that the gen number is unique.
1178 24 : get_next_gen(),
1179 24 : )
1180 6 : .await
1181 24 : .unwrap();
1182 96 : let (tenant, ctx) = harness.load().await;
1183 24 : let timeline = tenant
1184 24 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1185 36 : .await
1186 24 : .unwrap();
1187 6 :
1188 6 : //
1189 6 : // use filter() and make assertions
1190 6 : //
1191 6 :
1192 24 : let mut filtered_writer = ImageLayerWriter::new(
1193 24 : harness.conf,
1194 24 : timeline_id,
1195 24 : harness.tenant_shard_id,
1196 24 : &range,
1197 24 : lsn,
1198 24 : &ctx,
1199 24 : )
1200 12 : .await
1201 24 : .unwrap();
1202 6 :
1203 24 : let wrote_keys = resident
1204 24 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1205 800157 : .await
1206 24 : .unwrap();
1207 24 : let replacement = if wrote_keys > 0 {
1208 387 : let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
1209 18 : let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
1210 18 : Some(resident)
1211 6 : } else {
1212 6 : None
1213 6 : };
1214 6 :
1215 6 : // This exact size and those below will need updating as/when the layer encoding changes, but
1216 6 : // should be deterministic for a given version of the format, as we used no randomness generating the input.
1217 24 : assert_eq!(original_size, 1597440);
1218 6 :
1219 24 : match shard_number {
1220 6 : 0 => {
1221 6 : // We should have written out just one stripe for our shard identity
1222 6 : assert_eq!(wrote_keys, 0x8000);
1223 6 : let replacement = replacement.unwrap();
1224 6 :
1225 6 : // We should have dropped some of the data
1226 6 : assert!(replacement.metadata().file_size < original_size);
1227 6 : assert!(replacement.metadata().file_size > 0);
1228 6 :
1229 6 : // Assert that we dropped ~3/4 of the data.
1230 6 : assert_eq!(replacement.metadata().file_size, 417792);
1231 6 : }
1232 6 : 1 => {
1233 6 : // Shard 1 has no keys in our input range
1234 6 : assert_eq!(wrote_keys, 0x0);
1235 6 : assert!(replacement.is_none());
1236 6 : }
1237 6 : 2 => {
1238 6 : // Shard 2 has one stripes in the input range
1239 6 : assert_eq!(wrote_keys, 0x8000);
1240 6 : let replacement = replacement.unwrap();
1241 6 : assert!(replacement.metadata().file_size < original_size);
1242 6 : assert!(replacement.metadata().file_size > 0);
1243 6 : assert_eq!(replacement.metadata().file_size, 417792);
1244 6 : }
1245 6 : 3 => {
1246 6 : // Shard 3 has two stripes in the input range
1247 6 : assert_eq!(wrote_keys, 0x10000);
1248 6 : let replacement = replacement.unwrap();
1249 6 : assert!(replacement.metadata().file_size < original_size);
1250 6 : assert!(replacement.metadata().file_size > 0);
1251 6 : assert_eq!(replacement.metadata().file_size, 811008);
1252 6 : }
1253 6 : _ => unreachable!(),
1254 6 : }
1255 6 : }
1256 6 : }
1257 :
1258 6 : async fn produce_image_layer(
1259 6 : tenant: &Tenant,
1260 6 : tline: &Arc<Timeline>,
1261 6 : mut images: Vec<(Key, Bytes)>,
1262 6 : lsn: Lsn,
1263 6 : ctx: &RequestContext,
1264 6 : ) -> anyhow::Result<ResidentLayer> {
1265 6 : images.sort();
1266 6 : let (key_start, _) = images.first().unwrap();
1267 6 : let (key_last, _) = images.last().unwrap();
1268 6 : let key_end = key_last.next();
1269 6 : let key_range = *key_start..key_end;
1270 6 : let mut writer = ImageLayerWriter::new(
1271 6 : tenant.conf,
1272 6 : tline.timeline_id,
1273 6 : tenant.tenant_shard_id,
1274 6 : &key_range,
1275 6 : lsn,
1276 6 : ctx,
1277 6 : )
1278 3 : .await?;
1279 :
1280 6006 : for (key, img) in images {
1281 6093 : writer.put_image(key, img, ctx).await?;
1282 : }
1283 12 : let (desc, path) = writer.finish(ctx).await?;
1284 6 : let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
1285 :
1286 6 : Ok::<_, anyhow::Error>(img_layer)
1287 6 : }
1288 :
1289 84 : async fn assert_img_iter_equal(
1290 84 : img_iter: &mut ImageLayerIterator<'_>,
1291 84 : expect: &[(Key, Bytes)],
1292 84 : expect_lsn: Lsn,
1293 84 : ) {
1294 84 : let mut expect_iter = expect.iter();
1295 : loop {
1296 84084 : let o1 = img_iter.next().await.unwrap();
1297 84084 : let o2 = expect_iter.next();
1298 84084 : match (o1, o2) {
1299 84 : (None, None) => break,
1300 84000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1301 84000 : let Value::Image(i1) = v1 else {
1302 0 : panic!("expect Value::Image")
1303 : };
1304 84000 : assert_eq!(&k1, k2);
1305 84000 : assert_eq!(l1, expect_lsn);
1306 84000 : assert_eq!(&i1, i2);
1307 : }
1308 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1309 : }
1310 : }
1311 84 : }
1312 :
1313 : #[tokio::test]
1314 6 : async fn image_layer_iterator() {
1315 6 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1316 24 : let (tenant, ctx) = harness.load().await;
1317 6 :
1318 6 : let tline = tenant
1319 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1320 12 : .await
1321 6 : .unwrap();
1322 6 :
1323 6000 : fn get_key(id: u32) -> Key {
1324 6000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1325 6000 : key.field6 = id;
1326 6000 : key
1327 6000 : }
1328 6 : const N: usize = 1000;
1329 6 : let test_imgs = (0..N)
1330 6000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1331 6 : .collect_vec();
1332 6 : let resident_layer =
1333 6 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1334 6108 : .await
1335 6 : .unwrap();
1336 6 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1337 18 : for max_read_size in [1, 1024] {
1338 96 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1339 84 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1340 84 : // Test if the batch size is correctly determined
1341 84 : let mut iter = img_layer.iter(&ctx);
1342 84 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1343 84 : let mut num_items = 0;
1344 336 : for _ in 0..3 {
1345 252 : iter.next_batch().await.unwrap();
1346 252 : num_items += iter.key_values_batch.len();
1347 252 : if max_read_size == 1 {
1348 6 : // every key should be a batch b/c the value is larger than max_read_size
1349 126 : assert_eq!(iter.key_values_batch.len(), 1);
1350 6 : } else {
1351 126 : assert!(iter.key_values_batch.len() <= batch_size);
1352 6 : }
1353 252 : if num_items >= N {
1354 6 : break;
1355 252 : }
1356 252 : iter.key_values_batch.clear();
1357 6 : }
1358 6 : // Test if the result is correct
1359 84 : let mut iter = img_layer.iter(&ctx);
1360 84 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1361 28787 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1362 6 : }
1363 6 : }
1364 6 : }
1365 : }
|