Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::vectored_blob_io::{
38 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
39 : };
40 : use crate::tenant::{PageReconstructError, Timeline};
41 : use crate::virtual_file::{self, VirtualFile};
42 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
43 : use anyhow::{anyhow, bail, ensure, Context, Result};
44 : use bytes::{Bytes, BytesMut};
45 : use camino::{Utf8Path, Utf8PathBuf};
46 : use hex;
47 : use itertools::Itertools;
48 : use pageserver_api::keyspace::KeySpace;
49 : use pageserver_api::models::LayerAccessKind;
50 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
51 : use rand::{distributions::Alphanumeric, Rng};
52 : use serde::{Deserialize, Serialize};
53 : use std::fs::File;
54 : use std::io::SeekFrom;
55 : use std::ops::Range;
56 : use std::os::unix::prelude::FileExt;
57 : use std::str::FromStr;
58 : use std::sync::Arc;
59 : use tokio::sync::OnceCell;
60 : use tokio_stream::StreamExt;
61 : use tracing::*;
62 :
63 : use utils::{
64 : bin_ser::BeSer,
65 : id::{TenantId, TimelineId},
66 : lsn::Lsn,
67 : };
68 :
69 : use super::layer_name::ImageLayerName;
70 : use super::{
71 : AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
72 : };
73 :
74 : ///
75 : /// Header stored in the beginning of the file
76 : ///
77 : /// After this comes the 'values' part, starting on block 1. After that,
78 : /// the 'index' starts at the block indicated by 'index_start_blk'
79 : ///
80 90 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
81 : pub struct Summary {
82 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
83 : pub magic: u16,
84 : pub format_version: u16,
85 :
86 : pub tenant_id: TenantId,
87 : pub timeline_id: TimelineId,
88 : pub key_range: Range<Key>,
89 : pub lsn: Lsn,
90 :
91 : /// Block number where the 'index' part of the file begins.
92 : pub index_start_blk: u32,
93 : /// Block within the 'index', where the B-tree root page is stored
94 : pub index_root_blk: u32,
95 : // the 'values' part starts after the summary header, on block 1.
96 : }
97 :
98 : impl From<&ImageLayer> for Summary {
99 0 : fn from(layer: &ImageLayer) -> Self {
100 0 : Self::expected(
101 0 : layer.desc.tenant_shard_id.tenant_id,
102 0 : layer.desc.timeline_id,
103 0 : layer.desc.key_range.clone(),
104 0 : layer.lsn,
105 0 : )
106 0 : }
107 : }
108 :
109 : impl Summary {
110 90 : pub(super) fn expected(
111 90 : tenant_id: TenantId,
112 90 : timeline_id: TimelineId,
113 90 : key_range: Range<Key>,
114 90 : lsn: Lsn,
115 90 : ) -> Self {
116 90 : Self {
117 90 : magic: IMAGE_FILE_MAGIC,
118 90 : format_version: STORAGE_FORMAT_VERSION,
119 90 : tenant_id,
120 90 : timeline_id,
121 90 : key_range,
122 90 : lsn,
123 90 :
124 90 : index_start_blk: 0,
125 90 : index_root_blk: 0,
126 90 : }
127 90 : }
128 : }
129 :
130 : /// This is used only from `pagectl`. Within pageserver, all layers are
131 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
132 : pub struct ImageLayer {
133 : path: Utf8PathBuf,
134 : pub desc: PersistentLayerDesc,
135 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
136 : pub lsn: Lsn,
137 : access_stats: LayerAccessStats,
138 : inner: OnceCell<ImageLayerInner>,
139 : }
140 :
141 : impl std::fmt::Debug for ImageLayer {
142 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 0 : use super::RangeDisplayDebug;
144 0 :
145 0 : f.debug_struct("ImageLayer")
146 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
147 0 : .field("file_size", &self.desc.file_size)
148 0 : .field("lsn", &self.lsn)
149 0 : .field("inner", &self.inner)
150 0 : .finish()
151 0 : }
152 : }
153 :
154 : /// ImageLayer is the in-memory data structure associated with an on-disk image
155 : /// file.
156 : pub struct ImageLayerInner {
157 : // values copied from summary
158 : index_start_blk: u32,
159 : index_root_blk: u32,
160 :
161 : key_range: Range<Key>,
162 : lsn: Lsn,
163 :
164 : file: VirtualFile,
165 : file_id: FileId,
166 :
167 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
168 : compressed_reads: bool,
169 : }
170 :
171 : impl std::fmt::Debug for ImageLayerInner {
172 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 0 : f.debug_struct("ImageLayerInner")
174 0 : .field("index_start_blk", &self.index_start_blk)
175 0 : .field("index_root_blk", &self.index_root_blk)
176 0 : .finish()
177 0 : }
178 : }
179 :
180 : impl ImageLayerInner {
181 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
182 0 : let block_reader =
183 0 : FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
184 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
185 0 : self.index_start_blk,
186 0 : self.index_root_blk,
187 0 : block_reader,
188 0 : );
189 0 :
190 0 : tree_reader.dump().await?;
191 :
192 0 : tree_reader
193 0 : .visit(
194 0 : &[0u8; KEY_SIZE],
195 0 : VisitDirection::Forwards,
196 0 : |key, value| {
197 0 : println!("key: {} offset {}", hex::encode(key), value);
198 0 : true
199 0 : },
200 0 : ctx,
201 0 : )
202 0 : .await?;
203 :
204 0 : Ok(())
205 0 : }
206 : }
207 :
208 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
209 : impl std::fmt::Display for ImageLayer {
210 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 0 : write!(f, "{}", self.layer_desc().short_id())
212 0 : }
213 : }
214 :
215 : impl AsLayerDesc for ImageLayer {
216 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
217 0 : &self.desc
218 0 : }
219 : }
220 :
221 : impl ImageLayer {
222 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
223 0 : self.desc.dump();
224 0 :
225 0 : if !verbose {
226 0 : return Ok(());
227 0 : }
228 :
229 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
230 :
231 0 : inner.dump(ctx).await?;
232 :
233 0 : Ok(())
234 0 : }
235 :
236 244 : fn temp_path_for(
237 244 : conf: &PageServerConf,
238 244 : timeline_id: TimelineId,
239 244 : tenant_shard_id: TenantShardId,
240 244 : fname: &ImageLayerName,
241 244 : ) -> Utf8PathBuf {
242 244 : let rand_string: String = rand::thread_rng()
243 244 : .sample_iter(&Alphanumeric)
244 244 : .take(8)
245 244 : .map(char::from)
246 244 : .collect();
247 244 :
248 244 : conf.timeline_path(&tenant_shard_id, &timeline_id)
249 244 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
250 244 : }
251 :
252 : ///
253 : /// Open the underlying file and read the metadata into memory, if it's
254 : /// not loaded already.
255 : ///
256 0 : async fn load(
257 0 : &self,
258 0 : access_kind: LayerAccessKind,
259 0 : ctx: &RequestContext,
260 0 : ) -> Result<&ImageLayerInner> {
261 0 : self.access_stats.record_access(access_kind, ctx);
262 0 : self.inner
263 0 : .get_or_try_init(|| self.load_inner(ctx))
264 0 : .await
265 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
266 0 : }
267 :
268 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
269 0 : let path = self.path();
270 :
271 0 : let loaded =
272 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, false, ctx)
273 0 : .await
274 0 : .and_then(|res| res)?;
275 :
276 : // not production code
277 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
278 0 : let expected_layer_name = self.layer_desc().layer_name();
279 0 :
280 0 : if actual_layer_name != expected_layer_name {
281 0 : println!("warning: filename does not match what is expected from in-file summary");
282 0 : println!("actual: {:?}", actual_layer_name.to_string());
283 0 : println!("expected: {:?}", expected_layer_name.to_string());
284 0 : }
285 :
286 0 : Ok(loaded)
287 0 : }
288 :
289 : /// Create an ImageLayer struct representing an existing file on disk.
290 : ///
291 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
292 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
293 0 : let mut summary_buf = vec![0; PAGE_SZ];
294 0 : file.read_exact_at(&mut summary_buf, 0)?;
295 0 : let summary = Summary::des_prefix(&summary_buf)?;
296 0 : let metadata = file
297 0 : .metadata()
298 0 : .context("get file metadata to determine size")?;
299 :
300 : // This function is never used for constructing layers in a running pageserver,
301 : // so it does not need an accurate TenantShardId.
302 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
303 0 :
304 0 : Ok(ImageLayer {
305 0 : path: path.to_path_buf(),
306 0 : desc: PersistentLayerDesc::new_img(
307 0 : tenant_shard_id,
308 0 : summary.timeline_id,
309 0 : summary.key_range,
310 0 : summary.lsn,
311 0 : metadata.len(),
312 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
313 0 : lsn: summary.lsn,
314 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
315 0 : inner: OnceCell::new(),
316 0 : })
317 0 : }
318 :
319 0 : fn path(&self) -> Utf8PathBuf {
320 0 : self.path.clone()
321 0 : }
322 : }
323 :
324 0 : #[derive(thiserror::Error, Debug)]
325 : pub enum RewriteSummaryError {
326 : #[error("magic mismatch")]
327 : MagicMismatch,
328 : #[error(transparent)]
329 : Other(#[from] anyhow::Error),
330 : }
331 :
332 : impl From<std::io::Error> for RewriteSummaryError {
333 0 : fn from(e: std::io::Error) -> Self {
334 0 : Self::Other(anyhow::anyhow!(e))
335 0 : }
336 : }
337 :
338 : impl ImageLayer {
339 0 : pub async fn rewrite_summary<F>(
340 0 : path: &Utf8Path,
341 0 : rewrite: F,
342 0 : ctx: &RequestContext,
343 0 : ) -> Result<(), RewriteSummaryError>
344 0 : where
345 0 : F: Fn(Summary) -> Summary,
346 0 : {
347 0 : let mut file = VirtualFile::open_with_options(
348 0 : path,
349 0 : virtual_file::OpenOptions::new().read(true).write(true),
350 0 : ctx,
351 0 : )
352 0 : .await
353 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
354 0 : let file_id = page_cache::next_file_id();
355 0 : let block_reader = FileBlockReader::new(&file, file_id);
356 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
357 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
358 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
359 0 : return Err(RewriteSummaryError::MagicMismatch);
360 0 : }
361 0 :
362 0 : let new_summary = rewrite(actual_summary);
363 0 :
364 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
365 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
366 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
367 0 : file.seek(SeekFrom::Start(0)).await?;
368 0 : let (_buf, res) = file.write_all(buf, ctx).await;
369 0 : res?;
370 0 : Ok(())
371 0 : }
372 : }
373 :
374 : impl ImageLayerInner {
375 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
376 : /// - inner has the success or transient failure
377 : /// - outer has the permanent failure
378 90 : pub(super) async fn load(
379 90 : path: &Utf8Path,
380 90 : lsn: Lsn,
381 90 : summary: Option<Summary>,
382 90 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
383 90 : support_compressed_reads: bool,
384 90 : ctx: &RequestContext,
385 90 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
386 90 : let file = match VirtualFile::open(path, ctx).await {
387 90 : Ok(file) => file,
388 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
389 : };
390 90 : let file_id = page_cache::next_file_id();
391 90 : let block_reader = FileBlockReader::new(&file, file_id);
392 90 : let summary_blk = match block_reader.read_blk(0, ctx).await {
393 90 : Ok(blk) => blk,
394 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
395 : };
396 :
397 : // length is the only way how this could fail, so it's not actually likely at all unless
398 : // read_blk returns wrong sized block.
399 : //
400 : // TODO: confirm and make this into assertion
401 90 : let actual_summary =
402 90 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
403 :
404 90 : if let Some(mut expected_summary) = summary {
405 : // production code path
406 90 : expected_summary.index_start_blk = actual_summary.index_start_blk;
407 90 : expected_summary.index_root_blk = actual_summary.index_root_blk;
408 90 : // mask out the timeline_id, but still require the layers to be from the same tenant
409 90 : expected_summary.timeline_id = actual_summary.timeline_id;
410 90 :
411 90 : if actual_summary != expected_summary {
412 0 : bail!(
413 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
414 0 : actual_summary,
415 0 : expected_summary
416 0 : );
417 90 : }
418 0 : }
419 :
420 90 : Ok(Ok(ImageLayerInner {
421 90 : index_start_blk: actual_summary.index_start_blk,
422 90 : index_root_blk: actual_summary.index_root_blk,
423 90 : lsn,
424 90 : file,
425 90 : file_id,
426 90 : max_vectored_read_bytes,
427 90 : compressed_reads: support_compressed_reads,
428 90 : key_range: actual_summary.key_range,
429 90 : }))
430 90 : }
431 :
432 7101 : pub(super) async fn get_value_reconstruct_data(
433 7101 : &self,
434 7101 : key: Key,
435 7101 : reconstruct_state: &mut ValueReconstructState,
436 7101 : ctx: &RequestContext,
437 7101 : ) -> anyhow::Result<ValueReconstructResult> {
438 7101 : let block_reader =
439 7101 : FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
440 7101 : let tree_reader =
441 7101 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
442 7101 :
443 7101 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
444 7101 : key.write_to_byte_slice(&mut keybuf);
445 7101 : if let Some(offset) = tree_reader
446 7101 : .get(
447 7101 : &keybuf,
448 7101 : &RequestContextBuilder::extend(ctx)
449 7101 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
450 7101 : .build(),
451 7101 : )
452 415 : .await?
453 : {
454 7097 : let blob = block_reader
455 7097 : .block_cursor()
456 7097 : .read_blob(
457 7097 : offset,
458 7097 : &RequestContextBuilder::extend(ctx)
459 7097 : .page_content_kind(PageContentKind::ImageLayerValue)
460 7097 : .build(),
461 7097 : )
462 314 : .await
463 7097 : .with_context(|| format!("failed to read value from offset {}", offset))?;
464 7097 : let value = Bytes::from(blob);
465 7097 :
466 7097 : reconstruct_state.img = Some((self.lsn, value));
467 7097 : Ok(ValueReconstructResult::Complete)
468 : } else {
469 4 : Ok(ValueReconstructResult::Missing)
470 : }
471 7101 : }
472 :
473 : // Look up the keys in the provided keyspace and update
474 : // the reconstruct state with whatever is found.
475 74 : pub(super) async fn get_values_reconstruct_data(
476 74 : &self,
477 74 : keyspace: KeySpace,
478 74 : reconstruct_state: &mut ValuesReconstructState,
479 74 : ctx: &RequestContext,
480 74 : ) -> Result<(), GetVectoredError> {
481 74 : let reads = self
482 74 : .plan_reads(keyspace, None, ctx)
483 345 : .await
484 74 : .map_err(GetVectoredError::Other)?;
485 :
486 74 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
487 872 : .await;
488 :
489 74 : reconstruct_state.on_image_layer_visited(&self.key_range);
490 74 :
491 74 : Ok(())
492 74 : }
493 :
494 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
495 8 : pub(super) async fn load_key_values(
496 8 : &self,
497 8 : ctx: &RequestContext,
498 8 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
499 8 : let block_reader =
500 8 : FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
501 8 : let tree_reader =
502 8 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
503 8 : let mut result = Vec::new();
504 8 : let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
505 8 : let block_reader =
506 8 : FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
507 8 : let cursor = block_reader.block_cursor();
508 80 : while let Some(item) = stream.next().await {
509 : // TODO: dedup code with get_reconstruct_value
510 72 : let (raw_key, offset) = item?;
511 72 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
512 : // TODO: ctx handling and sharding
513 72 : let blob = cursor
514 72 : .read_blob(offset, ctx)
515 2 : .await
516 72 : .with_context(|| format!("failed to read value from offset {}", offset))?;
517 72 : let value = Bytes::from(blob);
518 72 : result.push((key, self.lsn, Value::Image(value)));
519 : }
520 8 : Ok(result)
521 8 : }
522 :
523 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
524 : /// and the keys in this layer.
525 : ///
526 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
527 : /// this shard.
528 82 : async fn plan_reads(
529 82 : &self,
530 82 : keyspace: KeySpace,
531 82 : shard_identity: Option<&ShardIdentity>,
532 82 : ctx: &RequestContext,
533 82 : ) -> anyhow::Result<Vec<VectoredRead>> {
534 82 : let mut planner = VectoredReadPlanner::new(
535 82 : self.max_vectored_read_bytes
536 82 : .expect("Layer is loaded with max vectored bytes config")
537 82 : .0
538 82 : .into(),
539 82 : );
540 82 :
541 82 : let block_reader =
542 82 : FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
543 82 : let tree_reader =
544 82 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
545 82 :
546 82 : let ctx = RequestContextBuilder::extend(ctx)
547 82 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
548 82 : .build();
549 :
550 21742 : for range in keyspace.ranges.iter() {
551 21742 : let mut range_end_handled = false;
552 21742 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
553 21742 : range.start.write_to_byte_slice(&mut search_key);
554 21742 :
555 21742 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
556 21742 : let mut index_stream = std::pin::pin!(index_stream);
557 :
558 1080723 : while let Some(index_entry) = index_stream.next().await {
559 1080663 : let (raw_key, offset) = index_entry?;
560 :
561 1080663 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
562 1080663 : assert!(key >= range.start);
563 :
564 1080663 : let flag = if let Some(shard_identity) = shard_identity {
565 1048576 : if shard_identity.is_key_disposable(&key) {
566 786432 : BlobFlag::Ignore
567 : } else {
568 262144 : BlobFlag::None
569 : }
570 : } else {
571 32087 : BlobFlag::None
572 : };
573 :
574 1080663 : if key >= range.end {
575 21682 : planner.handle_range_end(offset);
576 21682 : range_end_handled = true;
577 21682 : break;
578 1058981 : } else {
579 1058981 : planner.handle(key, self.lsn, offset, flag);
580 1058981 : }
581 : }
582 :
583 21742 : if !range_end_handled {
584 60 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
585 60 : planner.handle_range_end(payload_end);
586 21682 : }
587 : }
588 :
589 82 : Ok(planner.finish())
590 82 : }
591 :
592 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
593 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
594 8 : pub(super) async fn filter(
595 8 : &self,
596 8 : shard_identity: &ShardIdentity,
597 8 : writer: &mut ImageLayerWriter,
598 8 : ctx: &RequestContext,
599 8 : ) -> anyhow::Result<usize> {
600 : // Fragment the range into the regions owned by this ShardIdentity
601 8 : let plan = self
602 8 : .plan_reads(
603 8 : KeySpace {
604 8 : // If asked for the total key space, plan_reads will give us all the keys in the layer
605 8 : ranges: vec![Key::MIN..Key::MAX],
606 8 : },
607 8 : Some(shard_identity),
608 8 : ctx,
609 8 : )
610 469 : .await?;
611 :
612 8 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
613 8 : let mut key_count = 0;
614 16 : for read in plan.into_iter() {
615 16 : let buf_size = read.size();
616 16 :
617 16 : let buf = BytesMut::with_capacity(buf_size);
618 16 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
619 :
620 16 : let frozen_buf = blobs_buf.buf.freeze();
621 :
622 262144 : for meta in blobs_buf.blobs.iter() {
623 262144 : let img_buf = frozen_buf.slice(meta.start..meta.end);
624 262144 :
625 262144 : key_count += 1;
626 262144 : writer
627 262144 : .put_image(meta.meta.key, img_buf, ctx)
628 266240 : .await
629 262144 : .context(format!("Storing key {}", meta.meta.key))?;
630 : }
631 : }
632 :
633 8 : Ok(key_count)
634 8 : }
635 :
636 74 : async fn do_reads_and_update_state(
637 74 : &self,
638 74 : reads: Vec<VectoredRead>,
639 74 : reconstruct_state: &mut ValuesReconstructState,
640 74 : ctx: &RequestContext,
641 74 : ) {
642 74 : let max_vectored_read_bytes = self
643 74 : .max_vectored_read_bytes
644 74 : .expect("Layer is loaded with max vectored bytes config")
645 74 : .0
646 74 : .into();
647 74 :
648 74 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
649 1723 : for read in reads.into_iter() {
650 1723 : let buf_size = read.size();
651 1723 :
652 1723 : if buf_size > max_vectored_read_bytes {
653 : // If the read is oversized, it should only contain one key.
654 0 : let offenders = read
655 0 : .blobs_at
656 0 : .as_slice()
657 0 : .iter()
658 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
659 0 : .join(", ");
660 0 : tracing::warn!(
661 0 : "Oversized vectored read ({} > {}) for keys {}",
662 : buf_size,
663 : max_vectored_read_bytes,
664 : offenders
665 : );
666 1723 : }
667 :
668 1723 : let buf = BytesMut::with_capacity(buf_size);
669 1723 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
670 :
671 1723 : match res {
672 1723 : Ok(blobs_buf) => {
673 1723 : let frozen_buf = blobs_buf.buf.freeze();
674 :
675 10405 : for meta in blobs_buf.blobs.iter() {
676 10405 : let img_buf = frozen_buf.slice(meta.start..meta.end);
677 10405 : reconstruct_state.update_key(
678 10405 : &meta.meta.key,
679 10405 : self.lsn,
680 10405 : Value::Image(img_buf),
681 10405 : );
682 10405 : }
683 : }
684 0 : Err(err) => {
685 0 : let kind = err.kind();
686 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
687 0 : reconstruct_state.on_key_error(
688 0 : blob_meta.key,
689 0 : PageReconstructError::from(anyhow!(
690 0 : "Failed to read blobs from virtual file {}: {}",
691 0 : self.file.path,
692 0 : kind
693 0 : )),
694 0 : );
695 0 : }
696 : }
697 : };
698 : }
699 74 : }
700 :
/// Creates a test-only iterator over this layer, with the index positioned
/// at the all-zero key and an empty batch buffer.
#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
    use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
    use std::collections::VecDeque;

    let block_reader =
        FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
    let tree_reader =
        DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
    let index_iter = tree_reader.iter(&[0; KEY_SIZE], ctx);

    let planner = StreamingVectoredReadPlanner::new(
        1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
        1024,        // The default value. Unit tests might use a different value
    );

    ImageLayerIterator {
        image_layer: self,
        ctx,
        planner,
        index_iter,
        key_values_batch: VecDeque::new(),
        is_end: false,
    }
}
719 : }
720 :
/// State for constructing a new image layer file; the implementation
/// behind [`ImageLayerWriter`].
///
/// Usage:
///
/// 1. Create it by calling `ImageLayerWriterInner::new(...)`, which creates
///    the file under a temporary name.
///
/// 2. Write the contents by calling `put_image` for every key-value
///    pair in the key range.
///
/// 3. Call `finish`.
///
struct ImageLayerWriterInner {
    conf: &'static PageServerConf,
    // Temporary path the file is created under (see `ImageLayer::temp_path_for`).
    path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    // Keys passed to `put_image` must fall inside this range.
    key_range: Range<Key>,
    lsn: Lsn,

