Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the
//! beginning of the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part. On disk the
//! "values" part comes first (starting at block 1, right after the
//! summary), and the "index" follows it.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{
33 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
34 : };
35 : use crate::tenant::timeline::GetVectoredError;
36 : use crate::tenant::vectored_blob_io::{
37 : BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
38 : VectoredReadPlanner,
39 : };
40 : use crate::tenant::{PageReconstructError, Timeline};
41 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
42 : use crate::virtual_file::{self, VirtualFile};
43 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
44 : use anyhow::{anyhow, bail, ensure, Context, Result};
45 : use bytes::{Bytes, BytesMut};
46 : use camino::{Utf8Path, Utf8PathBuf};
47 : use hex;
48 : use itertools::Itertools;
49 : use pageserver_api::keyspace::KeySpace;
50 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
51 : use rand::{distributions::Alphanumeric, Rng};
52 : use serde::{Deserialize, Serialize};
53 : use std::collections::VecDeque;
54 : use std::fs::File;
55 : use std::io::SeekFrom;
56 : use std::ops::Range;
57 : use std::os::unix::prelude::FileExt;
58 : use std::str::FromStr;
59 : use std::sync::Arc;
60 : use tokio::sync::OnceCell;
61 : use tokio_stream::StreamExt;
62 : use tracing::*;
63 :
64 : use utils::{
65 : bin_ser::BeSer,
66 : id::{TenantId, TimelineId},
67 : lsn::Lsn,
68 : };
69 :
70 : use super::layer_name::ImageLayerName;
71 : use super::{
72 : AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
73 : };
74 :
75 : ///
76 : /// Header stored in the beginning of the file
77 : ///
78 : /// After this comes the 'values' part, starting on block 1. After that,
79 : /// the 'index' starts at the block indicated by 'index_start_blk'
80 : ///
81 306 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
82 : pub struct Summary {
83 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
84 : pub magic: u16,
85 : pub format_version: u16,
86 :
87 : pub tenant_id: TenantId,
88 : pub timeline_id: TimelineId,
89 : pub key_range: Range<Key>,
90 : pub lsn: Lsn,
91 :
92 : /// Block number where the 'index' part of the file begins.
93 : pub index_start_blk: u32,
94 : /// Block within the 'index', where the B-tree root page is stored
95 : pub index_root_blk: u32,
96 : // the 'values' part starts after the summary header, on block 1.
97 : }
98 :
99 : impl From<&ImageLayer> for Summary {
100 0 : fn from(layer: &ImageLayer) -> Self {
101 0 : Self::expected(
102 0 : layer.desc.tenant_shard_id.tenant_id,
103 0 : layer.desc.timeline_id,
104 0 : layer.desc.key_range.clone(),
105 0 : layer.lsn,
106 0 : )
107 0 : }
108 : }
109 :
110 : impl Summary {
111 306 : pub(super) fn expected(
112 306 : tenant_id: TenantId,
113 306 : timeline_id: TimelineId,
114 306 : key_range: Range<Key>,
115 306 : lsn: Lsn,
116 306 : ) -> Self {
117 306 : Self {
118 306 : magic: IMAGE_FILE_MAGIC,
119 306 : format_version: STORAGE_FORMAT_VERSION,
120 306 : tenant_id,
121 306 : timeline_id,
122 306 : key_range,
123 306 : lsn,
124 306 :
125 306 : index_start_blk: 0,
126 306 : index_root_blk: 0,
127 306 : }
128 306 : }
129 : }
130 :
131 : /// This is used only from `pagectl`. Within pageserver, all layers are
132 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
133 : pub struct ImageLayer {
134 : path: Utf8PathBuf,
135 : pub desc: PersistentLayerDesc,
136 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
137 : pub lsn: Lsn,
138 : inner: OnceCell<ImageLayerInner>,
139 : }
140 :
141 : impl std::fmt::Debug for ImageLayer {
142 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 0 : use super::RangeDisplayDebug;
144 0 :
145 0 : f.debug_struct("ImageLayer")
146 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
147 0 : .field("file_size", &self.desc.file_size)
148 0 : .field("lsn", &self.lsn)
149 0 : .field("inner", &self.inner)
150 0 : .finish()
151 0 : }
152 : }
153 :
154 : /// ImageLayer is the in-memory data structure associated with an on-disk image
155 : /// file.
156 : pub struct ImageLayerInner {
157 : // values copied from summary
158 : index_start_blk: u32,
159 : index_root_blk: u32,
160 :
161 : key_range: Range<Key>,
162 : lsn: Lsn,
163 :
164 : file: VirtualFile,
165 : file_id: FileId,
166 :
167 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
168 : }
169 :
170 : impl ImageLayerInner {
171 0 : pub(crate) fn layer_dbg_info(&self) -> String {
172 0 : format!(
173 0 : "image {}..{} {}",
174 0 : self.key_range().start,
175 0 : self.key_range().end,
176 0 : self.lsn()
177 0 : )
178 0 : }
179 : }
180 :
181 : impl std::fmt::Debug for ImageLayerInner {
182 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 0 : f.debug_struct("ImageLayerInner")
184 0 : .field("index_start_blk", &self.index_start_blk)
185 0 : .field("index_root_blk", &self.index_root_blk)
186 0 : .finish()
187 0 : }
188 : }
189 :
190 : impl ImageLayerInner {
191 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
192 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
193 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
194 0 : self.index_start_blk,
195 0 : self.index_root_blk,
196 0 : block_reader,
197 0 : );
198 0 :
199 0 : tree_reader.dump().await?;
200 :
201 0 : tree_reader
202 0 : .visit(
203 0 : &[0u8; KEY_SIZE],
204 0 : VisitDirection::Forwards,
205 0 : |key, value| {
206 0 : println!("key: {} offset {}", hex::encode(key), value);
207 0 : true
208 0 : },
209 0 : ctx,
210 0 : )
211 0 : .await?;
212 :
213 0 : Ok(())
214 0 : }
215 : }
216 :
217 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
218 : impl std::fmt::Display for ImageLayer {
219 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 0 : write!(f, "{}", self.layer_desc().short_id())
221 0 : }
222 : }
223 :
224 : impl AsLayerDesc for ImageLayer {
225 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
226 0 : &self.desc
227 0 : }
228 : }
229 :
230 : impl ImageLayer {
231 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
232 0 : self.desc.dump();
233 0 :
234 0 : if !verbose {
235 0 : return Ok(());
236 0 : }
237 :
238 0 : let inner = self.load(ctx).await?;
239 :
240 0 : inner.dump(ctx).await?;
241 :
242 0 : Ok(())
243 0 : }
244 :
245 1470 : fn temp_path_for(
246 1470 : conf: &PageServerConf,
247 1470 : timeline_id: TimelineId,
248 1470 : tenant_shard_id: TenantShardId,
249 1470 : fname: &ImageLayerName,
250 1470 : ) -> Utf8PathBuf {
251 1470 : let rand_string: String = rand::thread_rng()
252 1470 : .sample_iter(&Alphanumeric)
253 1470 : .take(8)
254 1470 : .map(char::from)
255 1470 : .collect();
256 1470 :
257 1470 : conf.timeline_path(&tenant_shard_id, &timeline_id)
258 1470 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
259 1470 : }
260 :
261 : ///
262 : /// Open the underlying file and read the metadata into memory, if it's
263 : /// not loaded already.
264 : ///
265 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
266 0 : self.inner
267 0 : .get_or_try_init(|| self.load_inner(ctx))
268 0 : .await
269 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
270 0 : }
271 :
272 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
273 0 : let path = self.path();
274 :
275 0 : let loaded =
276 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
277 :
278 : // not production code
279 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
280 0 : let expected_layer_name = self.layer_desc().layer_name();
281 0 :
282 0 : if actual_layer_name != expected_layer_name {
283 0 : println!("warning: filename does not match what is expected from in-file summary");
284 0 : println!("actual: {:?}", actual_layer_name.to_string());
285 0 : println!("expected: {:?}", expected_layer_name.to_string());
286 0 : }
287 :
288 0 : Ok(loaded)
289 0 : }
290 :
291 : /// Create an ImageLayer struct representing an existing file on disk.
292 : ///
293 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
294 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
295 0 : let mut summary_buf = vec![0; PAGE_SZ];
296 0 : file.read_exact_at(&mut summary_buf, 0)?;
297 0 : let summary = Summary::des_prefix(&summary_buf)?;
298 0 : let metadata = file
299 0 : .metadata()
300 0 : .context("get file metadata to determine size")?;
301 :
302 : // This function is never used for constructing layers in a running pageserver,
303 : // so it does not need an accurate TenantShardId.
304 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
305 0 :
306 0 : Ok(ImageLayer {
307 0 : path: path.to_path_buf(),
308 0 : desc: PersistentLayerDesc::new_img(
309 0 : tenant_shard_id,
310 0 : summary.timeline_id,
311 0 : summary.key_range,
312 0 : summary.lsn,
313 0 : metadata.len(),
314 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
315 0 : lsn: summary.lsn,
316 0 : inner: OnceCell::new(),
317 0 : })
318 0 : }
319 :
320 0 : fn path(&self) -> Utf8PathBuf {
321 0 : self.path.clone()
322 0 : }
323 : }
324 :
325 0 : #[derive(thiserror::Error, Debug)]
326 : pub enum RewriteSummaryError {
327 : #[error("magic mismatch")]
328 : MagicMismatch,
329 : #[error(transparent)]
330 : Other(#[from] anyhow::Error),
331 : }
332 :
333 : impl From<std::io::Error> for RewriteSummaryError {
334 0 : fn from(e: std::io::Error) -> Self {
335 0 : Self::Other(anyhow::anyhow!(e))
336 0 : }
337 : }
338 :
339 : impl ImageLayer {
340 0 : pub async fn rewrite_summary<F>(
341 0 : path: &Utf8Path,
342 0 : rewrite: F,
343 0 : ctx: &RequestContext,
344 0 : ) -> Result<(), RewriteSummaryError>
345 0 : where
346 0 : F: Fn(Summary) -> Summary,
347 0 : {
348 0 : let mut file = VirtualFile::open_with_options(
349 0 : path,
350 0 : virtual_file::OpenOptions::new().read(true).write(true),
351 0 : ctx,
352 0 : )
353 0 : .await
354 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
355 0 : let file_id = page_cache::next_file_id();
356 0 : let block_reader = FileBlockReader::new(&file, file_id);
357 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
358 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
359 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
360 0 : return Err(RewriteSummaryError::MagicMismatch);
361 0 : }
362 0 :
363 0 : let new_summary = rewrite(actual_summary);
364 0 :
365 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
366 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
367 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
368 0 : file.seek(SeekFrom::Start(0)).await?;
369 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
370 0 : res?;
371 0 : Ok(())
372 0 : }
373 : }
374 :
375 : impl ImageLayerInner {
376 108 : pub(crate) fn key_range(&self) -> &Range<Key> {
377 108 : &self.key_range
378 108 : }
379 :
380 108 : pub(crate) fn lsn(&self) -> Lsn {
381 108 : self.lsn
382 108 : }
383 :
384 306 : pub(super) async fn load(
385 306 : path: &Utf8Path,
386 306 : lsn: Lsn,
387 306 : summary: Option<Summary>,
388 306 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
389 306 : ctx: &RequestContext,
390 306 : ) -> anyhow::Result<Self> {
391 306 : let file = VirtualFile::open(path, ctx)
392 153 : .await
393 306 : .context("open layer file")?;
394 306 : let file_id = page_cache::next_file_id();
395 306 : let block_reader = FileBlockReader::new(&file, file_id);
396 306 : let summary_blk = block_reader
397 306 : .read_blk(0, ctx)
398 157 : .await
399 306 : .context("read first block")?;
400 :
401 : // length is the only way how this could fail, so it's not actually likely at all unless
402 : // read_blk returns wrong sized block.
403 : //
404 : // TODO: confirm and make this into assertion
405 306 : let actual_summary =
406 306 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
407 :
408 306 : if let Some(mut expected_summary) = summary {
409 : // production code path
410 306 : expected_summary.index_start_blk = actual_summary.index_start_blk;
411 306 : expected_summary.index_root_blk = actual_summary.index_root_blk;
412 306 : // mask out the timeline_id, but still require the layers to be from the same tenant
413 306 : expected_summary.timeline_id = actual_summary.timeline_id;
414 306 :
415 306 : if actual_summary != expected_summary {
416 0 : bail!(
417 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
418 0 : actual_summary,
419 0 : expected_summary
420 0 : );
421 306 : }
422 0 : }
423 :
424 306 : Ok(ImageLayerInner {
425 306 : index_start_blk: actual_summary.index_start_blk,
426 306 : index_root_blk: actual_summary.index_root_blk,
427 306 : lsn,
428 306 : file,
429 306 : file_id,
430 306 : max_vectored_read_bytes,
431 306 : key_range: actual_summary.key_range,
432 306 : })
433 306 : }
434 :
435 : // Look up the keys in the provided keyspace and update
436 : // the reconstruct state with whatever is found.
437 24182 : pub(super) async fn get_values_reconstruct_data(
438 24182 : &self,
439 24182 : keyspace: KeySpace,
440 24182 : reconstruct_state: &mut ValuesReconstructState,
441 24182 : ctx: &RequestContext,
442 24182 : ) -> Result<(), GetVectoredError> {
443 24182 : let reads = self
444 24182 : .plan_reads(keyspace, None, ctx)
445 2146 : .await
446 24182 : .map_err(GetVectoredError::Other)?;
447 :
448 24182 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
449 14185 : .await;
450 :
451 24182 : reconstruct_state.on_image_layer_visited(&self.key_range);
452 24182 :
453 24182 : Ok(())
454 24182 : }
455 :
456 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
457 0 : pub(super) async fn load_key_values(
458 0 : &self,
459 0 : ctx: &RequestContext,
460 0 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
461 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
462 0 : let tree_reader =
463 0 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
464 0 : let mut result = Vec::new();
465 0 : let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
466 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
467 0 : let cursor = block_reader.block_cursor();
468 0 : while let Some(item) = stream.next().await {
469 : // TODO: dedup code with get_reconstruct_value
470 0 : let (raw_key, offset) = item?;
471 0 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
472 : // TODO: ctx handling and sharding
473 0 : let blob = cursor
474 0 : .read_blob(offset, ctx)
475 0 : .await
476 0 : .with_context(|| format!("failed to read value from offset {}", offset))?;
477 0 : let value = Bytes::from(blob);
478 0 : result.push((key, self.lsn, Value::Image(value)));
479 : }
480 0 : Ok(result)
481 0 : }
482 :
483 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
484 : /// and the keys in this layer.
485 : ///
486 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
487 : /// this shard.
488 24206 : async fn plan_reads(
489 24206 : &self,
490 24206 : keyspace: KeySpace,
491 24206 : shard_identity: Option<&ShardIdentity>,
492 24206 : ctx: &RequestContext,
493 24206 : ) -> anyhow::Result<Vec<VectoredRead>> {
494 24206 : let mut planner = VectoredReadPlanner::new(
495 24206 : self.max_vectored_read_bytes
496 24206 : .expect("Layer is loaded with max vectored bytes config")
497 24206 : .0
498 24206 : .into(),
499 24206 : );
500 24206 :
501 24206 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
502 24206 : let tree_reader =
503 24206 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
504 24206 :
505 24206 : let ctx = RequestContextBuilder::extend(ctx)
506 24206 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
507 24206 : .build();
508 :
509 89079 : for range in keyspace.ranges.iter() {
510 89079 : let mut range_end_handled = false;
511 89079 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
512 89079 : range.start.write_to_byte_slice(&mut search_key);
513 89079 :
514 89079 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
515 89079 : let mut index_stream = std::pin::pin!(index_stream);
516 :
517 3290085 : while let Some(index_entry) = index_stream.next().await {
518 3289483 : let (raw_key, offset) = index_entry?;
519 :
520 3289483 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
521 3289483 : assert!(key >= range.start);
522 :
523 3289483 : let flag = if let Some(shard_identity) = shard_identity {
524 3145728 : if shard_identity.is_key_disposable(&key) {
525 2359296 : BlobFlag::Ignore
526 : } else {
527 786432 : BlobFlag::None
528 : }
529 : } else {
530 143755 : BlobFlag::None
531 : };
532 :
533 3289483 : if key >= range.end {
534 88477 : planner.handle_range_end(offset);
535 88477 : range_end_handled = true;
536 88477 : break;
537 3201006 : } else {
538 3201006 : planner.handle(key, self.lsn, offset, flag);
539 3201006 : }
540 : }
541 :
542 89079 : if !range_end_handled {
543 602 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
544 602 : planner.handle_range_end(payload_end);
545 88477 : }
546 : }
547 :
548 24206 : Ok(planner.finish())
549 24206 : }
550 :
551 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
552 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
553 24 : pub(super) async fn filter(
554 24 : &self,
555 24 : shard_identity: &ShardIdentity,
556 24 : writer: &mut ImageLayerWriter,
557 24 : ctx: &RequestContext,
558 24 : ) -> anyhow::Result<usize> {
559 : // Fragment the range into the regions owned by this ShardIdentity
560 24 : let plan = self
561 24 : .plan_reads(
562 24 : KeySpace {
563 24 : // If asked for the total key space, plan_reads will give us all the keys in the layer
564 24 : ranges: vec![Key::MIN..Key::MAX],
565 24 : },
566 24 : Some(shard_identity),
567 24 : ctx,
568 24 : )
569 1407 : .await?;
570 :
571 24 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
572 24 : let mut key_count = 0;
573 48 : for read in plan.into_iter() {
574 48 : let buf_size = read.size();
575 48 :
576 48 : let buf = BytesMut::with_capacity(buf_size);
577 48 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
578 :
579 48 : let frozen_buf = blobs_buf.buf.freeze();
580 :
581 786432 : for meta in blobs_buf.blobs.iter() {
582 786432 : let img_buf = frozen_buf.slice(meta.start..meta.end);
583 786432 :
584 786432 : key_count += 1;
585 786432 : writer
586 786432 : .put_image(meta.meta.key, img_buf, ctx)
587 798720 : .await
588 786432 : .context(format!("Storing key {}", meta.meta.key))?;
589 : }
590 : }
591 :
592 24 : Ok(key_count)
593 24 : }
594 :
595 24182 : async fn do_reads_and_update_state(
596 24182 : &self,
597 24182 : reads: Vec<VectoredRead>,
598 24182 : reconstruct_state: &mut ValuesReconstructState,
599 24182 : ctx: &RequestContext,
600 24182 : ) {
601 24182 : let max_vectored_read_bytes = self
602 24182 : .max_vectored_read_bytes
603 24182 : .expect("Layer is loaded with max vectored bytes config")
604 24182 : .0
605 24182 : .into();
606 24182 :
607 24182 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
608 27825 : for read in reads.into_iter() {
609 27825 : let buf_size = read.size();
610 27825 :
611 27825 : if buf_size > max_vectored_read_bytes {
612 : // If the read is oversized, it should only contain one key.
613 0 : let offenders = read
614 0 : .blobs_at
615 0 : .as_slice()
616 0 : .iter()
617 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
618 0 : .join(", ");
619 0 : tracing::warn!(
620 0 : "Oversized vectored read ({} > {}) for keys {}",
621 : buf_size,
622 : max_vectored_read_bytes,
623 : offenders
624 : );
625 27825 : }
626 :
627 27825 : let buf = BytesMut::with_capacity(buf_size);
628 27825 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
629 :
630 27825 : match res {
631 27825 : Ok(blobs_buf) => {
632 27825 : let frozen_buf = blobs_buf.buf.freeze();
633 :
634 55278 : for meta in blobs_buf.blobs.iter() {
635 55278 : let img_buf = frozen_buf.slice(meta.start..meta.end);
636 55278 : reconstruct_state.update_key(
637 55278 : &meta.meta.key,
638 55278 : self.lsn,
639 55278 : Value::Image(img_buf),
640 55278 : );
641 55278 : }
642 : }
643 0 : Err(err) => {
644 0 : let kind = err.kind();
645 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
646 0 : reconstruct_state.on_key_error(
647 0 : blob_meta.key,
648 0 : PageReconstructError::from(anyhow!(
649 0 : "Failed to read blobs from virtual file {}: {}",
650 0 : self.file.path,
651 0 : kind
652 0 : )),
653 0 : );
654 0 : }
655 : }
656 : };
657 : }
658 24182 : }
659 :
660 276 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
661 276 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
662 276 : let tree_reader =
663 276 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
664 276 : ImageLayerIterator {
665 276 : image_layer: self,
666 276 : ctx,
667 276 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
668 276 : key_values_batch: VecDeque::new(),
669 276 : is_end: false,
670 276 : planner: StreamingVectoredReadPlanner::new(
671 276 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
672 276 : 1024, // The default value. Unit tests might use a different value
673 276 : ),
674 276 : }
675 276 : }
676 : }
677 :
678 : /// A builder object for constructing a new image layer.
679 : ///
680 : /// Usage:
681 : ///
682 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
683 : ///
684 : /// 2. Write the contents by calling `put_page_image` for every key-value
685 : /// pair in the key range.
686 : ///
687 : /// 3. Call `finish`.
688 : ///
689 : struct ImageLayerWriterInner {
690 : conf: &'static PageServerConf,
691 : path: Utf8PathBuf,
692 : timeline_id: TimelineId,
693 : tenant_shard_id: TenantShardId,
694 : key_range: Range<Key>,
695 : lsn: Lsn,
696 :
697 : // Total uncompressed bytes passed into put_image
698 : uncompressed_bytes: u64,
699 :
700 : // Like `uncompressed_bytes`,
701 : // but only of images we might consider for compression
702 : uncompressed_bytes_eligible: u64,
703 :
704 : // Like `uncompressed_bytes`, but only of images
705 : // where we have chosen their compressed form
706 : uncompressed_bytes_chosen: u64,
707 :
708 : // Number of keys in the layer.
709 : num_keys: usize,
710 :
711 : blob_writer: BlobWriter<false>,
712 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
713 :
714 : #[cfg_attr(not(feature = "testing"), allow(dead_code))]
715 : last_written_key: Key,
716 : }
717 :
718 : impl ImageLayerWriterInner {
719 : ///
720 : /// Start building a new image layer.
721 : ///
722 1470 : async fn new(
723 1470 : conf: &'static PageServerConf,
724 1470 : timeline_id: TimelineId,
725 1470 : tenant_shard_id: TenantShardId,
726 1470 : key_range: &Range<Key>,
727 1470 : lsn: Lsn,
728 1470 : ctx: &RequestContext,
729 1470 : ) -> anyhow::Result<Self> {
730 1470 : // Create the file initially with a temporary filename.
731 1470 : // We'll atomically rename it to the final name when we're done.
732 1470 : let path = ImageLayer::temp_path_for(
733 1470 : conf,
734 1470 : timeline_id,
735 1470 : tenant_shard_id,
736 1470 : &ImageLayerName {
737 1470 : key_range: key_range.clone(),
738 1470 : lsn,
739 1470 : },
740 1470 : );
741 1470 : trace!("creating image layer {}", path);
742 1470 : let mut file = {
743 1470 : VirtualFile::open_with_options(
744 1470 : &path,
745 1470 : virtual_file::OpenOptions::new()
746 1470 : .write(true)
747 1470 : .create_new(true),
748 1470 : ctx,
749 1470 : )
750 987 : .await?
751 : };
752 : // make room for the header block
753 1470 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
754 1470 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
755 1470 :
756 1470 : // Initialize the b-tree index builder
757 1470 : let block_buf = BlockBuf::new();
758 1470 : let tree_builder = DiskBtreeBuilder::new(block_buf);
759 1470 :
760 1470 : let writer = Self {
761 1470 : conf,
762 1470 : path,
763 1470 : timeline_id,
764 1470 : tenant_shard_id,
765 1470 : key_range: key_range.clone(),
766 1470 : lsn,
767 1470 : tree: tree_builder,
768 1470 : blob_writer,
769 1470 : uncompressed_bytes: 0,
770 1470 : uncompressed_bytes_eligible: 0,
771 1470 : uncompressed_bytes_chosen: 0,
772 1470 : num_keys: 0,
773 1470 : last_written_key: Key::MIN,
774 1470 : };
775 1470 :
776 1470 : Ok(writer)
777 1470 : }
778 :
779 : ///
780 : /// Write next value to the file.
781 : ///
782 : /// The page versions must be appended in blknum order.
783 : ///
784 1638450 : async fn put_image(
785 1638450 : &mut self,
786 1638450 : key: Key,
787 1638450 : img: Bytes,
788 1638450 : ctx: &RequestContext,
789 1638450 : ) -> anyhow::Result<()> {
790 1638450 : ensure!(self.key_range.contains(&key));
791 1638450 : let compression = self.conf.image_compression;
792 1638450 : let uncompressed_len = img.len() as u64;
793 1638450 : self.uncompressed_bytes += uncompressed_len;
794 1638450 : self.num_keys += 1;
795 1638450 : let (_img, res) = self
796 1638450 : .blob_writer
797 1638450 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
798 1664368 : .await;
799 : // TODO: re-use the buffer for `img` further upstack
800 1638450 : let (off, compression_info) = res?;
801 1638450 : if compression_info.compressed_size.is_some() {
802 24006 : // The image has been considered for compression at least
803 24006 : self.uncompressed_bytes_eligible += uncompressed_len;
804 1614444 : }
805 1638450 : if compression_info.written_compressed {
806 0 : // The image has been compressed
807 0 : self.uncompressed_bytes_chosen += uncompressed_len;
808 1638450 : }
809 :
810 1638450 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
811 1638450 : key.write_to_byte_slice(&mut keybuf);
812 1638450 : self.tree.append(&keybuf, off)?;
813 :
814 : #[cfg(feature = "testing")]
815 1638450 : {
816 1638450 : self.last_written_key = key;
817 1638450 : }
818 1638450 :
819 1638450 : Ok(())
820 1638450 : }
821 :
822 : ///
823 : /// Finish writing the image layer.
824 : ///
825 900 : async fn finish(
826 900 : self,
827 900 : timeline: &Arc<Timeline>,
828 900 : ctx: &RequestContext,
829 900 : end_key: Option<Key>,
830 900 : ) -> anyhow::Result<ResidentLayer> {
831 900 : let index_start_blk =
832 900 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
833 900 :
834 900 : // Calculate compression ratio
835 900 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
836 900 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
837 900 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
838 900 : .inc_by(self.uncompressed_bytes_eligible);
839 900 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
840 900 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
841 900 :
842 900 : let mut file = self.blob_writer.into_inner();
843 900 :
844 900 : // Write out the index
845 900 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
846 0 : .await?;
847 900 : let (index_root_blk, block_buf) = self.tree.finish()?;
848 3168 : for buf in block_buf.blocks {
849 2268 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
850 2268 : res?;
851 : }
852 :
853 900 : let final_key_range = if let Some(end_key) = end_key {
854 102 : self.key_range.start..end_key
855 : } else {
856 798 : self.key_range.clone()
857 : };
858 :
859 : // Fill in the summary on blk 0
860 900 : let summary = Summary {
861 900 : magic: IMAGE_FILE_MAGIC,
862 900 : format_version: STORAGE_FORMAT_VERSION,
863 900 : tenant_id: self.tenant_shard_id.tenant_id,
864 900 : timeline_id: self.timeline_id,
865 900 : key_range: final_key_range.clone(),
866 900 : lsn: self.lsn,
867 900 : index_start_blk,
868 900 : index_root_blk,
869 900 : };
870 900 :
871 900 : let mut buf = Vec::with_capacity(PAGE_SZ);
872 900 : // TODO: could use smallvec here but it's a pain with Slice<T>
873 900 : Summary::ser_into(&summary, &mut buf)?;
874 900 : file.seek(SeekFrom::Start(0)).await?;
875 900 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
876 900 : res?;
877 :
878 900 : let metadata = file
879 900 : .metadata()
880 457 : .await
881 900 : .context("get metadata to determine file size")?;
882 :
883 900 : let desc = PersistentLayerDesc::new_img(
884 900 : self.tenant_shard_id,
885 900 : self.timeline_id,
886 900 : final_key_range,
887 900 : self.lsn,
888 900 : metadata.len(),
889 900 : );
890 :
891 : #[cfg(feature = "testing")]
892 900 : if let Some(end_key) = end_key {
893 102 : assert!(
894 102 : self.last_written_key < end_key,
895 0 : "written key violates end_key range"
896 : );
897 798 : }
898 :
899 : // Note: Because we open the file in write-only mode, we cannot
900 : // reuse the same VirtualFile for reading later. That's why we don't
901 : // set inner.file here. The first read will have to re-open it.
902 :
903 : // fsync the file
904 900 : file.sync_all().await?;
905 :
906 : // FIXME: why not carry the virtualfile here, it supports renaming?
907 900 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
908 :
909 900 : info!("created image layer {}", layer.local_path());
910 :
911 900 : Ok(layer)
912 900 : }
913 : }
914 :
915 : /// A builder object for constructing a new image layer.
916 : ///
917 : /// Usage:
918 : ///
919 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
920 : ///
921 : /// 2. Write the contents by calling `put_page_image` for every key-value
922 : /// pair in the key range.
923 : ///
924 : /// 3. Call `finish`.
925 : ///
926 : /// # Note
927 : ///
928 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
929 : /// possible for the writer to drop before `finish` is actually called. So this
930 : /// could lead to odd temporary files in the directory, exhausting file system.
931 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
932 : /// implementation that cleans up the temporary file in failure. It's not
933 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
934 : /// out some fields, making it impossible to implement `Drop`.
935 : ///
936 : #[must_use]
937 : pub struct ImageLayerWriter {
938 : inner: Option<ImageLayerWriterInner>,
939 : }
940 :
941 : impl ImageLayerWriter {
942 : ///
943 : /// Start building a new image layer.
944 : ///
945 1470 : pub async fn new(
946 1470 : conf: &'static PageServerConf,
947 1470 : timeline_id: TimelineId,
948 1470 : tenant_shard_id: TenantShardId,
949 1470 : key_range: &Range<Key>,
950 1470 : lsn: Lsn,
951 1470 : ctx: &RequestContext,
952 1470 : ) -> anyhow::Result<ImageLayerWriter> {
953 1470 : Ok(Self {
954 1470 : inner: Some(
955 1470 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
956 987 : .await?,
957 : ),
958 : })
959 1470 : }
960 :
961 : ///
962 : /// Write next value to the file.
963 : ///
964 : /// The page versions must be appended in blknum order.
965 : ///
966 1638450 : pub async fn put_image(
967 1638450 : &mut self,
968 1638450 : key: Key,
969 1638450 : img: Bytes,
970 1638450 : ctx: &RequestContext,
971 1638450 : ) -> anyhow::Result<()> {
972 1664368 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
973 1638450 : }
974 :
975 : /// Estimated size of the image layer.
976 25146 : pub(crate) fn estimated_size(&self) -> u64 {
977 25146 : let inner = self.inner.as_ref().unwrap();
978 25146 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
979 25146 : }
980 :
981 25326 : pub(crate) fn num_keys(&self) -> usize {
982 25326 : self.inner.as_ref().unwrap().num_keys
983 25326 : }
984 :
985 : ///
986 : /// Finish writing the image layer.
987 : ///
988 798 : pub(crate) async fn finish(
989 798 : mut self,
990 798 : timeline: &Arc<Timeline>,
991 798 : ctx: &RequestContext,
992 798 : ) -> anyhow::Result<super::ResidentLayer> {
993 2326 : self.inner.take().unwrap().finish(timeline, ctx, None).await
994 798 : }
995 :
996 : /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
997 102 : pub(super) async fn finish_with_end_key(
998 102 : mut self,
999 102 : timeline: &Arc<Timeline>,
1000 102 : end_key: Key,
1001 102 : ctx: &RequestContext,
1002 102 : ) -> anyhow::Result<super::ResidentLayer> {
1003 102 : self.inner
1004 102 : .take()
1005 102 : .unwrap()
1006 102 : .finish(timeline, ctx, Some(end_key))
1007 210 : .await
1008 102 : }
1009 : }
1010 :
1011 : impl Drop for ImageLayerWriter {
1012 1470 : fn drop(&mut self) {
1013 1470 : if let Some(inner) = self.inner.take() {
1014 570 : inner.blob_writer.into_inner().remove();
1015 900 : }
1016 1470 : }
1017 : }
1018 :
1019 : pub struct ImageLayerIterator<'a> {
1020 : image_layer: &'a ImageLayerInner,
1021 : ctx: &'a RequestContext,
1022 : planner: StreamingVectoredReadPlanner,
1023 : index_iter: DiskBtreeIterator<'a>,
1024 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1025 : is_end: bool,
1026 : }
1027 :
1028 : impl<'a> ImageLayerIterator<'a> {
1029 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1030 0 : self.image_layer.layer_dbg_info()
1031 0 : }
1032 :
1033 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1034 57052 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1035 57052 : assert!(self.key_values_batch.is_empty());
1036 57052 : assert!(!self.is_end);
1037 :
1038 57052 : let plan = loop {
1039 86310 : if let Some(res) = self.index_iter.next().await {
1040 86118 : let (raw_key, offset) = res?;
1041 86118 : if let Some(batch_plan) = self.planner.handle(
1042 86118 : Key::from_slice(&raw_key[..KEY_SIZE]),
1043 86118 : self.image_layer.lsn,
1044 86118 : offset,
1045 86118 : ) {
1046 56860 : break batch_plan;
1047 29258 : }
1048 : } else {
1049 192 : self.is_end = true;
1050 192 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1051 192 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1052 192 : break item;
1053 : } else {
1054 0 : return Ok(()); // TODO: a test case on empty iterator
1055 : }
1056 : }
1057 : };
1058 57052 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1059 57052 : let mut next_batch = std::collections::VecDeque::new();
1060 57052 : let buf_size = plan.size();
1061 57052 : let buf = BytesMut::with_capacity(buf_size);
1062 57052 : let blobs_buf = vectored_blob_reader
1063 57052 : .read_blobs(&plan, buf, self.ctx)
1064 28970 : .await?;
1065 57052 : let frozen_buf: Bytes = blobs_buf.buf.freeze();
1066 86034 : for meta in blobs_buf.blobs.iter() {
1067 86034 : let img_buf = frozen_buf.slice(meta.start..meta.end);
1068 86034 : next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
1069 86034 : }
1070 57052 : self.key_values_batch = next_batch;
1071 57052 : Ok(())
1072 57052 : }
1073 :
1074 85524 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1075 85524 : if self.key_values_batch.is_empty() {
1076 57100 : if self.is_end {
1077 300 : return Ok(None);
1078 56800 : }
1079 56800 : self.next_batch().await?;
1080 28424 : }
1081 85224 : Ok(Some(
1082 85224 : self.key_values_batch
1083 85224 : .pop_front()
1084 85224 : .expect("should not be empty"),
1085 85224 : ))
1086 85524 : }
1087 : }
1088 :
#[cfg(test)]
mod test {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use itertools::Itertools;
    use pageserver_api::{
        key::Key,
        shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
    };
    use utils::{
        generation::Generation,
        id::{TenantId, TimelineId},
        lsn::Lsn,
    };

