Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "values", and "index", stored in that order. The summary is a
22 : //! fixed size header in the first block of the file; it contains basic
23 : //! information about the layer and the location of the index. The
24 : //! "values" part holds the actual page images. The "index" is a B-tree,
25 : //! mapping from Key to the offset of its image in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{
33 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
34 : };
35 : use crate::tenant::storage_layer::{
36 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
37 : };
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
41 : VectoredReadPlanner,
42 : };
43 : use crate::tenant::{PageReconstructError, Timeline};
44 : use crate::virtual_file::{self, VirtualFile};
45 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::{Bytes, BytesMut};
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use hex;
50 : use itertools::Itertools;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::models::LayerAccessKind;
53 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
54 : use rand::{distributions::Alphanumeric, Rng};
55 : use serde::{Deserialize, Serialize};
56 : use std::collections::VecDeque;
57 : use std::fs::File;
58 : use std::io::SeekFrom;
59 : use std::ops::Range;
60 : use std::os::unix::prelude::FileExt;
61 : use std::str::FromStr;
62 : use std::sync::Arc;
63 : use tokio::sync::OnceCell;
64 : use tokio_stream::StreamExt;
65 : use tracing::*;
66 :
67 : use utils::{
68 : bin_ser::BeSer,
69 : id::{TenantId, TimelineId},
70 : lsn::Lsn,
71 : };
72 :
73 : use super::layer_name::ImageLayerName;
74 : use super::{
75 : AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
76 : };
77 :
78 : ///
79 : /// Header stored at the beginning of the file (block 0).
80 : ///
81 : /// After this comes the 'values' part, starting on block 1. After that,
82 : /// the 'index' starts at the block indicated by 'index_start_blk'.
83 : ///
/// When a layer is opened with an expected summary, `ImageLayerInner::load`
/// compares every field except `timeline_id`, `index_start_blk` and
/// `index_root_blk`, which are taken from the file as-is.
84 90 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
85 : pub struct Summary {
86 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
87 : pub magic: u16,
/// Storage format version; the writers in this file set it to STORAGE_FORMAT_VERSION.
88 : pub format_version: u16,
89 :
/// Tenant the layer was written for.
90 : pub tenant_id: TenantId,
/// Timeline the layer was written for.
91 : pub timeline_id: TimelineId,
/// Key range covered by the layer.
92 : pub key_range: Range<Key>,
/// LSN of the image stored in the layer.
93 : pub lsn: Lsn,
94 :
95 : /// Block number where the 'index' part of the file begins.
96 : pub index_start_blk: u32,
97 : /// Block within the 'index', where the B-tree root page is stored.
98 : pub index_root_blk: u32,
99 : // the 'values' part starts after the summary header, on block 1.
100 : }
101 :
102 : impl From<&ImageLayer> for Summary {
103 0 : fn from(layer: &ImageLayer) -> Self {
104 0 : Self::expected(
105 0 : layer.desc.tenant_shard_id.tenant_id,
106 0 : layer.desc.timeline_id,
107 0 : layer.desc.key_range.clone(),
108 0 : layer.lsn,
109 0 : )
110 0 : }
111 : }
112 :
113 : impl Summary {
114 90 : pub(super) fn expected(
115 90 : tenant_id: TenantId,
116 90 : timeline_id: TimelineId,
117 90 : key_range: Range<Key>,
118 90 : lsn: Lsn,
119 90 : ) -> Self {
120 90 : Self {
121 90 : magic: IMAGE_FILE_MAGIC,
122 90 : format_version: STORAGE_FORMAT_VERSION,
123 90 : tenant_id,
124 90 : timeline_id,
125 90 : key_range,
126 90 : lsn,
127 90 :
128 90 : index_start_blk: 0,
129 90 : index_root_blk: 0,
130 90 : }
131 90 : }
132 : }
133 :
134 : /// This is used only from `pagectl`. Within pageserver, all layers are
135 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
136 : pub struct ImageLayer {
// Location of the layer file; used to open the file when `inner` is loaded.
137 : path: Utf8PathBuf,
// Descriptor built from the in-file summary and the file size (see `new_for_path`).
138 : pub desc: PersistentLayerDesc,
139 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
140 : pub lsn: Lsn,
141 : access_stats: LayerAccessStats,
// Opened file state, initialized lazily on the first call to `load`.
142 : inner: OnceCell<ImageLayerInner>,
143 : }
144 :
145 : impl std::fmt::Debug for ImageLayer {
146 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 0 : use super::RangeDisplayDebug;
148 0 :
149 0 : f.debug_struct("ImageLayer")
150 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
151 0 : .field("file_size", &self.desc.file_size)
152 0 : .field("lsn", &self.lsn)
153 0 : .field("inner", &self.inner)
154 0 : .finish()
155 0 : }
156 : }
157 :
158 : /// ImageLayerInner is the in-memory data structure associated with an opened
159 : /// on-disk image file.
160 : pub struct ImageLayerInner {
161 : // values copied from summary
162 : index_start_blk: u32,
163 : index_root_blk: u32,
164 :
// Also copied from the in-file summary.
165 : key_range: Range<Key>,
// Taken from the `lsn` argument of `load`, not from the in-file summary.
166 : lsn: Lsn,
167 :
168 : file: VirtualFile,
// Id obtained from `page_cache::next_file_id()` when the file was opened.
169 : file_id: FileId,
170 :
// `None` when loaded through `ImageLayer::load_inner`; `plan_reads` and
// `do_reads_and_update_state` panic if it is `None`.
171 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
172 : }
173 :
174 : impl std::fmt::Debug for ImageLayerInner {
175 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 0 : f.debug_struct("ImageLayerInner")
177 0 : .field("index_start_blk", &self.index_start_blk)
178 0 : .field("index_root_blk", &self.index_root_blk)
179 0 : .finish()
180 0 : }
181 : }
182 :
183 : impl ImageLayerInner {
184 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
185 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
186 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
187 0 : self.index_start_blk,
188 0 : self.index_root_blk,
189 0 : block_reader,
190 0 : );
191 0 :
192 0 : tree_reader.dump().await?;
193 :
194 0 : tree_reader
195 0 : .visit(
196 0 : &[0u8; KEY_SIZE],
197 0 : VisitDirection::Forwards,
198 0 : |key, value| {
199 0 : println!("key: {} offset {}", hex::encode(key), value);
200 0 : true
201 0 : },
202 0 : ctx,
203 0 : )
204 0 : .await?;
205 :
206 0 : Ok(())
207 0 : }
208 : }
209 :
210 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
211 : impl std::fmt::Display for ImageLayer {
212 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 0 : write!(f, "{}", self.layer_desc().short_id())
214 0 : }
215 : }
216 :
217 : impl AsLayerDesc for ImageLayer {
218 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
219 0 : &self.desc
220 0 : }
221 : }
222 :
223 : impl ImageLayer {
224 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
225 0 : self.desc.dump();
226 0 :
227 0 : if !verbose {
228 0 : return Ok(());
229 0 : }
230 :
231 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
232 :
233 0 : inner.dump(ctx).await?;
234 :
235 0 : Ok(())
236 0 : }
237 :
238 250 : fn temp_path_for(
239 250 : conf: &PageServerConf,
240 250 : timeline_id: TimelineId,
241 250 : tenant_shard_id: TenantShardId,
242 250 : fname: &ImageLayerName,
243 250 : ) -> Utf8PathBuf {
244 250 : let rand_string: String = rand::thread_rng()
245 250 : .sample_iter(&Alphanumeric)
246 250 : .take(8)
247 250 : .map(char::from)
248 250 : .collect();
249 250 :
250 250 : conf.timeline_path(&tenant_shard_id, &timeline_id)
251 250 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
252 250 : }
253 :
254 : ///
255 : /// Open the underlying file and read the metadata into memory, if it's
256 : /// not loaded already.
257 : ///
258 0 : async fn load(
259 0 : &self,
260 0 : access_kind: LayerAccessKind,
261 0 : ctx: &RequestContext,
262 0 : ) -> Result<&ImageLayerInner> {
263 0 : self.access_stats.record_access(access_kind, ctx);
264 0 : self.inner
265 0 : .get_or_try_init(|| self.load_inner(ctx))
266 0 : .await
267 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
268 0 : }
269 :
270 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
271 0 : let path = self.path();
272 :
273 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
274 0 : .await
275 0 : .and_then(|res| res)?;
276 :
277 : // not production code
278 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
279 0 : let expected_layer_name = self.layer_desc().layer_name();
280 0 :
281 0 : if actual_layer_name != expected_layer_name {
282 0 : println!("warning: filename does not match what is expected from in-file summary");
283 0 : println!("actual: {:?}", actual_layer_name.to_string());
284 0 : println!("expected: {:?}", expected_layer_name.to_string());
285 0 : }
286 :
287 0 : Ok(loaded)
288 0 : }
289 :
290 : /// Create an ImageLayer struct representing an existing file on disk.
291 : ///
292 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
293 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
294 0 : let mut summary_buf = vec![0; PAGE_SZ];
295 0 : file.read_exact_at(&mut summary_buf, 0)?;
296 0 : let summary = Summary::des_prefix(&summary_buf)?;
297 0 : let metadata = file
298 0 : .metadata()
299 0 : .context("get file metadata to determine size")?;
300 :
301 : // This function is never used for constructing layers in a running pageserver,
302 : // so it does not need an accurate TenantShardId.
303 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
304 0 :
305 0 : Ok(ImageLayer {
306 0 : path: path.to_path_buf(),
307 0 : desc: PersistentLayerDesc::new_img(
308 0 : tenant_shard_id,
309 0 : summary.timeline_id,
310 0 : summary.key_range,
311 0 : summary.lsn,
312 0 : metadata.len(),
313 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
314 0 : lsn: summary.lsn,
315 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
316 0 : inner: OnceCell::new(),
317 0 : })
318 0 : }
319 :
320 0 : fn path(&self) -> Utf8PathBuf {
321 0 : self.path.clone()
322 0 : }
323 : }
324 :
/// Error returned by `ImageLayer::rewrite_summary`.
325 0 : #[derive(thiserror::Error, Debug)]
326 : pub enum RewriteSummaryError {
/// The summary read from the file does not carry IMAGE_FILE_MAGIC.
327 : #[error("magic mismatch")]
328 : MagicMismatch,
/// Any other failure; I/O errors are converted into this variant.
329 : #[error(transparent)]
330 : Other(#[from] anyhow::Error),
331 : }
332 :
333 : impl From<std::io::Error> for RewriteSummaryError {
334 0 : fn from(e: std::io::Error) -> Self {
335 0 : Self::Other(anyhow::anyhow!(e))
336 0 : }
337 : }
338 :
339 : impl ImageLayer {
340 0 : pub async fn rewrite_summary<F>(
341 0 : path: &Utf8Path,
342 0 : rewrite: F,
343 0 : ctx: &RequestContext,
344 0 : ) -> Result<(), RewriteSummaryError>
345 0 : where
346 0 : F: Fn(Summary) -> Summary,
347 0 : {
348 0 : let mut file = VirtualFile::open_with_options(
349 0 : path,
350 0 : virtual_file::OpenOptions::new().read(true).write(true),
351 0 : ctx,
352 0 : )
353 0 : .await
354 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
355 0 : let file_id = page_cache::next_file_id();
356 0 : let block_reader = FileBlockReader::new(&file, file_id);
357 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
358 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
359 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
360 0 : return Err(RewriteSummaryError::MagicMismatch);
361 0 : }
362 0 :
363 0 : let new_summary = rewrite(actual_summary);
364 0 :
365 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
366 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
367 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
368 0 : file.seek(SeekFrom::Start(0)).await?;
369 0 : let (_buf, res) = file.write_all(buf, ctx).await;
370 0 : res?;
371 0 : Ok(())
372 0 : }
373 : }
374 :
375 : impl ImageLayerInner {
376 8 : pub(crate) fn key_range(&self) -> &Range<Key> {
377 8 : &self.key_range
378 8 : }
379 :
380 8 : pub(crate) fn lsn(&self) -> Lsn {
381 8 : self.lsn
382 8 : }
383 :
384 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
385 : /// - inner has the success or transient failure
386 : /// - outer has the permanent failure
387 90 : pub(super) async fn load(
388 90 : path: &Utf8Path,
389 90 : lsn: Lsn,
390 90 : summary: Option<Summary>,
391 90 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
392 90 : ctx: &RequestContext,
393 90 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
394 90 : let file = match VirtualFile::open(path, ctx).await {
395 90 : Ok(file) => file,
396 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
397 : };
398 90 : let file_id = page_cache::next_file_id();
399 90 : let block_reader = FileBlockReader::new(&file, file_id);
400 90 : let summary_blk = match block_reader.read_blk(0, ctx).await {
401 90 : Ok(blk) => blk,
402 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
403 : };
404 :
405 : // length is the only way how this could fail, so it's not actually likely at all unless
406 : // read_blk returns wrong sized block.
407 : //
408 : // TODO: confirm and make this into assertion
409 90 : let actual_summary =
410 90 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
411 :
412 90 : if let Some(mut expected_summary) = summary {
413 : // production code path
414 90 : expected_summary.index_start_blk = actual_summary.index_start_blk;
415 90 : expected_summary.index_root_blk = actual_summary.index_root_blk;
416 90 : // mask out the timeline_id, but still require the layers to be from the same tenant
417 90 : expected_summary.timeline_id = actual_summary.timeline_id;
418 90 :
419 90 : if actual_summary != expected_summary {
420 0 : bail!(
421 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
422 0 : actual_summary,
423 0 : expected_summary
424 0 : );
425 90 : }
426 0 : }
427 :
428 90 : Ok(Ok(ImageLayerInner {
429 90 : index_start_blk: actual_summary.index_start_blk,
430 90 : index_root_blk: actual_summary.index_root_blk,
431 90 : lsn,
432 90 : file,
433 90 : file_id,
434 90 : max_vectored_read_bytes,
435 90 : key_range: actual_summary.key_range,
436 90 : }))
437 90 : }
438 :
439 7073 : pub(super) async fn get_value_reconstruct_data(
440 7073 : &self,
441 7073 : key: Key,
442 7073 : reconstruct_state: &mut ValueReconstructState,
443 7073 : ctx: &RequestContext,
444 7073 : ) -> anyhow::Result<ValueReconstructResult> {
445 7073 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
446 7073 : let tree_reader =
447 7073 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
448 7073 :
449 7073 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
450 7073 : key.write_to_byte_slice(&mut keybuf);
451 7073 : if let Some(offset) = tree_reader
452 7073 : .get(
453 7073 : &keybuf,
454 7073 : &RequestContextBuilder::extend(ctx)
455 7073 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
456 7073 : .build(),
457 7073 : )
458 416 : .await?
459 : {
460 7069 : let blob = block_reader
461 7069 : .block_cursor()
462 7069 : .read_blob(
463 7069 : offset,
464 7069 : &RequestContextBuilder::extend(ctx)
465 7069 : .page_content_kind(PageContentKind::ImageLayerValue)
466 7069 : .build(),
467 7069 : )
468 318 : .await
469 7069 : .with_context(|| format!("failed to read value from offset {}", offset))?;
470 7069 : let value = Bytes::from(blob);
471 7069 :
472 7069 : reconstruct_state.img = Some((self.lsn, value));
473 7069 : Ok(ValueReconstructResult::Complete)
474 : } else {
475 4 : Ok(ValueReconstructResult::Missing)
476 : }
477 7073 : }
478 :
479 : // Look up the keys in the provided keyspace and update
480 : // the reconstruct state with whatever is found.
481 74 : pub(super) async fn get_values_reconstruct_data(
482 74 : &self,
483 74 : keyspace: KeySpace,
484 74 : reconstruct_state: &mut ValuesReconstructState,
485 74 : ctx: &RequestContext,
486 74 : ) -> Result<(), GetVectoredError> {
487 74 : let reads = self
488 74 : .plan_reads(keyspace, None, ctx)
489 345 : .await
490 74 : .map_err(GetVectoredError::Other)?;
491 :
492 74 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
493 885 : .await;
494 :
495 74 : reconstruct_state.on_image_layer_visited(&self.key_range);
496 74 :
497 74 : Ok(())
498 74 : }
499 :
500 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
501 0 : pub(super) async fn load_key_values(
502 0 : &self,
503 0 : ctx: &RequestContext,
504 0 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
505 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
506 0 : let tree_reader =
507 0 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
508 0 : let mut result = Vec::new();
509 0 : let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
510 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
511 0 : let cursor = block_reader.block_cursor();
512 0 : while let Some(item) = stream.next().await {
513 : // TODO: dedup code with get_reconstruct_value
514 0 : let (raw_key, offset) = item?;
515 0 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
516 : // TODO: ctx handling and sharding
517 0 : let blob = cursor
518 0 : .read_blob(offset, ctx)
519 0 : .await
520 0 : .with_context(|| format!("failed to read value from offset {}", offset))?;
521 0 : let value = Bytes::from(blob);
522 0 : result.push((key, self.lsn, Value::Image(value)));
523 : }
524 0 : Ok(result)
525 0 : }
526 :
527 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
528 : /// and the keys in this layer.
529 : ///
530 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
531 : /// this shard.
532 82 : async fn plan_reads(
533 82 : &self,
534 82 : keyspace: KeySpace,
535 82 : shard_identity: Option<&ShardIdentity>,
536 82 : ctx: &RequestContext,
537 82 : ) -> anyhow::Result<Vec<VectoredRead>> {
538 82 : let mut planner = VectoredReadPlanner::new(
539 82 : self.max_vectored_read_bytes
540 82 : .expect("Layer is loaded with max vectored bytes config")
541 82 : .0
542 82 : .into(),
543 82 : );
544 82 :
545 82 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
546 82 : let tree_reader =
547 82 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
548 82 :
549 82 : let ctx = RequestContextBuilder::extend(ctx)
550 82 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
551 82 : .build();
552 :
553 21775 : for range in keyspace.ranges.iter() {
554 21775 : let mut range_end_handled = false;
555 21775 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
556 21775 : range.start.write_to_byte_slice(&mut search_key);
557 21775 :
558 21775 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
559 21775 : let mut index_stream = std::pin::pin!(index_stream);
560 :
561 1080721 : while let Some(index_entry) = index_stream.next().await {
562 1080661 : let (raw_key, offset) = index_entry?;
563 :
564 1080661 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
565 1080661 : assert!(key >= range.start);
566 :
567 1080661 : let flag = if let Some(shard_identity) = shard_identity {
568 1048576 : if shard_identity.is_key_disposable(&key) {
569 786432 : BlobFlag::Ignore
570 : } else {
571 262144 : BlobFlag::None
572 : }
573 : } else {
574 32085 : BlobFlag::None
575 : };
576 :
577 1080661 : if key >= range.end {
578 21715 : planner.handle_range_end(offset);
579 21715 : range_end_handled = true;
580 21715 : break;
581 1058946 : } else {
582 1058946 : planner.handle(key, self.lsn, offset, flag);
583 1058946 : }
584 : }
585 :
586 21775 : if !range_end_handled {
587 60 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
588 60 : planner.handle_range_end(payload_end);
589 21715 : }
590 : }
591 :
592 82 : Ok(planner.finish())
593 82 : }
594 :
595 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
596 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
597 8 : pub(super) async fn filter(
598 8 : &self,
599 8 : shard_identity: &ShardIdentity,
600 8 : writer: &mut ImageLayerWriter,
601 8 : ctx: &RequestContext,
602 8 : ) -> anyhow::Result<usize> {
603 : // Fragment the range into the regions owned by this ShardIdentity
604 8 : let plan = self
605 8 : .plan_reads(
606 8 : KeySpace {
607 8 : // If asked for the total key space, plan_reads will give us all the keys in the layer
608 8 : ranges: vec![Key::MIN..Key::MAX],
609 8 : },
610 8 : Some(shard_identity),
611 8 : ctx,
612 8 : )
613 469 : .await?;
614 :
615 8 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
616 8 : let mut key_count = 0;
617 16 : for read in plan.into_iter() {
618 16 : let buf_size = read.size();
619 16 :
620 16 : let buf = BytesMut::with_capacity(buf_size);
621 16 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
622 :
623 16 : let frozen_buf = blobs_buf.buf.freeze();
624 :
625 262144 : for meta in blobs_buf.blobs.iter() {
626 262144 : let img_buf = frozen_buf.slice(meta.start..meta.end);
627 262144 :
628 262144 : key_count += 1;
629 262144 : writer
630 262144 : .put_image(meta.meta.key, img_buf, ctx)
631 266240 : .await
632 262144 : .context(format!("Storing key {}", meta.meta.key))?;
633 : }
634 : }
635 :
636 8 : Ok(key_count)
637 8 : }
638 :
639 74 : async fn do_reads_and_update_state(
640 74 : &self,
641 74 : reads: Vec<VectoredRead>,
642 74 : reconstruct_state: &mut ValuesReconstructState,
643 74 : ctx: &RequestContext,
644 74 : ) {
645 74 : let max_vectored_read_bytes = self
646 74 : .max_vectored_read_bytes
647 74 : .expect("Layer is loaded with max vectored bytes config")
648 74 : .0
649 74 : .into();
650 74 :
651 74 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
652 1719 : for read in reads.into_iter() {
653 1719 : let buf_size = read.size();
654 1719 :
655 1719 : if buf_size > max_vectored_read_bytes {
656 : // If the read is oversized, it should only contain one key.
657 0 : let offenders = read
658 0 : .blobs_at
659 0 : .as_slice()
660 0 : .iter()
661 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
662 0 : .join(", ");
663 0 : tracing::warn!(
664 0 : "Oversized vectored read ({} > {}) for keys {}",
665 : buf_size,
666 : max_vectored_read_bytes,
667 : offenders
668 : );
669 1719 : }
670 :
671 1719 : let buf = BytesMut::with_capacity(buf_size);
672 1719 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
673 :
674 1719 : match res {
675 1719 : Ok(blobs_buf) => {
676 1719 : let frozen_buf = blobs_buf.buf.freeze();
677 :
678 10370 : for meta in blobs_buf.blobs.iter() {
679 10370 : let img_buf = frozen_buf.slice(meta.start..meta.end);
680 10370 : reconstruct_state.update_key(
681 10370 : &meta.meta.key,
682 10370 : self.lsn,
683 10370 : Value::Image(img_buf),
684 10370 : );
685 10370 : }
686 : }
687 0 : Err(err) => {
688 0 : let kind = err.kind();
689 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
690 0 : reconstruct_state.on_key_error(
691 0 : blob_meta.key,
692 0 : PageReconstructError::from(anyhow!(
693 0 : "Failed to read blobs from virtual file {}: {}",
694 0 : self.file.path,
695 0 : kind
696 0 : )),
697 0 : );
698 0 : }
699 : }
700 : };
701 : }
702 74 : }
703 :
704 64 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
705 64 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
706 64 : let tree_reader =
707 64 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
708 64 : ImageLayerIterator {
709 64 : image_layer: self,
710 64 : ctx,
711 64 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
712 64 : key_values_batch: VecDeque::new(),
713 64 : is_end: false,
714 64 : planner: StreamingVectoredReadPlanner::new(
715 64 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
716 64 : 1024, // The default value. Unit tests might use a different value
717 64 : ),
718 64 : }
719 64 : }
720 : }
721 :
722 : /// A builder object for constructing a new image layer.
723 : ///
724 : /// Usage:
725 : ///
726 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
727 : ///
728 : /// 2. Write the contents by calling `put_image` for every key-value
729 : /// pair in the key range.
730 : ///
731 : /// 3. Call `finish`.
732 : ///
733 : struct ImageLayerWriterInner {
734 : conf: &'static PageServerConf,
// Temporary path the layer is written to until `finish` hands it over.
735 : path: Utf8PathBuf,
736 : timeline_id: TimelineId,
737 : tenant_shard_id: TenantShardId,
// `put_image` rejects keys outside this range.
738 : key_range: Range<Key>,
739 : lsn: Lsn,
740 :
741 : // Total uncompressed bytes passed into put_image
742 : uncompressed_bytes: u64,
743 :
// Appends the values to the file, starting right after the header block.
744 : blob_writer: BlobWriter<false>,
// In-memory B-tree index mapping each key to its blob offset; written out by `finish`.
745 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
746 : }
747 :
748 : impl ImageLayerWriterInner {
749 : ///
750 : /// Start building a new image layer.
751 : ///
752 250 : async fn new(
753 250 : conf: &'static PageServerConf,
754 250 : timeline_id: TimelineId,
755 250 : tenant_shard_id: TenantShardId,
756 250 : key_range: &Range<Key>,
757 250 : lsn: Lsn,
758 250 : ctx: &RequestContext,
759 250 : ) -> anyhow::Result<Self> {
760 250 : // Create the file initially with a temporary filename.
761 250 : // We'll atomically rename it to the final name when we're done.
762 250 : let path = ImageLayer::temp_path_for(
763 250 : conf,
764 250 : timeline_id,
765 250 : tenant_shard_id,
766 250 : &ImageLayerName {
767 250 : key_range: key_range.clone(),
768 250 : lsn,
769 250 : },
770 250 : );
771 250 : trace!("creating image layer {}", path);
772 250 : let mut file = {
773 250 : VirtualFile::open_with_options(
774 250 : &path,
775 250 : virtual_file::OpenOptions::new()
776 250 : .write(true)
777 250 : .create_new(true),
778 250 : ctx,
779 250 : )
780 196 : .await?
781 : };
782 : // make room for the header block
783 250 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
784 250 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
785 250 :
786 250 : // Initialize the b-tree index builder
787 250 : let block_buf = BlockBuf::new();
788 250 : let tree_builder = DiskBtreeBuilder::new(block_buf);
789 250 :
790 250 : let writer = Self {
791 250 : conf,
792 250 : path,
793 250 : timeline_id,
794 250 : tenant_shard_id,
795 250 : key_range: key_range.clone(),
796 250 : lsn,
797 250 : tree: tree_builder,
798 250 : blob_writer,
799 250 : uncompressed_bytes: 0,
800 250 : };
801 250 :
802 250 : Ok(writer)
803 250 : }
804 :
805 : ///
806 : /// Write next value to the file.
807 : ///
808 : /// The page versions must be appended in blknum order.
809 : ///
810 537766 : async fn put_image(
811 537766 : &mut self,
812 537766 : key: Key,
813 537766 : img: Bytes,
814 537766 : ctx: &RequestContext,
815 537766 : ) -> anyhow::Result<()> {
816 537766 : ensure!(self.key_range.contains(&key));
817 537766 : let compression = self.conf.image_compression;
818 537766 : self.uncompressed_bytes += img.len() as u64;
819 537766 : let (_img, res) = self
820 537766 : .blob_writer
821 537766 : .write_blob_maybe_compressed(img, ctx, compression)
822 546285 : .await;
823 : // TODO: re-use the buffer for `img` further upstack
824 537766 : let off = res?;
825 :
826 537766 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
827 537766 : key.write_to_byte_slice(&mut keybuf);
828 537766 : self.tree.append(&keybuf, off)?;
829 :
830 537766 : Ok(())
831 537766 : }
832 :
833 : ///
834 : /// Finish writing the image layer.
835 : ///
836 244 : async fn finish(
837 244 : self,
838 244 : timeline: &Arc<Timeline>,
839 244 : ctx: &RequestContext,
840 244 : ) -> anyhow::Result<ResidentLayer> {
841 244 : let index_start_blk =
842 244 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
843 244 :
844 244 : // Calculate compression ratio
845 244 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
846 244 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
847 244 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
848 244 :
849 244 : let mut file = self.blob_writer.into_inner();
850 244 :
851 244 : // Write out the index
852 244 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
853 0 : .await?;
854 244 : let (index_root_blk, block_buf) = self.tree.finish()?;
855 944 : for buf in block_buf.blocks {
856 700 : let (_buf, res) = file.write_all(buf, ctx).await;
857 700 : res?;
858 : }
859 :
860 : // Fill in the summary on blk 0
861 244 : let summary = Summary {
862 244 : magic: IMAGE_FILE_MAGIC,
863 244 : format_version: STORAGE_FORMAT_VERSION,
864 244 : tenant_id: self.tenant_shard_id.tenant_id,
865 244 : timeline_id: self.timeline_id,
866 244 : key_range: self.key_range.clone(),
867 244 : lsn: self.lsn,
868 244 : index_start_blk,
869 244 : index_root_blk,
870 244 : };
871 244 :
872 244 : let mut buf = Vec::with_capacity(PAGE_SZ);
873 244 : // TODO: could use smallvec here but it's a pain with Slice<T>
874 244 : Summary::ser_into(&summary, &mut buf)?;
875 244 : file.seek(SeekFrom::Start(0)).await?;
876 244 : let (_buf, res) = file.write_all(buf, ctx).await;
877 244 : res?;
878 :
879 244 : let metadata = file
880 244 : .metadata()
881 128 : .await
882 244 : .context("get metadata to determine file size")?;
883 :
884 244 : let desc = PersistentLayerDesc::new_img(
885 244 : self.tenant_shard_id,
886 244 : self.timeline_id,
887 244 : self.key_range.clone(),
888 244 : self.lsn,
889 244 : metadata.len(),
890 244 : );
891 244 :
892 244 : // Note: Because we open the file in write-only mode, we cannot
893 244 : // reuse the same VirtualFile for reading later. That's why we don't
894 244 : // set inner.file here. The first read will have to re-open it.
895 244 :
896 244 : // fsync the file
897 244 : file.sync_all().await?;
898 :
899 : // FIXME: why not carry the virtualfile here, it supports renaming?
900 244 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
901 :
902 244 : info!("created image layer {}", layer.local_path());
903 :
904 244 : Ok(layer)
905 244 : }
906 : }
907 :
908 : /// A builder object for constructing a new image layer.
909 : ///
910 : /// Usage:
911 : ///
912 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
913 : ///
914 : /// 2. Write the contents by calling `put_image` for every key-value
915 : /// pair in the key range.
916 : ///
917 : /// 3. Call `finish`.
918 : ///
919 : /// # Note
920 : ///
921 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
922 : /// possible for the writer to drop before `finish` is actually called. So this
923 : /// could lead to odd temporary files in the directory, exhausting file system.
924 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
925 : /// implementation that cleans up the temporary file in failure. It's not
926 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
927 : /// out some fields, making it impossible to implement `Drop`.
928 : ///
929 : #[must_use]
930 : pub struct ImageLayerWriter {
// `Some` until `finish` takes it; `Drop` removes the temporary file only while it is still `Some`.
931 : inner: Option<ImageLayerWriterInner>,
932 : }
933 :
934 : impl ImageLayerWriter {
935 : ///
936 : /// Start building a new image layer.
937 : ///
938 250 : pub async fn new(
939 250 : conf: &'static PageServerConf,
940 250 : timeline_id: TimelineId,
941 250 : tenant_shard_id: TenantShardId,
942 250 : key_range: &Range<Key>,
943 250 : lsn: Lsn,
944 250 : ctx: &RequestContext,
945 250 : ) -> anyhow::Result<ImageLayerWriter> {
946 250 : Ok(Self {
947 250 : inner: Some(
948 250 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
949 196 : .await?,
950 : ),
951 : })
952 250 : }
953 :
954 : ///
955 : /// Write next value to the file.
956 : ///
957 : /// The page versions must be appended in blknum order.
958 : ///
959 537766 : pub async fn put_image(
960 537766 : &mut self,
961 537766 : key: Key,
962 537766 : img: Bytes,
963 537766 : ctx: &RequestContext,
964 537766 : ) -> anyhow::Result<()> {
965 546285 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
966 537766 : }
967 :
968 : ///
969 : /// Finish writing the image layer.
970 : ///
971 244 : pub(crate) async fn finish(
972 244 : mut self,
973 244 : timeline: &Arc<Timeline>,
974 244 : ctx: &RequestContext,
975 244 : ) -> anyhow::Result<super::ResidentLayer> {
976 732 : self.inner.take().unwrap().finish(timeline, ctx).await
977 244 : }
978 : }
979 :
980 : impl Drop for ImageLayerWriter {
981 250 : fn drop(&mut self) {
982 250 : if let Some(inner) = self.inner.take() {
983 6 : inner.blob_writer.into_inner().remove();
984 244 : }
985 250 : }
986 : }
987 :
/// Iterator over the key-value pairs of an image layer.
///
/// Values are fetched in batches: `next_batch` fills `key_values_batch` and
/// `next` hands the buffered entries out one at a time.
pub struct ImageLayerIterator<'a> {
    /// The layer being read; supplies the file, the LSN and `index_start_blk`.
    image_layer: &'a ImageLayerInner,
    /// Request context passed along to the blob reads.
    ctx: &'a RequestContext,
    /// Receives (key, lsn, offset) index entries and emits read plans.
    planner: StreamingVectoredReadPlanner,
    /// Iterator over the layer's index entries.
    index_iter: DiskBtreeIterator<'a>,
    /// Entries of the current batch that have not been returned yet.
    key_values_batch: VecDeque<(Key, Lsn, Value)>,
    /// Set once `index_iter` has been exhausted.
    is_end: bool,
}
996 :
997 : impl<'a> ImageLayerIterator<'a> {
998 : /// Retrieve a batch of key-value pairs into the iterator buffer.
999 18950 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1000 18950 : assert!(self.key_values_batch.is_empty());
1001 18950 : assert!(!self.is_end);
1002 :
1003 18950 : let plan = loop {
1004 28406 : if let Some(res) = self.index_iter.next().await {
1005 28370 : let (raw_key, offset) = res?;
1006 28370 : if let Some(batch_plan) = self.planner.handle(
1007 28370 : Key::from_slice(&raw_key[..KEY_SIZE]),
1008 28370 : self.image_layer.lsn,
1009 28370 : offset,
1010 28370 : ) {
1011 18914 : break batch_plan;
1012 9456 : }
1013 : } else {
1014 36 : self.is_end = true;
1015 36 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1016 36 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1017 36 : break item;
1018 : } else {
1019 0 : return Ok(()); // TODO: a test case on empty iterator
1020 : }
1021 : }
1022 : };
1023 18950 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1024 18950 : let mut next_batch = std::collections::VecDeque::new();
1025 18950 : let buf_size = plan.size();
1026 18950 : let buf = BytesMut::with_capacity(buf_size);
1027 18950 : let blobs_buf = vectored_blob_reader
1028 18950 : .read_blobs(&plan, buf, self.ctx)
1029 9623 : .await?;
1030 18950 : let frozen_buf: Bytes = blobs_buf.buf.freeze();
1031 28342 : for meta in blobs_buf.blobs.iter() {
1032 28342 : let img_buf = frozen_buf.slice(meta.start..meta.end);
1033 28342 : next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
1034 28342 : }
1035 18950 : self.key_values_batch = next_batch;
1036 18950 : Ok(())
1037 18950 : }
1038 :
1039 28116 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1040 28116 : if self.key_values_batch.is_empty() {
1041 18910 : if self.is_end {
1042 44 : return Ok(None);
1043 18866 : }
1044 18866 : self.next_batch().await?;
1045 9206 : }
1046 28072 : Ok(Some(
1047 28072 : self.key_values_batch
1048 28072 : .pop_front()
1049 28072 : .expect("should not be empty"),
1050 28072 : ))
1051 28116 : }
1052 : }
1053 :
1054 : #[cfg(test)]
1055 : mod test {
1056 : use std::{sync::Arc, time::Duration};
1057 :
1058 : use bytes::Bytes;
1059 : use itertools::Itertools;
1060 : use pageserver_api::{
1061 : key::Key,
1062 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1063 : };
1064 : use utils::{
1065 : generation::Generation,
1066 : id::{TenantId, TimelineId},
1067 : lsn::Lsn,
1068 : };
1069 :
1070 : use crate::{
1071 : context::RequestContext,
1072 : repository::Value,
1073 : tenant::{
1074 : config::TenantConf,
1075 : harness::{TenantHarness, TIMELINE_ID},
1076 : storage_layer::ResidentLayer,
1077 : vectored_blob_io::StreamingVectoredReadPlanner,
1078 : Tenant, Timeline,
1079 : },
1080 : DEFAULT_PG_VERSION,
1081 : };
1082 :
1083 : use super::{ImageLayerIterator, ImageLayerWriter};
1084 :
// Writes one image layer in an unsharded parent tenant, then, for each shard
// of a 4-way split, runs `filter()` on that layer into a fresh writer and
// checks the number of keys written and the size of the resulting file.
#[tokio::test]
async fn image_layer_rewrite() {
    let tenant_conf = TenantConf {
        gc_period: Duration::ZERO,
        compaction_period: Duration::ZERO,
        ..TenantConf::default()
    };
    let tenant_id = TenantId::generate();
    let mut gen = Generation::new(0xdead0001);
    // Hands out a fresh generation number on every call.
    let mut get_next_gen = || {
        let ret = gen;
        gen = gen.next();
        ret
    };
    // The LSN at which we will create an image layer to filter
    let lsn = Lsn(0xdeadbeef0000);
    let timeline_id = TimelineId::generate();

    //
    // Create an unsharded parent with a layer.
    //

    let harness = TenantHarness::create_custom(
        "test_image_layer_rewrite--parent",
        tenant_conf.clone(),
        tenant_id,
        ShardIdentity::unsharded(),
        get_next_gen(),
    )
    .await
    .unwrap();
    let (tenant, ctx) = harness.load().await;
    let timeline = tenant
        .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
        .await
        .unwrap();

    // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
    let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    let range = input_start..input_end;

    // Build an image layer to filter
    let resident = {
        let mut writer = ImageLayerWriter::new(
            harness.conf,
            timeline_id,
            harness.tenant_shard_id,
            &range,
            lsn,
            &ctx,
        )
        .await
        .unwrap();

        // Every key in the range gets the same small image.
        let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
        let mut key = range.start;
        while key < range.end {
            writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();

            key = key.next();
        }
        writer.finish(&timeline, &ctx).await.unwrap()
    };
    let original_size = resident.metadata().file_size;

    //
    // Create child shards and do the rewrite, exercising filter().
    // TODO: abstraction in TenantHarness for splits.
    //

    // Filter for various shards: this exercises cases like values at start of key range, end of key
    // range, middle of key range.
    let shard_count = ShardCount::new(4);
    for shard_number in 0..shard_count.count() {
        //
        // mimic the shard split
        //
        let shard_identity = ShardIdentity::new(
            ShardNumber(shard_number),
            shard_count,
            ShardStripeSize(0x8000),
        )
        .unwrap();
        let harness = TenantHarness::create_custom(
            // The formatted name is leaked to obtain a reference that outlives this scope.
            Box::leak(Box::new(format!(
                "test_image_layer_rewrite--child{}",
                shard_identity.shard_slug()
            ))),
            tenant_conf.clone(),
            tenant_id,
            shard_identity,
            // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
            // But here, all we care about is that the gen number is unique.
            get_next_gen(),
        )
        .await
        .unwrap();
        let (tenant, ctx) = harness.load().await;
        let timeline = tenant
            .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        //
        // use filter() and make assertions
        //

        let mut filtered_writer = ImageLayerWriter::new(
            harness.conf,
            timeline_id,
            harness.tenant_shard_id,
            &range,
            lsn,
            &ctx,
        )
        .await
        .unwrap();

        let wrote_keys = resident
            .filter(&shard_identity, &mut filtered_writer, &ctx)
            .await
            .unwrap();
        // Only finish the writer when something was written; otherwise it is
        // simply dropped.
        let replacement = if wrote_keys > 0 {
            Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
        } else {
            None
        };

        // This exact size and those below will need updating as/when the layer encoding changes, but
        // should be deterministic for a given version of the format, as we used no randomness generating the input.
        assert_eq!(original_size, 1597440);

        match shard_number {
            0 => {
                // We should have written out just one stripe for our shard identity
                assert_eq!(wrote_keys, 0x8000);
                let replacement = replacement.unwrap();

                // We should have dropped some of the data
                assert!(replacement.metadata().file_size < original_size);
                assert!(replacement.metadata().file_size > 0);

                // Assert that we dropped ~3/4 of the data.
                assert_eq!(replacement.metadata().file_size, 417792);
            }
            1 => {
                // Shard 1 has no keys in our input range
                assert_eq!(wrote_keys, 0x0);
                assert!(replacement.is_none());
            }
            2 => {
                // Shard 2 has one stripe in the input range
                assert_eq!(wrote_keys, 0x8000);
                let replacement = replacement.unwrap();
                assert!(replacement.metadata().file_size < original_size);
                assert!(replacement.metadata().file_size > 0);
                assert_eq!(replacement.metadata().file_size, 417792);
            }
            3 => {
                // Shard 3 has two stripes in the input range
                assert_eq!(wrote_keys, 0x10000);
                let replacement = replacement.unwrap();
                assert!(replacement.metadata().file_size < original_size);
                assert!(replacement.metadata().file_size > 0);
                assert_eq!(replacement.metadata().file_size, 811008);
            }
            _ => unreachable!(),
        }
    }
}
1256 :
1257 2 : async fn produce_image_layer(
1258 2 : tenant: &Tenant,
1259 2 : tline: &Arc<Timeline>,
1260 2 : mut images: Vec<(Key, Bytes)>,
1261 2 : lsn: Lsn,
1262 2 : ctx: &RequestContext,
1263 2 : ) -> anyhow::Result<ResidentLayer> {
1264 2 : images.sort();
1265 2 : let (key_start, _) = images.first().unwrap();
1266 2 : let (key_last, _) = images.last().unwrap();
1267 2 : let key_end = key_last.next();
1268 2 : let key_range = *key_start..key_end;
1269 2 : let mut writer = ImageLayerWriter::new(
1270 2 : tenant.conf,
1271 2 : tline.timeline_id,
1272 2 : tenant.tenant_shard_id,
1273 2 : &key_range,
1274 2 : lsn,
1275 2 : ctx,
1276 2 : )
1277 1 : .await?;
1278 :
1279 2002 : for (key, img) in images {
1280 2031 : writer.put_image(key, img, ctx).await?;
1281 : }
1282 4 : let img_layer = writer.finish(tline, ctx).await?;
1283 :
1284 2 : Ok::<_, anyhow::Error>(img_layer)
1285 2 : }
1286 :
1287 28 : async fn assert_img_iter_equal(
1288 28 : img_iter: &mut ImageLayerIterator<'_>,
1289 28 : expect: &[(Key, Bytes)],
1290 28 : expect_lsn: Lsn,
1291 28 : ) {
1292 28 : let mut expect_iter = expect.iter();
1293 : loop {
1294 28028 : let o1 = img_iter.next().await.unwrap();
1295 28028 : let o2 = expect_iter.next();
1296 28028 : match (o1, o2) {
1297 28 : (None, None) => break,
1298 28000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1299 28000 : let Value::Image(i1) = v1 else {
1300 0 : panic!("expect Value::Image")
1301 : };
1302 28000 : assert_eq!(&k1, k2);
1303 28000 : assert_eq!(l1, expect_lsn);
1304 28000 : assert_eq!(&i1, i2);
1305 : }
1306 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1307 : }
1308 : }
1309 28 : }
1310 :
// Builds an image layer with N entries and, for several combinations of
// `max_read_size` and `batch_size`, checks both the size of the batches
// produced by `next_batch` and the full sequence returned by the iterator.
#[tokio::test]
async fn image_layer_iterator() {
    let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    let (tenant, ctx) = harness.load().await;

    let tline = tenant
        .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
        .await
        .unwrap();

    // Keys share a fixed prefix and differ only in `field6`.
    fn get_key(id: u32) -> Key {
        let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
        key.field6 = id;
        key
    }
    const N: usize = 1000;
    let test_imgs = (0..N)
        .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
        .collect_vec();
    let resident_layer =
        produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
            .await
            .unwrap();
    let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    for max_read_size in [1, 1024] {
        for batch_size in [1, 2, 4, 8, 3, 7, 13] {
            println!("running with batch_size={batch_size} max_read_size={max_read_size}");
            // Test if the batch size is correctly determined
            let mut iter = img_layer.iter(&ctx);
            // Override the planner so the test controls both limits.
            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
            let mut num_items = 0;
            // Only the first three batches are inspected.
            for _ in 0..3 {
                iter.next_batch().await.unwrap();
                num_items += iter.key_values_batch.len();
                if max_read_size == 1 {
                    // every key should be a batch b/c the value is larger than max_read_size
                    assert_eq!(iter.key_values_batch.len(), 1);
                } else {
                    assert_eq!(iter.key_values_batch.len(), batch_size);
                }
                if num_items >= N {
                    break;
                }
                // `next_batch` asserts that the buffer is empty on entry.
                iter.key_values_batch.clear();
            }
            // Test if the result is correct
            let mut iter = img_layer.iter(&ctx);
            iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
            assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
        }
    }
}
1363 : }
|