    // Writes the page images, starting right after the header block.
    blob_writer: BlobWriter<false>,
    // Collects the key -> blob offset index; `finish` writes its blocks to the file.
    tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
743 :
744 : impl ImageLayerWriterInner {
745 : ///
746 : /// Start building a new image layer.
747 : ///
748 244 : async fn new(
749 244 : conf: &'static PageServerConf,
750 244 : timeline_id: TimelineId,
751 244 : tenant_shard_id: TenantShardId,
752 244 : key_range: &Range<Key>,
753 244 : lsn: Lsn,
754 244 : ctx: &RequestContext,
755 244 : ) -> anyhow::Result<Self> {
756 244 : // Create the file initially with a temporary filename.
757 244 : // We'll atomically rename it to the final name when we're done.
758 244 : let path = ImageLayer::temp_path_for(
759 244 : conf,
760 244 : timeline_id,
761 244 : tenant_shard_id,
762 244 : &ImageLayerName {
763 244 : key_range: key_range.clone(),
764 244 : lsn,
765 244 : },
766 244 : );
767 244 : trace!("creating image layer {}", path);
768 244 : let mut file = {
769 244 : VirtualFile::open_with_options(
770 244 : &path,
771 244 : virtual_file::OpenOptions::new()
772 244 : .write(true)
773 244 : .create_new(true),
774 244 : ctx,
775 244 : )
776 189 : .await?
777 : };
778 : // make room for the header block
779 244 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
780 244 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
781 244 :
782 244 : // Initialize the b-tree index builder
783 244 : let block_buf = BlockBuf::new();
784 244 : let tree_builder = DiskBtreeBuilder::new(block_buf);
785 244 :
786 244 : let writer = Self {
787 244 : conf,
788 244 : path,
789 244 : timeline_id,
790 244 : tenant_shard_id,
791 244 : key_range: key_range.clone(),
792 244 : lsn,
793 244 : tree: tree_builder,
794 244 : blob_writer,
795 244 : };
796 244 :
797 244 : Ok(writer)
798 244 : }
799 :
800 : ///
801 : /// Write next value to the file.
802 : ///
803 : /// The page versions must be appended in blknum order.
804 : ///
805 537718 : async fn put_image(
806 537718 : &mut self,
807 537718 : key: Key,
808 537718 : img: Bytes,
809 537718 : ctx: &RequestContext,
810 537718 : ) -> anyhow::Result<()> {
811 537718 : ensure!(self.key_range.contains(&key));
812 546219 : let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
813 : // TODO: re-use the buffer for `img` further upstack
814 537718 : let off = res?;
815 :
816 537718 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
817 537718 : key.write_to_byte_slice(&mut keybuf);
818 537718 : self.tree.append(&keybuf, off)?;
819 :
820 537718 : Ok(())
821 537718 : }
822 :
823 : ///
824 : /// Finish writing the image layer.
825 : ///
826 238 : async fn finish(
827 238 : self,
828 238 : timeline: &Arc<Timeline>,
829 238 : ctx: &RequestContext,
830 238 : ) -> anyhow::Result<ResidentLayer> {
831 238 : let index_start_blk =
832 238 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
833 238 :
834 238 : let mut file = self.blob_writer.into_inner();
835 238 :
836 238 : // Write out the index
837 238 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
838 0 : .await?;
839 238 : let (index_root_blk, block_buf) = self.tree.finish()?;
840 932 : for buf in block_buf.blocks {
841 694 : let (_buf, res) = file.write_all(buf, ctx).await;
842 694 : res?;
843 : }
844 :
845 : // Fill in the summary on blk 0
846 238 : let summary = Summary {
847 238 : magic: IMAGE_FILE_MAGIC,
848 238 : format_version: STORAGE_FORMAT_VERSION,
849 238 : tenant_id: self.tenant_shard_id.tenant_id,
850 238 : timeline_id: self.timeline_id,
851 238 : key_range: self.key_range.clone(),
852 238 : lsn: self.lsn,
853 238 : index_start_blk,
854 238 : index_root_blk,
855 238 : };
856 238 :
857 238 : let mut buf = Vec::with_capacity(PAGE_SZ);
858 238 : // TODO: could use smallvec here but it's a pain with Slice<T>
859 238 : Summary::ser_into(&summary, &mut buf)?;
860 238 : file.seek(SeekFrom::Start(0)).await?;
861 238 : let (_buf, res) = file.write_all(buf, ctx).await;
862 238 : res?;
863 :
864 238 : let metadata = file
865 238 : .metadata()
866 122 : .await
867 238 : .context("get metadata to determine file size")?;
868 :
869 238 : let desc = PersistentLayerDesc::new_img(
870 238 : self.tenant_shard_id,
871 238 : self.timeline_id,
872 238 : self.key_range.clone(),
873 238 : self.lsn,
874 238 : metadata.len(),
875 238 : );
876 238 :
877 238 : // Note: Because we open the file in write-only mode, we cannot
878 238 : // reuse the same VirtualFile for reading later. That's why we don't
879 238 : // set inner.file here. The first read will have to re-open it.
880 238 :
881 238 : // fsync the file
882 238 : file.sync_all().await?;
883 :
884 : // FIXME: why not carry the virtualfile here, it supports renaming?
885 238 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
886 :
887 238 : info!("created image layer {}", layer.local_path());
888 :
889 238 : Ok(layer)
890 238 : }
891 : }
892 :
893 : /// A builder object for constructing a new image layer.
894 : ///
895 : /// Usage:
896 : ///
897 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
898 : ///
899 : /// 2. Write the contents by calling `put_page_image` for every key-value
900 : /// pair in the key range.
901 : ///
902 : /// 3. Call `finish`.
903 : ///
904 : /// # Note
905 : ///
906 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
907 : /// possible for the writer to drop before `finish` is actually called. So this
908 : /// could lead to odd temporary files in the directory, exhausting file system.
909 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
910 : /// implementation that cleans up the temporary file in failure. It's not
911 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
912 : /// out some fields, making it impossible to implement `Drop`.
913 : ///
914 : #[must_use]
915 : pub struct ImageLayerWriter {
916 : inner: Option<ImageLayerWriterInner>,
917 : }
918 :
919 : impl ImageLayerWriter {
920 : ///
921 : /// Start building a new image layer.
922 : ///
923 244 : pub async fn new(
924 244 : conf: &'static PageServerConf,
925 244 : timeline_id: TimelineId,
926 244 : tenant_shard_id: TenantShardId,
927 244 : key_range: &Range<Key>,
928 244 : lsn: Lsn,
929 244 : ctx: &RequestContext,
930 244 : ) -> anyhow::Result<ImageLayerWriter> {
931 244 : Ok(Self {
932 244 : inner: Some(
933 244 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
934 189 : .await?,
935 : ),
936 : })
937 244 : }
938 :
939 : ///
940 : /// Write next value to the file.
941 : ///
942 : /// The page versions must be appended in blknum order.
943 : ///
944 537718 : pub async fn put_image(
945 537718 : &mut self,
946 537718 : key: Key,
947 537718 : img: Bytes,
948 537718 : ctx: &RequestContext,
949 537718 : ) -> anyhow::Result<()> {
950 546219 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
951 537718 : }
952 :
953 : ///
954 : /// Finish writing the image layer.
955 : ///
956 238 : pub(crate) async fn finish(
957 238 : mut self,
958 238 : timeline: &Arc<Timeline>,
959 238 : ctx: &RequestContext,
960 238 : ) -> anyhow::Result<super::ResidentLayer> {
961 718 : self.inner.take().unwrap().finish(timeline, ctx).await
962 238 : }
963 : }
964 :
965 : impl Drop for ImageLayerWriter {
966 244 : fn drop(&mut self) {
967 244 : if let Some(inner) = self.inner.take() {
968 6 : inner.blob_writer.into_inner().remove();
969 238 : }
970 244 : }
971 : }
972 :
#[cfg(test)]
/// Test-only iterator over the key-value pairs stored in an image layer.
///
/// Entries are fetched in batches: `next_batch` refills `key_values_batch`
/// from the layer file, and `next` hands the buffered entries out one by one.
pub struct ImageLayerIterator<'a> {
    /// The layer being read. Supplies the file to read from, the LSN reported
    /// for every entry, and `index_start_blk`.
    image_layer: &'a ImageLayerInner,
    /// Request context passed along to the blob reads.
    ctx: &'a RequestContext,
    /// Turns index entries into read plans. The tests in this file replace it
    /// to exercise different read-size and batch-size settings.
    planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner,
    /// Iterator over the layer index, yielding `(raw key, offset)` results.
    index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
    /// Entries that have been read but not yet returned by `next`.
    key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>,
    /// Set once `index_iter` has been exhausted; no further batch may be
    /// requested after that.
    is_end: bool,
}
982 :
#[cfg(test)]
impl<'a> ImageLayerIterator<'a> {
    /// Retrieve a batch of key-value pairs into the iterator buffer.
    ///
    /// May only be called while the buffer is empty and the end of the index
    /// has not been reached yet; both conditions are asserted.
    async fn next_batch(&mut self) -> anyhow::Result<()> {
        assert!(self.key_values_batch.is_empty());
        assert!(!self.is_end);

        let layer_lsn = self.image_layer.lsn;

        // Feed index entries to the planner until it hands back a read plan.
        let plan = loop {
            let Some(entry) = self.index_iter.next().await else {
                // The index is exhausted: close the pending range at the byte
                // offset of block `index_start_blk`.
                self.is_end = true;
                let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
                break self.planner.handle_range_end(payload_end);
            };
            let (raw_key, offset) = entry?;
            let key = Key::from_slice(&raw_key[..KEY_SIZE]);
            if let Some(batch_plan) = self.planner.handle(key, layer_lsn, offset, BlobFlag::None) {
                break batch_plan;
            }
        };

        // Execute the plan with a buffer sized for the whole read.
        let reader = VectoredBlobReader::new(&self.image_layer.file);
        let read_buf = BytesMut::with_capacity(plan.size());
        let blobs_buf = reader.read_blobs(&plan, read_buf, self.ctx).await?;

        // Every image is a slice of the one frozen buffer.
        let frozen_buf: Bytes = blobs_buf.buf.freeze();
        self.key_values_batch = blobs_buf
            .blobs
            .iter()
            .map(|meta| {
                let img_buf = frozen_buf.slice(meta.start..meta.end);
                (meta.meta.key, layer_lsn, Value::Image(img_buf))
            })
            .collect();
        Ok(())
    }

