Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN.
3 : //!
4 : //! It contains an image of all key-value pairs in its key-range. Any key
5 : //! that falls into the image layer's range but does not exist in the layer,
6 : //! does not exist.
7 : //!
8 : //! An image layer is stored in a file on disk. The file is stored in
9 : //! timelines/<timeline_id> directory. Currently, there are no
10 : //! subdirectories, and each image layer file is named like this:
11 : //!
12 : //! ```text
13 : //! <key start>-<key end>__<LSN>
14 : //! ```
15 : //!
16 : //! For example:
17 : //!
18 : //! ```text
19 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
20 : //! ```
21 : //!
//! Every image layer file consists of three parts, stored in this order:
//! "summary", "values", and "index". The summary is a fixed-size header in
//! the first block of the file; it contains basic information about the
//! layer and the block offsets of the index. The "values" part starts at
//! block 1 and holds the actual page images. The "index" is a B-tree
//! mapping from Key to an offset in the "values" part.
28 : use crate::config::PageServerConf;
29 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
30 : use crate::page_cache::{self, FileId, PAGE_SZ};
31 : use crate::repository::{Key, Value, KEY_SIZE};
32 : use crate::tenant::blob_io::BlobWriter;
33 : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
34 : use crate::tenant::disk_btree::{
35 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
36 : };
37 : use crate::tenant::timeline::GetVectoredError;
38 : use crate::tenant::vectored_blob_io::{
39 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
40 : VectoredReadPlanner,
41 : };
42 : use crate::tenant::PageReconstructError;
43 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
44 : use crate::virtual_file::IoBufferMut;
45 : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
46 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
47 : use anyhow::{anyhow, bail, ensure, Context, Result};
48 : use bytes::Bytes;
49 : use camino::{Utf8Path, Utf8PathBuf};
50 : use hex;
51 : use itertools::Itertools;
52 : use pageserver_api::config::MaxVectoredReadBytes;
53 : use pageserver_api::key::DBDIR_KEY;
54 : use pageserver_api::keyspace::KeySpace;
55 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
56 : use rand::{distributions::Alphanumeric, Rng};
57 : use serde::{Deserialize, Serialize};
58 : use std::collections::VecDeque;
59 : use std::fs::File;
60 : use std::io::SeekFrom;
61 : use std::ops::Range;
62 : use std::os::unix::prelude::FileExt;
63 : use std::str::FromStr;
64 : use tokio::sync::OnceCell;
65 : use tokio_stream::StreamExt;
66 : use tracing::*;
67 :
68 : use utils::{
69 : bin_ser::BeSer,
70 : id::{TenantId, TimelineId},
71 : lsn::Lsn,
72 : };
73 :
74 : use super::layer_name::ImageLayerName;
75 : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
76 :
77 : ///
78 : /// Header stored in the beginning of the file
79 : ///
80 : /// After this comes the 'values' part, starting on block 1. After that,
81 : /// the 'index' starts at the block indicated by 'index_start_blk'
82 : ///
83 102 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
84 : pub struct Summary {
85 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
86 : pub magic: u16,
87 : pub format_version: u16,
88 :
89 : pub tenant_id: TenantId,
90 : pub timeline_id: TimelineId,
91 : pub key_range: Range<Key>,
92 : pub lsn: Lsn,
93 :
94 : /// Block number where the 'index' part of the file begins.
95 : pub index_start_blk: u32,
96 : /// Block within the 'index', where the B-tree root page is stored
97 : pub index_root_blk: u32,
98 : // the 'values' part starts after the summary header, on block 1.
99 : }
100 :
101 : impl From<&ImageLayer> for Summary {
102 0 : fn from(layer: &ImageLayer) -> Self {
103 0 : Self::expected(
104 0 : layer.desc.tenant_shard_id.tenant_id,
105 0 : layer.desc.timeline_id,
106 0 : layer.desc.key_range.clone(),
107 0 : layer.lsn,
108 0 : )
109 0 : }
110 : }
111 :
112 : impl Summary {
113 102 : pub(super) fn expected(
114 102 : tenant_id: TenantId,
115 102 : timeline_id: TimelineId,
116 102 : key_range: Range<Key>,
117 102 : lsn: Lsn,
118 102 : ) -> Self {
119 102 : Self {
120 102 : magic: IMAGE_FILE_MAGIC,
121 102 : format_version: STORAGE_FORMAT_VERSION,
122 102 : tenant_id,
123 102 : timeline_id,
124 102 : key_range,
125 102 : lsn,
126 102 :
127 102 : index_start_blk: 0,
128 102 : index_root_blk: 0,
129 102 : }
130 102 : }
131 : }
132 :
133 : /// This is used only from `pagectl`. Within pageserver, all layers are
134 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
135 : pub struct ImageLayer {
136 : path: Utf8PathBuf,
137 : pub desc: PersistentLayerDesc,
138 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
139 : pub lsn: Lsn,
140 : inner: OnceCell<ImageLayerInner>,
141 : }
142 :
143 : impl std::fmt::Debug for ImageLayer {
144 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 : use super::RangeDisplayDebug;
146 :
147 0 : f.debug_struct("ImageLayer")
148 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
149 0 : .field("file_size", &self.desc.file_size)
150 0 : .field("lsn", &self.lsn)
151 0 : .field("inner", &self.inner)
152 0 : .finish()
153 0 : }
154 : }
155 :
156 : /// ImageLayer is the in-memory data structure associated with an on-disk image
157 : /// file.
158 : pub struct ImageLayerInner {
159 : // values copied from summary
160 : index_start_blk: u32,
161 : index_root_blk: u32,
162 :
163 : key_range: Range<Key>,
164 : lsn: Lsn,
165 :
166 : file: VirtualFile,
167 : file_id: FileId,
168 :
169 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
170 : }
171 :
172 : impl ImageLayerInner {
173 0 : pub(crate) fn layer_dbg_info(&self) -> String {
174 0 : format!(
175 0 : "image {}..{} {}",
176 0 : self.key_range().start,
177 0 : self.key_range().end,
178 0 : self.lsn()
179 0 : )
180 0 : }
181 : }
182 :
183 : impl std::fmt::Debug for ImageLayerInner {
184 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 0 : f.debug_struct("ImageLayerInner")
186 0 : .field("index_start_blk", &self.index_start_blk)
187 0 : .field("index_root_blk", &self.index_root_blk)
188 0 : .finish()
189 0 : }
190 : }
191 :
192 : impl ImageLayerInner {
193 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
194 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
195 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
196 0 : self.index_start_blk,
197 0 : self.index_root_blk,
198 0 : block_reader,
199 0 : );
200 0 :
201 0 : tree_reader.dump().await?;
202 :
203 0 : tree_reader
204 0 : .visit(
205 0 : &[0u8; KEY_SIZE],
206 0 : VisitDirection::Forwards,
207 0 : |key, value| {
208 0 : println!("key: {} offset {}", hex::encode(key), value);
209 0 : true
210 0 : },
211 0 : ctx,
212 0 : )
213 0 : .await?;
214 :
215 0 : Ok(())
216 0 : }
217 : }
218 :
219 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
220 : impl std::fmt::Display for ImageLayer {
221 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 0 : write!(f, "{}", self.layer_desc().short_id())
223 0 : }
224 : }
225 :
226 : impl AsLayerDesc for ImageLayer {
227 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
228 0 : &self.desc
229 0 : }
230 : }
231 :
232 : impl ImageLayer {
233 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
234 0 : self.desc.dump();
235 0 :
236 0 : if !verbose {
237 0 : return Ok(());
238 0 : }
239 :
240 0 : let inner = self.load(ctx).await?;
241 :
242 0 : inner.dump(ctx).await?;
243 :
244 0 : Ok(())
245 0 : }
246 :
247 490 : fn temp_path_for(
248 490 : conf: &PageServerConf,
249 490 : timeline_id: TimelineId,
250 490 : tenant_shard_id: TenantShardId,
251 490 : fname: &ImageLayerName,
252 490 : ) -> Utf8PathBuf {
253 490 : let rand_string: String = rand::thread_rng()
254 490 : .sample_iter(&Alphanumeric)
255 490 : .take(8)
256 490 : .map(char::from)
257 490 : .collect();
258 490 :
259 490 : conf.timeline_path(&tenant_shard_id, &timeline_id)
260 490 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
261 490 : }
262 :
263 : ///
264 : /// Open the underlying file and read the metadata into memory, if it's
265 : /// not loaded already.
266 : ///
267 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
268 0 : self.inner
269 0 : .get_or_try_init(|| self.load_inner(ctx))
270 0 : .await
271 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
272 0 : }
273 :
274 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
275 0 : let path = self.path();
276 :
277 0 : let loaded =
278 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
279 :
280 : // not production code
281 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
282 0 : let expected_layer_name = self.layer_desc().layer_name();
283 0 :
284 0 : if actual_layer_name != expected_layer_name {
285 0 : println!("warning: filename does not match what is expected from in-file summary");
286 0 : println!("actual: {:?}", actual_layer_name.to_string());
287 0 : println!("expected: {:?}", expected_layer_name.to_string());
288 0 : }
289 :
290 0 : Ok(loaded)
291 0 : }
292 :
293 : /// Create an ImageLayer struct representing an existing file on disk.
294 : ///
295 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
296 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
297 0 : let mut summary_buf = vec![0; PAGE_SZ];
298 0 : file.read_exact_at(&mut summary_buf, 0)?;
299 0 : let summary = Summary::des_prefix(&summary_buf)?;
300 0 : let metadata = file
301 0 : .metadata()
302 0 : .context("get file metadata to determine size")?;
303 :
304 : // This function is never used for constructing layers in a running pageserver,
305 : // so it does not need an accurate TenantShardId.
306 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
307 0 :
308 0 : Ok(ImageLayer {
309 0 : path: path.to_path_buf(),
310 0 : desc: PersistentLayerDesc::new_img(
311 0 : tenant_shard_id,
312 0 : summary.timeline_id,
313 0 : summary.key_range,
314 0 : summary.lsn,
315 0 : metadata.len(),
316 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
317 0 : lsn: summary.lsn,
318 0 : inner: OnceCell::new(),
319 0 : })
320 0 : }
321 :
322 0 : fn path(&self) -> Utf8PathBuf {
323 0 : self.path.clone()
324 0 : }
325 : }
326 :
327 0 : #[derive(thiserror::Error, Debug)]
328 : pub enum RewriteSummaryError {
329 : #[error("magic mismatch")]
330 : MagicMismatch,
331 : #[error(transparent)]
332 : Other(#[from] anyhow::Error),
333 : }
334 :
335 : impl From<std::io::Error> for RewriteSummaryError {
336 0 : fn from(e: std::io::Error) -> Self {
337 0 : Self::Other(anyhow::anyhow!(e))
338 0 : }
339 : }
340 :
341 : impl ImageLayer {
342 0 : pub async fn rewrite_summary<F>(
343 0 : path: &Utf8Path,
344 0 : rewrite: F,
345 0 : ctx: &RequestContext,
346 0 : ) -> Result<(), RewriteSummaryError>
347 0 : where
348 0 : F: Fn(Summary) -> Summary,
349 0 : {
350 0 : let mut file = VirtualFile::open_with_options(
351 0 : path,
352 0 : virtual_file::OpenOptions::new().read(true).write(true),
353 0 : ctx,
354 0 : )
355 0 : .await
356 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
357 0 : let file_id = page_cache::next_file_id();
358 0 : let block_reader = FileBlockReader::new(&file, file_id);
359 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
360 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
361 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
362 0 : return Err(RewriteSummaryError::MagicMismatch);
363 0 : }
364 0 :
365 0 : let new_summary = rewrite(actual_summary);
366 0 :
367 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
368 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
369 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
370 0 : file.seek(SeekFrom::Start(0)).await?;
371 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
372 0 : res?;
373 0 : Ok(())
374 0 : }
375 : }
376 :
377 : impl ImageLayerInner {
378 36 : pub(crate) fn key_range(&self) -> &Range<Key> {
379 36 : &self.key_range
380 36 : }
381 :
382 36 : pub(crate) fn lsn(&self) -> Lsn {
383 36 : self.lsn
384 36 : }
385 :
386 102 : pub(super) async fn load(
387 102 : path: &Utf8Path,
388 102 : lsn: Lsn,
389 102 : summary: Option<Summary>,
390 102 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
391 102 : ctx: &RequestContext,
392 102 : ) -> anyhow::Result<Self> {
393 102 : let file = VirtualFile::open_v2(path, ctx)
394 51 : .await
395 102 : .context("open layer file")?;
396 102 : let file_id = page_cache::next_file_id();
397 102 : let block_reader = FileBlockReader::new(&file, file_id);
398 102 : let summary_blk = block_reader
399 102 : .read_blk(0, ctx)
400 52 : .await
401 102 : .context("read first block")?;
402 :
403 : // length is the only way how this could fail, so it's not actually likely at all unless
404 : // read_blk returns wrong sized block.
405 : //
406 : // TODO: confirm and make this into assertion
407 102 : let actual_summary =
408 102 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
409 :
410 102 : if let Some(mut expected_summary) = summary {
411 : // production code path
412 102 : expected_summary.index_start_blk = actual_summary.index_start_blk;
413 102 : expected_summary.index_root_blk = actual_summary.index_root_blk;
414 102 : // mask out the timeline_id, but still require the layers to be from the same tenant
415 102 : expected_summary.timeline_id = actual_summary.timeline_id;
416 102 :
417 102 : if actual_summary != expected_summary {
418 0 : bail!(
419 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
420 0 : actual_summary,
421 0 : expected_summary
422 0 : );
423 102 : }
424 0 : }
425 :
426 102 : Ok(ImageLayerInner {
427 102 : index_start_blk: actual_summary.index_start_blk,
428 102 : index_root_blk: actual_summary.index_root_blk,
429 102 : lsn,
430 102 : file,
431 102 : file_id,
432 102 : max_vectored_read_bytes,
433 102 : key_range: actual_summary.key_range,
434 102 : })
435 102 : }
436 :
437 : // Look up the keys in the provided keyspace and update
438 : // the reconstruct state with whatever is found.
439 7969 : pub(super) async fn get_values_reconstruct_data(
440 7969 : &self,
441 7969 : keyspace: KeySpace,
442 7969 : reconstruct_state: &mut ValuesReconstructState,
443 7969 : ctx: &RequestContext,
444 7969 : ) -> Result<(), GetVectoredError> {
445 7969 : let reads = self
446 7969 : .plan_reads(keyspace, None, ctx)
447 710 : .await
448 7969 : .map_err(GetVectoredError::Other)?;
449 :
450 7969 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
451 4235 : .await;
452 :
453 7969 : reconstruct_state.on_image_layer_visited(&self.key_range);
454 7969 :
455 7969 : Ok(())
456 7969 : }
457 :
458 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
459 : /// and the keys in this layer.
460 : ///
461 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
462 : /// this shard.
463 7977 : async fn plan_reads(
464 7977 : &self,
465 7977 : keyspace: KeySpace,
466 7977 : shard_identity: Option<&ShardIdentity>,
467 7977 : ctx: &RequestContext,
468 7977 : ) -> anyhow::Result<Vec<VectoredRead>> {
469 7977 : let mut planner = VectoredReadPlanner::new(
470 7977 : self.max_vectored_read_bytes
471 7977 : .expect("Layer is loaded with max vectored bytes config")
472 7977 : .0
473 7977 : .into(),
474 7977 : );
475 7977 :
476 7977 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
477 7977 : let tree_reader =
478 7977 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
479 7977 :
480 7977 : let ctx = RequestContextBuilder::extend(ctx)
481 7977 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
482 7977 : .build();
483 :
484 29707 : for range in keyspace.ranges.iter() {
485 29707 : let mut range_end_handled = false;
486 29707 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
487 29707 : range.start.write_to_byte_slice(&mut search_key);
488 29707 :
489 29707 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
490 29707 : let mut index_stream = std::pin::pin!(index_stream);
491 :
492 1096536 : while let Some(index_entry) = index_stream.next().await {
493 1096337 : let (raw_key, offset) = index_entry?;
494 :
495 1096337 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
496 1096337 : assert!(key >= range.start);
497 :
498 1096337 : let flag = if let Some(shard_identity) = shard_identity {
499 1048576 : if shard_identity.is_key_disposable(&key) {
500 786432 : BlobFlag::Ignore
501 : } else {
502 262144 : BlobFlag::None
503 : }
504 : } else {
505 47761 : BlobFlag::None
506 : };
507 :
508 1096337 : if key >= range.end {
509 29508 : planner.handle_range_end(offset);
510 29508 : range_end_handled = true;
511 29508 : break;
512 1066829 : } else {
513 1066829 : planner.handle(key, self.lsn, offset, flag);
514 1066829 : }
515 : }
516 :
517 29707 : if !range_end_handled {
518 199 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
519 199 : planner.handle_range_end(payload_end);
520 29508 : }
521 : }
522 :
523 7977 : Ok(planner.finish())
524 7977 : }
525 :
526 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
527 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
528 8 : pub(super) async fn filter(
529 8 : &self,
530 8 : shard_identity: &ShardIdentity,
531 8 : writer: &mut ImageLayerWriter,
532 8 : ctx: &RequestContext,
533 8 : ) -> anyhow::Result<usize> {
534 : // Fragment the range into the regions owned by this ShardIdentity
535 8 : let plan = self
536 8 : .plan_reads(
537 8 : KeySpace {
538 8 : // If asked for the total key space, plan_reads will give us all the keys in the layer
539 8 : ranges: vec![Key::MIN..Key::MAX],
540 8 : },
541 8 : Some(shard_identity),
542 8 : ctx,
543 8 : )
544 469 : .await?;
545 :
546 8 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
547 8 : let mut key_count = 0;
548 16 : for read in plan.into_iter() {
549 16 : let buf_size = read.size();
550 16 :
551 16 : let buf = IoBufferMut::with_capacity(buf_size);
552 16 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
553 :
554 16 : let view = BufView::new_slice(&blobs_buf.buf);
555 :
556 262144 : for meta in blobs_buf.blobs.iter() {
557 262144 : let img_buf = meta.read(&view).await?;
558 :
559 262144 : key_count += 1;
560 262144 : writer
561 262144 : .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
562 266240 : .await
563 262144 : .context(format!("Storing key {}", meta.meta.key))?;
564 : }
565 : }
566 :
567 8 : Ok(key_count)
568 8 : }
569 :
570 7969 : async fn do_reads_and_update_state(
571 7969 : &self,
572 7969 : reads: Vec<VectoredRead>,
573 7969 : reconstruct_state: &mut ValuesReconstructState,
574 7969 : ctx: &RequestContext,
575 7969 : ) {
576 7969 : let max_vectored_read_bytes = self
577 7969 : .max_vectored_read_bytes
578 7969 : .expect("Layer is loaded with max vectored bytes config")
579 7969 : .0
580 7969 : .into();
581 7969 :
582 7969 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
583 8230 : for read in reads.into_iter() {
584 8230 : let buf_size = read.size();
585 8230 :
586 8230 : if buf_size > max_vectored_read_bytes {
587 : // If the read is oversized, it should only contain one key.
588 0 : let offenders = read
589 0 : .blobs_at
590 0 : .as_slice()
591 0 : .iter()
592 0 : .filter_map(|(_, blob_meta)| {
593 0 : if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
594 : // The size of values for these keys is unbounded and can
595 : // grow very large in pathological cases.
596 0 : None
597 : } else {
598 0 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
599 : }
600 0 : })
601 0 : .join(", ");
602 0 :
603 0 : if !offenders.is_empty() {
604 0 : tracing::warn!(
605 0 : "Oversized vectored read ({} > {}) for keys {}",
606 : buf_size,
607 : max_vectored_read_bytes,
608 : offenders
609 : );
610 0 : }
611 8230 : }
612 :
613 8230 : let buf = IoBufferMut::with_capacity(buf_size);
614 8230 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
615 :
616 8230 : match res {
617 8230 : Ok(blobs_buf) => {
618 8230 : let view = BufView::new_slice(&blobs_buf.buf);
619 18253 : for meta in blobs_buf.blobs.iter() {
620 18253 : let img_buf = meta.read(&view).await;
621 :
622 18253 : let img_buf = match img_buf {
623 18253 : Ok(img_buf) => img_buf,
624 0 : Err(e) => {
625 0 : reconstruct_state.on_key_error(
626 0 : meta.meta.key,
627 0 : PageReconstructError::Other(anyhow!(e).context(format!(
628 0 : "Failed to decompress blob from virtual file {}",
629 0 : self.file.path(),
630 0 : ))),
631 0 : );
632 0 :
633 0 : continue;
634 : }
635 : };
636 18253 : reconstruct_state.update_key(
637 18253 : &meta.meta.key,
638 18253 : self.lsn,
639 18253 : Value::Image(img_buf.into_bytes()),
640 18253 : );
641 : }
642 : }
643 0 : Err(err) => {
644 0 : let kind = err.kind();
645 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
646 0 : reconstruct_state.on_key_error(
647 0 : blob_meta.key,
648 0 : PageReconstructError::from(anyhow!(
649 0 : "Failed to read blobs from virtual file {}: {}",
650 0 : self.file.path(),
651 0 : kind
652 0 : )),
653 0 : );
654 0 : }
655 : }
656 : };
657 : }
658 7969 : }
659 :
660 92 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
661 92 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
662 92 : let tree_reader =
663 92 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
664 92 : ImageLayerIterator {
665 92 : image_layer: self,
666 92 : ctx,
667 92 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
668 92 : key_values_batch: VecDeque::new(),
669 92 : is_end: false,
670 92 : planner: StreamingVectoredReadPlanner::new(
671 92 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
672 92 : 1024, // The default value. Unit tests might use a different value
673 92 : ),
674 92 : }
675 92 : }
676 : }
677 :
678 : /// A builder object for constructing a new image layer.
679 : ///
680 : /// Usage:
681 : ///
682 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
683 : ///
684 : /// 2. Write the contents by calling `put_page_image` for every key-value
685 : /// pair in the key range.
686 : ///
687 : /// 3. Call `finish`.
688 : ///
689 : struct ImageLayerWriterInner {
690 : conf: &'static PageServerConf,
691 : path: Utf8PathBuf,
692 : timeline_id: TimelineId,
693 : tenant_shard_id: TenantShardId,
694 : key_range: Range<Key>,
695 : lsn: Lsn,
696 :
697 : // Total uncompressed bytes passed into put_image
698 : uncompressed_bytes: u64,
699 :
700 : // Like `uncompressed_bytes`,
701 : // but only of images we might consider for compression
702 : uncompressed_bytes_eligible: u64,
703 :
704 : // Like `uncompressed_bytes`, but only of images
705 : // where we have chosen their compressed form
706 : uncompressed_bytes_chosen: u64,
707 :
708 : // Number of keys in the layer.
709 : num_keys: usize,
710 :
711 : blob_writer: BlobWriter<false>,
712 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
713 :
714 : #[cfg(feature = "testing")]
715 : last_written_key: Key,
716 : }
717 :
718 : impl ImageLayerWriterInner {
719 : ///
720 : /// Start building a new image layer.
721 : ///
722 490 : async fn new(
723 490 : conf: &'static PageServerConf,
724 490 : timeline_id: TimelineId,
725 490 : tenant_shard_id: TenantShardId,
726 490 : key_range: &Range<Key>,
727 490 : lsn: Lsn,
728 490 : ctx: &RequestContext,
729 490 : ) -> anyhow::Result<Self> {
730 490 : // Create the file initially with a temporary filename.
731 490 : // We'll atomically rename it to the final name when we're done.
732 490 : let path = ImageLayer::temp_path_for(
733 490 : conf,
734 490 : timeline_id,
735 490 : tenant_shard_id,
736 490 : &ImageLayerName {
737 490 : key_range: key_range.clone(),
738 490 : lsn,
739 490 : },
740 490 : );
741 490 : trace!("creating image layer {}", path);
742 490 : let mut file = {
743 490 : VirtualFile::open_with_options(
744 490 : &path,
745 490 : virtual_file::OpenOptions::new()
746 490 : .write(true)
747 490 : .create_new(true),
748 490 : ctx,
749 490 : )
750 328 : .await?
751 : };
752 : // make room for the header block
753 490 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
754 490 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
755 490 :
756 490 : // Initialize the b-tree index builder
757 490 : let block_buf = BlockBuf::new();
758 490 : let tree_builder = DiskBtreeBuilder::new(block_buf);
759 490 :
760 490 : let writer = Self {
761 490 : conf,
762 490 : path,
763 490 : timeline_id,
764 490 : tenant_shard_id,
765 490 : key_range: key_range.clone(),
766 490 : lsn,
767 490 : tree: tree_builder,
768 490 : blob_writer,
769 490 : uncompressed_bytes: 0,
770 490 : uncompressed_bytes_eligible: 0,
771 490 : uncompressed_bytes_chosen: 0,
772 490 : num_keys: 0,
773 490 : #[cfg(feature = "testing")]
774 490 : last_written_key: Key::MIN,
775 490 : };
776 490 :
777 490 : Ok(writer)
778 490 : }
779 :
780 : ///
781 : /// Write next value to the file.
782 : ///
783 : /// The page versions must be appended in blknum order.
784 : ///
785 546152 : async fn put_image(
786 546152 : &mut self,
787 546152 : key: Key,
788 546152 : img: Bytes,
789 546152 : ctx: &RequestContext,
790 546152 : ) -> anyhow::Result<()> {
791 546152 : ensure!(self.key_range.contains(&key));
792 546152 : let compression = self.conf.image_compression;
793 546152 : let uncompressed_len = img.len() as u64;
794 546152 : self.uncompressed_bytes += uncompressed_len;
795 546152 : self.num_keys += 1;
796 546152 : let (_img, res) = self
797 546152 : .blob_writer
798 546152 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
799 554810 : .await;
800 : // TODO: re-use the buffer for `img` further upstack
801 546152 : let (off, compression_info) = res?;
802 546152 : if compression_info.compressed_size.is_some() {
803 8002 : // The image has been considered for compression at least
804 8002 : self.uncompressed_bytes_eligible += uncompressed_len;
805 538150 : }
806 546152 : if compression_info.written_compressed {
807 0 : // The image has been compressed
808 0 : self.uncompressed_bytes_chosen += uncompressed_len;
809 546152 : }
810 :
811 546152 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
812 546152 : key.write_to_byte_slice(&mut keybuf);
813 546152 : self.tree.append(&keybuf, off)?;
814 :
815 : #[cfg(feature = "testing")]
816 546152 : {
817 546152 : self.last_written_key = key;
818 546152 : }
819 546152 :
820 546152 : Ok(())
821 546152 : }
822 :
823 : ///
824 : /// Finish writing the image layer.
825 : ///
826 294 : async fn finish(
827 294 : self,
828 294 : ctx: &RequestContext,
829 294 : end_key: Option<Key>,
830 294 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
831 294 : let temp_path = self.path.clone();
832 829 : let result = self.finish0(ctx, end_key).await;
833 294 : if let Err(ref e) = result {
834 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
835 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
836 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
837 0 : }
838 294 : }
839 294 : result
840 294 : }
841 :
842 : ///
843 : /// Finish writing the image layer.
844 : ///
845 294 : async fn finish0(
846 294 : self,
847 294 : ctx: &RequestContext,
848 294 : end_key: Option<Key>,
849 294 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
850 294 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
851 294 :
852 294 : // Calculate compression ratio
853 294 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
854 294 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
855 294 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
856 294 : .inc_by(self.uncompressed_bytes_eligible);
857 294 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
858 294 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
859 294 :
860 294 : let mut file = self.blob_writer.into_inner();
861 294 :
862 294 : // Write out the index
863 294 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
864 0 : .await?;
865 294 : let (index_root_blk, block_buf) = self.tree.finish()?;
866 1044 : for buf in block_buf.blocks {
867 750 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
868 750 : res?;
869 : }
870 :
871 294 : let final_key_range = if let Some(end_key) = end_key {
872 26 : self.key_range.start..end_key
873 : } else {
874 268 : self.key_range.clone()
875 : };
876 :
877 : // Fill in the summary on blk 0
878 294 : let summary = Summary {
879 294 : magic: IMAGE_FILE_MAGIC,
880 294 : format_version: STORAGE_FORMAT_VERSION,
881 294 : tenant_id: self.tenant_shard_id.tenant_id,
882 294 : timeline_id: self.timeline_id,
883 294 : key_range: final_key_range.clone(),
884 294 : lsn: self.lsn,
885 294 : index_start_blk,
886 294 : index_root_blk,
887 294 : };
888 294 :
889 294 : let mut buf = Vec::with_capacity(PAGE_SZ);
890 294 : // TODO: could use smallvec here but it's a pain with Slice<T>
891 294 : Summary::ser_into(&summary, &mut buf)?;
892 294 : file.seek(SeekFrom::Start(0)).await?;
893 294 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
894 294 : res?;
895 :
896 294 : let metadata = file
897 294 : .metadata()
898 148 : .await
899 294 : .context("get metadata to determine file size")?;
900 :
901 294 : let desc = PersistentLayerDesc::new_img(
902 294 : self.tenant_shard_id,
903 294 : self.timeline_id,
904 294 : final_key_range,
905 294 : self.lsn,
906 294 : metadata.len(),
907 294 : );
908 :
909 : #[cfg(feature = "testing")]
910 294 : if let Some(end_key) = end_key {
911 26 : assert!(
912 26 : self.last_written_key < end_key,
913 0 : "written key violates end_key range"
914 : );
915 268 : }
916 :
917 : // Note: Because we open the file in write-only mode, we cannot
918 : // reuse the same VirtualFile for reading later. That's why we don't
919 : // set inner.file here. The first read will have to re-open it.
920 :
921 : // fsync the file
922 294 : file.sync_all()
923 149 : .await
924 294 : .maybe_fatal_err("image_layer sync_all")?;
925 :
926 294 : trace!("created image layer {}", self.path);
927 :
928 294 : Ok((desc, self.path))
929 294 : }
930 : }
931 :
932 : /// A builder object for constructing a new image layer.
933 : ///
934 : /// Usage:
935 : ///
936 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
937 : ///
938 : /// 2. Write the contents by calling `put_page_image` for every key-value
939 : /// pair in the key range.
940 : ///
941 : /// 3. Call `finish`.
942 : ///
943 : /// # Note
944 : ///
945 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
946 : /// possible for the writer to drop before `finish` is actually called. So this
947 : /// could lead to odd temporary files in the directory, exhausting file system.
948 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
949 : /// implementation that cleans up the temporary file in failure. It's not
950 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
951 : /// out some fields, making it impossible to implement `Drop`.
952 : ///
953 : #[must_use]
954 : pub struct ImageLayerWriter {
955 : inner: Option<ImageLayerWriterInner>,
956 : }
957 :
958 : impl ImageLayerWriter {
959 : ///
960 : /// Start building a new image layer.
961 : ///
962 490 : pub async fn new(
963 490 : conf: &'static PageServerConf,
964 490 : timeline_id: TimelineId,
965 490 : tenant_shard_id: TenantShardId,
966 490 : key_range: &Range<Key>,
967 490 : lsn: Lsn,
968 490 : ctx: &RequestContext,
969 490 : ) -> anyhow::Result<ImageLayerWriter> {
970 490 : Ok(Self {
971 490 : inner: Some(
972 490 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
973 328 : .await?,
974 : ),
975 : })
976 490 : }
977 :
978 : ///
979 : /// Write next value to the file.
980 : ///
981 : /// The page versions must be appended in blknum order.
982 : ///
983 546152 : pub async fn put_image(
984 546152 : &mut self,
985 546152 : key: Key,
986 546152 : img: Bytes,
987 546152 : ctx: &RequestContext,
988 546152 : ) -> anyhow::Result<()> {
989 554810 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
990 546152 : }
991 :
992 : /// Estimated size of the image layer.
993 8382 : pub(crate) fn estimated_size(&self) -> u64 {
994 8382 : let inner = self.inner.as_ref().unwrap();
995 8382 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
996 8382 : }
997 :
998 8442 : pub(crate) fn num_keys(&self) -> usize {
999 8442 : self.inner.as_ref().unwrap().num_keys
1000 8442 : }
1001 :
1002 : ///
1003 : /// Finish writing the image layer.
1004 : ///
1005 268 : pub(crate) async fn finish(
1006 268 : mut self,
1007 268 : ctx: &RequestContext,
1008 268 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1009 776 : self.inner.take().unwrap().finish(ctx, None).await
1010 268 : }
1011 :
1012 : /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
1013 26 : pub(super) async fn finish_with_end_key(
1014 26 : mut self,
1015 26 : end_key: Key,
1016 26 : ctx: &RequestContext,
1017 26 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1018 53 : self.inner.take().unwrap().finish(ctx, Some(end_key)).await
1019 26 : }
1020 : }
1021 :
1022 : impl Drop for ImageLayerWriter {
1023 490 : fn drop(&mut self) {
1024 490 : if let Some(inner) = self.inner.take() {
1025 196 : inner.blob_writer.into_inner().remove();
1026 294 : }
1027 490 : }
1028 : }
1029 :
1030 : pub struct ImageLayerIterator<'a> {
1031 : image_layer: &'a ImageLayerInner,
1032 : ctx: &'a RequestContext,
1033 : planner: StreamingVectoredReadPlanner,
1034 : index_iter: DiskBtreeIterator<'a>,
1035 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1036 : is_end: bool,
1037 : }
1038 :
1039 : impl<'a> ImageLayerIterator<'a> {
1040 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1041 0 : self.image_layer.layer_dbg_info()
1042 0 : }
1043 :
1044 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1045 19096 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1046 19096 : assert!(self.key_values_batch.is_empty());
1047 19096 : assert!(!self.is_end);
1048 :
1049 19096 : let plan = loop {
1050 28770 : if let Some(res) = self.index_iter.next().await {
1051 28706 : let (raw_key, offset) = res?;
1052 28706 : if let Some(batch_plan) = self.planner.handle(
1053 28706 : Key::from_slice(&raw_key[..KEY_SIZE]),
1054 28706 : self.image_layer.lsn,
1055 28706 : offset,
1056 28706 : ) {
1057 19032 : break batch_plan;
1058 9674 : }
1059 : } else {
1060 64 : self.is_end = true;
1061 64 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1062 64 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1063 64 : break item;
1064 : } else {
1065 0 : return Ok(()); // TODO: a test case on empty iterator
1066 : }
1067 : }
1068 : };
1069 19096 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1070 19096 : let mut next_batch = std::collections::VecDeque::new();
1071 19096 : let buf_size = plan.size();
1072 19096 : let buf = IoBufferMut::with_capacity(buf_size);
1073 19096 : let blobs_buf = vectored_blob_reader
1074 19096 : .read_blobs(&plan, buf, self.ctx)
1075 9696 : .await?;
1076 19096 : let view = BufView::new_slice(&blobs_buf.buf);
1077 28678 : for meta in blobs_buf.blobs.iter() {
1078 28678 : let img_buf = meta.read(&view).await?;
1079 28678 : next_batch.push_back((
1080 28678 : meta.meta.key,
1081 28678 : self.image_layer.lsn,
1082 28678 : Value::Image(img_buf.into_bytes()),
1083 28678 : ));
1084 : }
1085 19096 : self.key_values_batch = next_batch;
1086 19096 : Ok(())
1087 19096 : }
1088 :
1089 28508 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1090 28508 : if self.key_values_batch.is_empty() {
1091 19112 : if self.is_end {
1092 100 : return Ok(None);
1093 19012 : }
1094 19012 : self.next_batch().await?;
1095 9396 : }
1096 28408 : Ok(Some(
1097 28408 : self.key_values_batch
1098 28408 : .pop_front()
1099 28408 : .expect("should not be empty"),
1100 28408 : ))
1101 28508 : }
1102 : }
1103 :
1104 : #[cfg(test)]
1105 : mod test {
1106 : use std::{sync::Arc, time::Duration};
1107 :
1108 : use bytes::Bytes;
1109 : use itertools::Itertools;
1110 : use pageserver_api::{
1111 : key::Key,
1112 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1113 : };
1114 : use utils::{
1115 : generation::Generation,
1116 : id::{TenantId, TimelineId},
1117 : lsn::Lsn,
1118 : };
1119 :
1120 : use crate::{
1121 : context::RequestContext,
1122 : repository::Value,
1123 : tenant::{
1124 : config::TenantConf,
1125 : harness::{TenantHarness, TIMELINE_ID},
1126 : storage_layer::{Layer, ResidentLayer},
1127 : vectored_blob_io::StreamingVectoredReadPlanner,
1128 : Tenant, Timeline,
1129 : },
1130 : DEFAULT_PG_VERSION,
1131 : };
1132 :
1133 : use super::{ImageLayerIterator, ImageLayerWriter};
1134 :
1135 : #[tokio::test]
1136 2 : async fn image_layer_rewrite() {
1137 2 : let tenant_conf = TenantConf {
1138 2 : gc_period: Duration::ZERO,
1139 2 : compaction_period: Duration::ZERO,
1140 2 : ..TenantConf::default()
1141 2 : };
1142 2 : let tenant_id = TenantId::generate();
1143 2 : let mut gen = Generation::new(0xdead0001);
1144 10 : let mut get_next_gen = || {
1145 10 : let ret = gen;
1146 10 : gen = gen.next();
1147 10 : ret
1148 10 : };
1149 2 : // The LSN at which we will create an image layer to filter
1150 2 : let lsn = Lsn(0xdeadbeef0000);
1151 2 : let timeline_id = TimelineId::generate();
1152 2 :
1153 2 : //
1154 2 : // Create an unsharded parent with a layer.
1155 2 : //
1156 2 :
1157 2 : let harness = TenantHarness::create_custom(
1158 2 : "test_image_layer_rewrite--parent",
1159 2 : tenant_conf.clone(),
1160 2 : tenant_id,
1161 2 : ShardIdentity::unsharded(),
1162 2 : get_next_gen(),
1163 2 : )
1164 2 : .await
1165 2 : .unwrap();
1166 8 : let (tenant, ctx) = harness.load().await;
1167 2 : let timeline = tenant
1168 2 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1169 4 : .await
1170 2 : .unwrap();
1171 2 :
1172 2 : // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
1173 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1174 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1175 2 : let range = input_start..input_end;
1176 2 :
1177 2 : // Build an image layer to filter
1178 2 : let resident = {
1179 2 : let mut writer = ImageLayerWriter::new(
1180 2 : harness.conf,
1181 2 : timeline_id,
1182 2 : harness.tenant_shard_id,
1183 2 : &range,
1184 2 : lsn,
1185 2 : &ctx,
1186 2 : )
1187 2 : .await
1188 2 : .unwrap();
1189 2 :
1190 2 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1191 2 : let mut key = range.start;
1192 262146 : while key < range.end {
1193 266239 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1194 262144 :
1195 262144 : key = key.next();
1196 2 : }
1197 119 : let (desc, path) = writer.finish(&ctx).await.unwrap();
1198 2 : Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
1199 2 : };
1200 2 : let original_size = resident.metadata().file_size;
1201 2 :
1202 2 : //
1203 2 : // Create child shards and do the rewrite, exercising filter().
1204 2 : // TODO: abstraction in TenantHarness for splits.
1205 2 : //
1206 2 :
1207 2 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1208 2 : // range, middle of key range.
1209 2 : let shard_count = ShardCount::new(4);
1210 8 : for shard_number in 0..shard_count.count() {
1211 2 : //
1212 2 : // mimic the shard split
1213 2 : //
1214 8 : let shard_identity = ShardIdentity::new(
1215 8 : ShardNumber(shard_number),
1216 8 : shard_count,
1217 8 : ShardStripeSize(0x8000),
1218 8 : )
1219 8 : .unwrap();
1220 8 : let harness = TenantHarness::create_custom(
1221 8 : Box::leak(Box::new(format!(
1222 8 : "test_image_layer_rewrite--child{}",
1223 8 : shard_identity.shard_slug()
1224 8 : ))),
1225 8 : tenant_conf.clone(),
1226 8 : tenant_id,
1227 8 : shard_identity,
1228 8 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1229 8 : // But here, all we care about is that the gen number is unique.
1230 8 : get_next_gen(),
1231 8 : )
1232 2 : .await
1233 8 : .unwrap();
1234 32 : let (tenant, ctx) = harness.load().await;
1235 8 : let timeline = tenant
1236 8 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1237 12 : .await
1238 8 : .unwrap();
1239 2 :
1240 2 : //
1241 2 : // use filter() and make assertions
1242 2 : //
1243 2 :
1244 8 : let mut filtered_writer = ImageLayerWriter::new(
1245 8 : harness.conf,
1246 8 : timeline_id,
1247 8 : harness.tenant_shard_id,
1248 8 : &range,
1249 8 : lsn,
1250 8 : &ctx,
1251 8 : )
1252 4 : .await
1253 8 : .unwrap();
1254 2 :
1255 8 : let wrote_keys = resident
1256 8 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1257 266719 : .await
1258 8 : .unwrap();
1259 8 : let replacement = if wrote_keys > 0 {
1260 129 : let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
1261 6 : let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
1262 6 : Some(resident)
1263 2 : } else {
1264 2 : None
1265 2 : };
1266 2 :
1267 2 : // This exact size and those below will need updating as/when the layer encoding changes, but
1268 2 : // should be deterministic for a given version of the format, as we used no randomness generating the input.
1269 8 : assert_eq!(original_size, 1597440);
1270 2 :
1271 8 : match shard_number {
1272 2 : 0 => {
1273 2 : // We should have written out just one stripe for our shard identity
1274 2 : assert_eq!(wrote_keys, 0x8000);
1275 2 : let replacement = replacement.unwrap();
1276 2 :
1277 2 : // We should have dropped some of the data
1278 2 : assert!(replacement.metadata().file_size < original_size);
1279 2 : assert!(replacement.metadata().file_size > 0);
1280 2 :
1281 2 : // Assert that we dropped ~3/4 of the data.
1282 2 : assert_eq!(replacement.metadata().file_size, 417792);
1283 2 : }
1284 2 : 1 => {
1285 2 : // Shard 1 has no keys in our input range
1286 2 : assert_eq!(wrote_keys, 0x0);
1287 2 : assert!(replacement.is_none());
1288 2 : }
1289 2 : 2 => {
1290 2 : // Shard 2 has one stripes in the input range
1291 2 : assert_eq!(wrote_keys, 0x8000);
1292 2 : let replacement = replacement.unwrap();
1293 2 : assert!(replacement.metadata().file_size < original_size);
1294 2 : assert!(replacement.metadata().file_size > 0);
1295 2 : assert_eq!(replacement.metadata().file_size, 417792);
1296 2 : }
1297 2 : 3 => {
1298 2 : // Shard 3 has two stripes in the input range
1299 2 : assert_eq!(wrote_keys, 0x10000);
1300 2 : let replacement = replacement.unwrap();
1301 2 : assert!(replacement.metadata().file_size < original_size);
1302 2 : assert!(replacement.metadata().file_size > 0);
1303 2 : assert_eq!(replacement.metadata().file_size, 811008);
1304 2 : }
1305 2 : _ => unreachable!(),
1306 2 : }
1307 2 : }
1308 2 : }
1309 :
1310 2 : async fn produce_image_layer(
1311 2 : tenant: &Tenant,
1312 2 : tline: &Arc<Timeline>,
1313 2 : mut images: Vec<(Key, Bytes)>,
1314 2 : lsn: Lsn,
1315 2 : ctx: &RequestContext,
1316 2 : ) -> anyhow::Result<ResidentLayer> {
1317 2 : images.sort();
1318 2 : let (key_start, _) = images.first().unwrap();
1319 2 : let (key_last, _) = images.last().unwrap();
1320 2 : let key_end = key_last.next();
1321 2 : let key_range = *key_start..key_end;
1322 2 : let mut writer = ImageLayerWriter::new(
1323 2 : tenant.conf,
1324 2 : tline.timeline_id,
1325 2 : tenant.tenant_shard_id,
1326 2 : &key_range,
1327 2 : lsn,
1328 2 : ctx,
1329 2 : )
1330 1 : .await?;
1331 :
1332 2002 : for (key, img) in images {
1333 2031 : writer.put_image(key, img, ctx).await?;
1334 : }
1335 4 : let (desc, path) = writer.finish(ctx).await?;
1336 2 : let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
1337 :
1338 2 : Ok::<_, anyhow::Error>(img_layer)
1339 2 : }
1340 :
1341 28 : async fn assert_img_iter_equal(
1342 28 : img_iter: &mut ImageLayerIterator<'_>,
1343 28 : expect: &[(Key, Bytes)],
1344 28 : expect_lsn: Lsn,
1345 28 : ) {
1346 28 : let mut expect_iter = expect.iter();
1347 : loop {
1348 28028 : let o1 = img_iter.next().await.unwrap();
1349 28028 : let o2 = expect_iter.next();
1350 28028 : match (o1, o2) {
1351 28 : (None, None) => break,
1352 28000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1353 28000 : let Value::Image(i1) = v1 else {
1354 0 : panic!("expect Value::Image")
1355 : };
1356 28000 : assert_eq!(&k1, k2);
1357 28000 : assert_eq!(l1, expect_lsn);
1358 28000 : assert_eq!(&i1, i2);
1359 : }
1360 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1361 : }
1362 : }
1363 28 : }
1364 :
1365 : #[tokio::test]
1366 2 : async fn image_layer_iterator() {
1367 2 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1368 8 : let (tenant, ctx) = harness.load().await;
1369 2 :
1370 2 : let tline = tenant
1371 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1372 4 : .await
1373 2 : .unwrap();
1374 2 :
1375 2000 : fn get_key(id: u32) -> Key {
1376 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1377 2000 : key.field6 = id;
1378 2000 : key
1379 2000 : }
1380 2 : const N: usize = 1000;
1381 2 : let test_imgs = (0..N)
1382 2000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1383 2 : .collect_vec();
1384 2 : let resident_layer =
1385 2 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1386 2036 : .await
1387 2 : .unwrap();
1388 2 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1389 6 : for max_read_size in [1, 1024] {
1390 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1391 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1392 28 : // Test if the batch size is correctly determined
1393 28 : let mut iter = img_layer.iter(&ctx);
1394 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1395 28 : let mut num_items = 0;
1396 112 : for _ in 0..3 {
1397 84 : iter.next_batch().await.unwrap();
1398 84 : num_items += iter.key_values_batch.len();
1399 84 : if max_read_size == 1 {
1400 2 : // every key should be a batch b/c the value is larger than max_read_size
1401 42 : assert_eq!(iter.key_values_batch.len(), 1);
1402 2 : } else {
1403 42 : assert!(iter.key_values_batch.len() <= batch_size);
1404 2 : }
1405 84 : if num_items >= N {
1406 2 : break;
1407 84 : }
1408 84 : iter.key_values_batch.clear();
1409 2 : }
1410 2 : // Test if the result is correct
1411 28 : let mut iter = img_layer.iter(&ctx);
1412 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1413 9635 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1414 2 : }
1415 2 : }
1416 2 : }
1417 : }
|