    use crate::{
        context::RequestContext,
        repository::Value,
        tenant::{
            config::TenantConf,
            harness::{TenantHarness, TIMELINE_ID},
            storage_layer::ResidentLayer,
            vectored_blob_io::StreamingVectoredReadPlanner,
            Tenant, Timeline,
        },
        DEFAULT_PG_VERSION,
    };

    use super::{ImageLayerIterator, ImageLayerWriter};

    /// Build an image layer on an unsharded tenant, then rewrite it with
    /// `filter()` for each shard of a 4-way split and check which keys survive.
    #[tokio::test]
    async fn image_layer_rewrite() {
        let tenant_conf = TenantConf {
            gc_period: Duration::ZERO,
            compaction_period: Duration::ZERO,
            ..TenantConf::default()
        };
        let tenant_id = TenantId::generate();
        let mut generation = Generation::new(0xdead0001);
        let mut get_next_gen = || {
            let ret = generation;
            generation = generation.next();
            ret
        };
        // The LSN at which we will create an image layer to filter
        let lsn = Lsn(0xdeadbeef0000);
        let timeline_id = TimelineId::generate();

        //
        // Create an unsharded parent with a layer.
        //

        let harness = TenantHarness::create_custom(
            "test_image_layer_rewrite--parent",
            tenant_conf.clone(),
            tenant_id,
            ShardIdentity::unsharded(),
            get_next_gen(),
        )
        .await
        .unwrap();
        let (tenant, ctx) = harness.load().await;
        let timeline = tenant
            .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
        let range = input_start..input_end;

        // Build an image layer to filter
        let resident = {
            let mut writer = ImageLayerWriter::new(
                harness.conf,
                timeline_id,
                harness.tenant_shard_id,
                &range,
                lsn,
                &ctx,
            )
            .await
            .unwrap();

            let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
            let mut key = range.start;
            while key < range.end {
                writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();

                key = key.next();
            }
            writer.finish(&timeline, &ctx).await.unwrap()
        };
        let original_size = resident.metadata().file_size;

        // This exact size and those below will need updating as/when the layer encoding changes, but
        // should be deterministic for a given version of the format, as we used no randomness generating the input.
        assert_eq!(original_size, 1597440);

        //
        // Create child shards and do the rewrite, exercising filter().
        // TODO: abstraction in TenantHarness for splits.
        //

        // Filter for various shards: this exercises cases like values at start of key range, end of key
        // range, middle of key range.
        let shard_count = ShardCount::new(4);
        for shard_number in 0..shard_count.count() {
            //
            // mimic the shard split
            //
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                shard_count,
                ShardStripeSize(0x8000),
            )
            .unwrap();
            let harness = TenantHarness::create_custom(
                Box::leak(Box::new(format!(
                    "test_image_layer_rewrite--child{}",
                    shard_identity.shard_slug()
                ))),
                tenant_conf.clone(),
                tenant_id,
                shard_identity,
                // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
                // But here, all we care about is that the gen number is unique.
                get_next_gen(),
            )
            .await
            .unwrap();
            let (tenant, ctx) = harness.load().await;
            let timeline = tenant
                .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
                .await
                .unwrap();

            //
            // use filter() and make assertions
            //

            let mut filtered_writer = ImageLayerWriter::new(
                harness.conf,
                timeline_id,
                harness.tenant_shard_id,
                &range,
                lsn,
                &ctx,
            )
            .await
            .unwrap();

            let wrote_keys = resident
                .filter(&shard_identity, &mut filtered_writer, &ctx)
                .await
                .unwrap();
            // If nothing was written, the unfinished writer is simply dropped.
            let replacement = if wrote_keys > 0 {
                Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
            } else {
                None
            };

            match shard_number {
                0 => {
                    // We should have written out just one stripe for our shard identity
                    assert_eq!(wrote_keys, 0x8000);
                    let replacement = replacement.unwrap();

                    // We should have dropped some of the data
                    assert!(replacement.metadata().file_size < original_size);
                    assert!(replacement.metadata().file_size > 0);

                    // Assert that we dropped ~3/4 of the data.
                    assert_eq!(replacement.metadata().file_size, 417792);
                }
                1 => {
                    // Shard 1 has no keys in our input range
                    assert_eq!(wrote_keys, 0x0);
                    assert!(replacement.is_none());
                }
                2 => {
                    // Shard 2 has one stripe in the input range
                    assert_eq!(wrote_keys, 0x8000);
                    let replacement = replacement.unwrap();
                    assert!(replacement.metadata().file_size < original_size);
                    assert!(replacement.metadata().file_size > 0);
                    assert_eq!(replacement.metadata().file_size, 417792);
                }
                3 => {
                    // Shard 3 has two stripes in the input range
                    assert_eq!(wrote_keys, 0x10000);
                    let replacement = replacement.unwrap();
                    assert!(replacement.metadata().file_size < original_size);
                    assert!(replacement.metadata().file_size > 0);
                    assert_eq!(replacement.metadata().file_size, 811008);
                }
                _ => unreachable!(),
            }
        }
    }

    /// Write `images` (sorted by key first) into a new image layer at `lsn`.
    ///
    /// The layer's key range spans from the smallest key up to, but excluding,
    /// the key after the largest one.
    ///
    /// # Panics
    ///
    /// Panics if `images` is empty.
    async fn produce_image_layer(
        tenant: &Tenant,
        tline: &Arc<Timeline>,
        mut images: Vec<(Key, Bytes)>,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> anyhow::Result<ResidentLayer> {
        images.sort();
        let (key_start, _) = images
            .first()
            .expect("produce_image_layer requires at least one image");
        let (key_last, _) = images
            .last()
            .expect("produce_image_layer requires at least one image");
        let key_range = *key_start..key_last.next();
        let mut writer = ImageLayerWriter::new(
            tenant.conf,
            tline.timeline_id,
            tenant.tenant_shard_id,
            &key_range,
            lsn,
            ctx,
        )
        .await?;

        for (key, img) in images {
            writer.put_image(key, img, ctx).await?;
        }
        writer.finish(tline, ctx).await
    }

    /// Drain `img_iter` and assert that it yields exactly `expect` (in order),
    /// each item carrying `expect_lsn` and a `Value::Image`.
    async fn assert_img_iter_equal(
        img_iter: &mut ImageLayerIterator<'_>,
        expect: &[(Key, Bytes)],
        expect_lsn: Lsn,
    ) {
        let mut expect_iter = expect.iter();
        loop {
            let o1 = img_iter.next().await.unwrap();
            let o2 = expect_iter.next();
            match (o1, o2) {
                (None, None) => break,
                (Some((k1, l1, v1)), Some((k2, i2))) => {
                    let Value::Image(i1) = v1 else {
                        panic!("expect Value::Image")
                    };
                    assert_eq!(&k1, k2);
                    assert_eq!(l1, expect_lsn);
                    assert_eq!(&i1, i2);
                }
                (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
            }
        }
    }

    /// Check batch sizing and full-content correctness of `ImageLayerIterator`
    /// across several planner configurations.
    #[tokio::test]
    async fn image_layer_iterator() {
        let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        const N: usize = 1000;
        let test_imgs = (0..N)
            .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
            .collect_vec();
        let resident_layer =
            produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
                .await
                .unwrap();
        let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
        for max_read_size in [1, 1024] {
            for batch_size in [1, 2, 4, 8, 3, 7, 13] {
                println!("running with batch_size={batch_size} max_read_size={max_read_size}");
                // Test if the batch size is correctly determined
                let mut iter = img_layer.iter(&ctx);
                iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
                let mut num_items = 0;
                for _ in 0..3 {
                    iter.next_batch().await.unwrap();
                    num_items += iter.key_values_batch.len();
                    if max_read_size == 1 {
                        // every key should be a batch b/c the value is larger than max_read_size
                        assert_eq!(iter.key_values_batch.len(), 1);
                    } else {
                        assert!(iter.key_values_batch.len() <= batch_size);
                    }
                    if num_items >= N {
                        break;
                    }
                    iter.key_values_batch.clear();
                }
                // Test if the result is correct
                let mut iter = img_layer.iter(&ctx);
                iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
                assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
            }
        }
    }
}
|