    /// Return the next entry of the layer, or `None` once all entries have
    /// been handed out. A new batch is fetched whenever the buffer runs dry.
    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        match self.key_values_batch.pop_front() {
            Some(entry) => Ok(Some(entry)),
            None if self.is_end => Ok(None),
            None => {
                self.next_batch().await?;
                let entry = self
                    .key_values_batch
                    .pop_front()
                    .expect("should not be empty");
                Ok(Some(entry))
            }
        }
    }
}
1037 :
1038 : #[cfg(test)]
1039 : mod test {
1040 : use std::{sync::Arc, time::Duration};
1041 :
1042 : use bytes::Bytes;
1043 : use itertools::Itertools;
1044 : use pageserver_api::{
1045 : key::Key,
1046 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1047 : };
1048 : use utils::{
1049 : generation::Generation,
1050 : id::{TenantId, TimelineId},
1051 : lsn::Lsn,
1052 : };
1053 :
1054 : use crate::{
1055 : context::RequestContext,
1056 : repository::Value,
1057 : tenant::{
1058 : config::TenantConf,
1059 : harness::{TenantHarness, TIMELINE_ID},
1060 : storage_layer::ResidentLayer,
1061 : vectored_blob_io::StreamingVectoredReadPlanner,
1062 : Tenant, Timeline,
1063 : },
1064 : DEFAULT_PG_VERSION,
1065 : };
1066 :
1067 : use super::{ImageLayerIterator, ImageLayerWriter};
1068 :
1069 : #[tokio::test]
1070 2 : async fn image_layer_rewrite() {
1071 2 : let tenant_conf = TenantConf {
1072 2 : gc_period: Duration::ZERO,
1073 2 : compaction_period: Duration::ZERO,
1074 2 : ..TenantConf::default()
1075 2 : };
1076 2 : let tenant_id = TenantId::generate();
1077 2 : let mut gen = Generation::new(0xdead0001);
1078 10 : let mut get_next_gen = || {
1079 10 : let ret = gen;
1080 10 : gen = gen.next();
1081 10 : ret
1082 10 : };
1083 2 : // The LSN at which we will create an image layer to filter
1084 2 : let lsn = Lsn(0xdeadbeef0000);
1085 2 : let timeline_id = TimelineId::generate();
1086 2 :
1087 2 : //
1088 2 : // Create an unsharded parent with a layer.
1089 2 : //
1090 2 :
1091 2 : let harness = TenantHarness::create_custom(
1092 2 : "test_image_layer_rewrite--parent",
1093 2 : tenant_conf.clone(),
1094 2 : tenant_id,
1095 2 : ShardIdentity::unsharded(),
1096 2 : get_next_gen(),
1097 2 : )
1098 2 : .unwrap();
1099 8 : let (tenant, ctx) = harness.load().await;
1100 2 : let timeline = tenant
1101 2 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1102 6 : .await
1103 2 : .unwrap();
1104 2 :
1105 2 : // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
1106 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1107 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1108 2 : let range = input_start..input_end;
1109 2 :
1110 2 : // Build an image layer to filter
1111 2 : let resident = {
1112 2 : let mut writer = ImageLayerWriter::new(
1113 2 : harness.conf,
1114 2 : timeline_id,
1115 2 : harness.tenant_shard_id,
1116 2 : &range,
1117 2 : lsn,
1118 2 : &ctx,
1119 2 : )
1120 2 : .await
1121 2 : .unwrap();
1122 2 :
1123 2 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1124 2 : let mut key = range.start;
1125 262146 : while key < range.end {
1126 266239 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1127 262144 :
1128 262144 : key = key.next();
1129 2 : }
1130 119 : writer.finish(&timeline, &ctx).await.unwrap()
1131 2 : };
1132 2 : let original_size = resident.metadata().file_size;
1133 2 :
1134 2 : //
1135 2 : // Create child shards and do the rewrite, exercising filter().
1136 2 : // TODO: abstraction in TenantHarness for splits.
1137 2 : //
1138 2 :
1139 2 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1140 2 : // range, middle of key range.
1141 2 : let shard_count = ShardCount::new(4);
1142 8 : for shard_number in 0..shard_count.count() {
1143 2 : //
1144 2 : // mimic the shard split
1145 2 : //
1146 8 : let shard_identity = ShardIdentity::new(
1147 8 : ShardNumber(shard_number),
1148 8 : shard_count,
1149 8 : ShardStripeSize(0x8000),
1150 8 : )
1151 8 : .unwrap();
1152 8 : let harness = TenantHarness::create_custom(
1153 8 : Box::leak(Box::new(format!(
1154 8 : "test_image_layer_rewrite--child{}",
1155 8 : shard_identity.shard_slug()
1156 8 : ))),
1157 8 : tenant_conf.clone(),
1158 8 : tenant_id,
1159 8 : shard_identity,
1160 8 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1161 8 : // But here, all we care about is that the gen number is unique.
1162 8 : get_next_gen(),
1163 8 : )
1164 8 : .unwrap();
1165 32 : let (tenant, ctx) = harness.load().await;
1166 8 : let timeline = tenant
1167 8 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1168 20 : .await
1169 8 : .unwrap();
1170 2 :
1171 2 : //
1172 2 : // use filter() and make assertions
1173 2 : //
1174 2 :
1175 8 : let mut filtered_writer = ImageLayerWriter::new(
1176 8 : harness.conf,
1177 8 : timeline_id,
1178 8 : harness.tenant_shard_id,
1179 8 : &range,
1180 8 : lsn,
1181 8 : &ctx,
1182 8 : )
1183 4 : .await
1184 8 : .unwrap();
1185 2 :
1186 8 : let wrote_keys = resident
1187 8 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1188 266719 : .await
1189 8 : .unwrap();
1190 8 : let replacement = if wrote_keys > 0 {
1191 129 : Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
1192 2 : } else {
1193 2 : None
1194 2 : };
1195 2 :
1196 2 : // This exact size and those below will need updating as/when the layer encoding changes, but
1197 2 : // should be deterministic for a given version of the format, as we used no randomness generating the input.
1198 8 : assert_eq!(original_size, 1597440);
1199 2 :
1200 8 : match shard_number {
1201 2 : 0 => {
1202 2 : // We should have written out just one stripe for our shard identity
1203 2 : assert_eq!(wrote_keys, 0x8000);
1204 2 : let replacement = replacement.unwrap();
1205 2 :
1206 2 : // We should have dropped some of the data
1207 2 : assert!(replacement.metadata().file_size < original_size);
1208 2 : assert!(replacement.metadata().file_size > 0);
1209 2 :
1210 2 : // Assert that we dropped ~3/4 of the data.
1211 2 : assert_eq!(replacement.metadata().file_size, 417792);
1212 2 : }
1213 2 : 1 => {
1214 2 : // Shard 1 has no keys in our input range
1215 2 : assert_eq!(wrote_keys, 0x0);
1216 2 : assert!(replacement.is_none());
1217 2 : }
1218 2 : 2 => {
1219 2 : // Shard 2 has one stripes in the input range
1220 2 : assert_eq!(wrote_keys, 0x8000);
1221 2 : let replacement = replacement.unwrap();
1222 2 : assert!(replacement.metadata().file_size < original_size);
1223 2 : assert!(replacement.metadata().file_size > 0);
1224 2 : assert_eq!(replacement.metadata().file_size, 417792);
1225 2 : }
1226 2 : 3 => {
1227 2 : // Shard 3 has two stripes in the input range
1228 2 : assert_eq!(wrote_keys, 0x10000);
1229 2 : let replacement = replacement.unwrap();
1230 2 : assert!(replacement.metadata().file_size < original_size);
1231 2 : assert!(replacement.metadata().file_size > 0);
1232 2 : assert_eq!(replacement.metadata().file_size, 811008);
1233 2 : }
1234 2 : _ => unreachable!(),
1235 2 : }
1236 2 : }
1237 2 : }
1238 :
1239 2 : async fn produce_image_layer(
1240 2 : tenant: &Tenant,
1241 2 : tline: &Arc<Timeline>,
1242 2 : mut images: Vec<(Key, Bytes)>,
1243 2 : lsn: Lsn,
1244 2 : ctx: &RequestContext,
1245 2 : ) -> anyhow::Result<ResidentLayer> {
1246 2 : images.sort();
1247 2 : let (key_start, _) = images.first().unwrap();
1248 2 : let (key_last, _) = images.last().unwrap();
1249 2 : let key_end = key_last.next();
1250 2 : let key_range = *key_start..key_end;
1251 2 : let mut writer = ImageLayerWriter::new(
1252 2 : tenant.conf,
1253 2 : tline.timeline_id,
1254 2 : tenant.tenant_shard_id,
1255 2 : &key_range,
1256 2 : lsn,
1257 2 : ctx,
1258 2 : )
1259 1 : .await?;
1260 :
1261 2002 : for (key, img) in images {
1262 2031 : writer.put_image(key, img, ctx).await?;
1263 : }
1264 4 : let img_layer = writer.finish(tline, ctx).await?;
1265 :
1266 2 : Ok::<_, anyhow::Error>(img_layer)
1267 2 : }
1268 :
1269 28 : async fn assert_img_iter_equal(
1270 28 : img_iter: &mut ImageLayerIterator<'_>,
1271 28 : expect: &[(Key, Bytes)],
1272 28 : expect_lsn: Lsn,
1273 28 : ) {
1274 28 : let mut expect_iter = expect.iter();
1275 : loop {
1276 28028 : let o1 = img_iter.next().await.unwrap();
1277 28028 : let o2 = expect_iter.next();
1278 28028 : match (o1, o2) {
1279 28 : (None, None) => break,
1280 28000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1281 28000 : let Value::Image(i1) = v1 else {
1282 0 : panic!("expect Value::Image")
1283 : };
1284 28000 : assert_eq!(&k1, k2);
1285 28000 : assert_eq!(l1, expect_lsn);
1286 28000 : assert_eq!(&i1, i2);
1287 : }
1288 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1289 : }
1290 : }
1291 28 : }
1292 :
1293 : #[tokio::test]
1294 2 : async fn image_layer_iterator() {
1295 2 : let harness = TenantHarness::create("image_layer_iterator").unwrap();
1296 8 : let (tenant, ctx) = harness.load().await;
1297 2 :
1298 2 : let tline = tenant
1299 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1300 6 : .await
1301 2 : .unwrap();
1302 2 :
1303 2000 : fn get_key(id: u32) -> Key {
1304 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1305 2000 : key.field6 = id;
1306 2000 : key
1307 2000 : }
1308 2 : const N: usize = 1000;
1309 2 : let test_imgs = (0..N)
1310 2000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1311 2 : .collect_vec();
1312 2 : let resident_layer =
1313 2 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1314 2036 : .await
1315 2 : .unwrap();
1316 2 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1317 6 : for max_read_size in [1, 1024] {
1318 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1319 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1320 28 : // Test if the batch size is correctly determined
1321 28 : let mut iter = img_layer.iter(&ctx);
1322 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1323 28 : let mut num_items = 0;
1324 112 : for _ in 0..3 {
1325 84 : iter.next_batch().await.unwrap();
1326 84 : num_items += iter.key_values_batch.len();
1327 84 : if max_read_size == 1 {
1328 2 : // every key should be a batch b/c the value is larger than max_read_size
1329 42 : assert_eq!(iter.key_values_batch.len(), 1);
1330 2 : } else {
1331 42 : assert_eq!(iter.key_values_batch.len(), batch_size);
1332 2 : }
1333 84 : if num_items >= N {
1334 2 : break;
1335 84 : }
1336 84 : iter.key_values_batch.clear();
1337 2 : }
1338 2 : // Test if the result is correct
1339 28 : let mut iter = img_layer.iter(&ctx);
1340 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1341 9576 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1342 2 : }
1343 2 : }
1344 2 : }
1345 : }
|