Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{
33 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
34 : };
35 : use crate::tenant::timeline::GetVectoredError;
36 : use crate::tenant::vectored_blob_io::{
37 : BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
38 : VectoredReadPlanner,
39 : };
40 : use crate::tenant::{PageReconstructError, Timeline};
41 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
42 : use crate::virtual_file::{self, VirtualFile};
43 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
44 : use anyhow::{anyhow, bail, ensure, Context, Result};
45 : use bytes::{Bytes, BytesMut};
46 : use camino::{Utf8Path, Utf8PathBuf};
47 : use hex;
48 : use itertools::Itertools;
49 : use pageserver_api::keyspace::KeySpace;
50 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
51 : use rand::{distributions::Alphanumeric, Rng};
52 : use serde::{Deserialize, Serialize};
53 : use std::collections::VecDeque;
54 : use std::fs::File;
55 : use std::io::SeekFrom;
56 : use std::ops::Range;
57 : use std::os::unix::prelude::FileExt;
58 : use std::str::FromStr;
59 : use std::sync::Arc;
60 : use tokio::sync::OnceCell;
61 : use tokio_stream::StreamExt;
62 : use tracing::*;
63 :
64 : use utils::{
65 : bin_ser::BeSer,
66 : id::{TenantId, TimelineId},
67 : lsn::Lsn,
68 : };
69 :
70 : use super::layer_name::ImageLayerName;
71 : use super::{
72 : AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
73 : };
74 :
///
/// Header stored in the beginning of the file
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'
///
/// The summary is (de)serialized with `BeSer` (`ser_into` / `des_prefix`) into
/// block 0 of the layer file. serde's derive serializes fields in declaration
/// order, so reordering, adding or removing fields changes the on-disk format.
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
    pub magic: u16,
    /// Storage format version; layers written by this code use `STORAGE_FORMAT_VERSION`.
    pub format_version: u16,

    /// Tenant the layer belongs to (shard number is not recorded here).
    pub tenant_id: TenantId,
    /// Timeline the layer was written for.
    pub timeline_id: TimelineId,
    /// Range of keys this layer holds images for.
    pub key_range: Range<Key>,
    /// LSN at which the images were taken.
    pub lsn: Lsn,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index', where the B-tree root page is stored
    pub index_root_blk: u32,
    // the 'values' part starts after the summary header, on block 1.
}
98 :
99 : impl From<&ImageLayer> for Summary {
100 0 : fn from(layer: &ImageLayer) -> Self {
101 0 : Self::expected(
102 0 : layer.desc.tenant_shard_id.tenant_id,
103 0 : layer.desc.timeline_id,
104 0 : layer.desc.key_range.clone(),
105 0 : layer.lsn,
106 0 : )
107 0 : }
108 : }
109 :
110 : impl Summary {
111 98 : pub(super) fn expected(
112 98 : tenant_id: TenantId,
113 98 : timeline_id: TimelineId,
114 98 : key_range: Range<Key>,
115 98 : lsn: Lsn,
116 98 : ) -> Self {
117 98 : Self {
118 98 : magic: IMAGE_FILE_MAGIC,
119 98 : format_version: STORAGE_FORMAT_VERSION,
120 98 : tenant_id,
121 98 : timeline_id,
122 98 : key_range,
123 98 : lsn,
124 98 :
125 98 : index_start_blk: 0,
126 98 : index_root_blk: 0,
127 98 : }
128 98 : }
129 : }
130 :
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
    /// Location of the layer file on disk.
    path: Utf8PathBuf,
    /// Descriptor built from the in-file summary (see `new_for_path`).
    pub desc: PersistentLayerDesc,
    // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
    pub lsn: Lsn,
    /// File contents, loaded lazily on first use via `load`.
    inner: OnceCell<ImageLayerInner>,
}
140 :
141 : impl std::fmt::Debug for ImageLayer {
142 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 0 : use super::RangeDisplayDebug;
144 0 :
145 0 : f.debug_struct("ImageLayer")
146 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
147 0 : .field("file_size", &self.desc.file_size)
148 0 : .field("lsn", &self.lsn)
149 0 : .field("inner", &self.inner)
150 0 : .finish()
151 0 : }
152 : }
153 :
/// ImageLayerInner is the in-memory data structure associated with an on-disk image
/// file: the opened file plus the summary values needed to read from it.
pub struct ImageLayerInner {
    // values copied from summary
    index_start_blk: u32,
    index_root_blk: u32,

    key_range: Range<Key>,
    // Not copied from the summary: this is the `lsn` argument passed to `load`.
    lsn: Lsn,

    file: VirtualFile,
    // Identifies this file in the page cache for block reads.
    file_id: FileId,

    // Must be `Some` for vectored reads (`plan_reads` / `do_reads_and_update_state`
    // `expect` it); `pagectl` loads pass `None`.
    max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
}
169 :
170 : impl std::fmt::Debug for ImageLayerInner {
171 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 0 : f.debug_struct("ImageLayerInner")
173 0 : .field("index_start_blk", &self.index_start_blk)
174 0 : .field("index_root_blk", &self.index_root_blk)
175 0 : .finish()
176 0 : }
177 : }
178 :
179 : impl ImageLayerInner {
180 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
181 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
182 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
183 0 : self.index_start_blk,
184 0 : self.index_root_blk,
185 0 : block_reader,
186 0 : );
187 0 :
188 0 : tree_reader.dump().await?;
189 :
190 0 : tree_reader
191 0 : .visit(
192 0 : &[0u8; KEY_SIZE],
193 0 : VisitDirection::Forwards,
194 0 : |key, value| {
195 0 : println!("key: {} offset {}", hex::encode(key), value);
196 0 : true
197 0 : },
198 0 : ctx,
199 0 : )
200 0 : .await?;
201 :
202 0 : Ok(())
203 0 : }
204 : }
205 :
206 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
207 : impl std::fmt::Display for ImageLayer {
208 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 0 : write!(f, "{}", self.layer_desc().short_id())
210 0 : }
211 : }
212 :
213 : impl AsLayerDesc for ImageLayer {
214 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
215 0 : &self.desc
216 0 : }
217 : }
218 :
219 : impl ImageLayer {
220 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
221 0 : self.desc.dump();
222 0 :
223 0 : if !verbose {
224 0 : return Ok(());
225 0 : }
226 :
227 0 : let inner = self.load(ctx).await?;
228 :
229 0 : inner.dump(ctx).await?;
230 :
231 0 : Ok(())
232 0 : }
233 :
234 298 : fn temp_path_for(
235 298 : conf: &PageServerConf,
236 298 : timeline_id: TimelineId,
237 298 : tenant_shard_id: TenantShardId,
238 298 : fname: &ImageLayerName,
239 298 : ) -> Utf8PathBuf {
240 298 : let rand_string: String = rand::thread_rng()
241 298 : .sample_iter(&Alphanumeric)
242 298 : .take(8)
243 298 : .map(char::from)
244 298 : .collect();
245 298 :
246 298 : conf.timeline_path(&tenant_shard_id, &timeline_id)
247 298 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
248 298 : }
249 :
250 : ///
251 : /// Open the underlying file and read the metadata into memory, if it's
252 : /// not loaded already.
253 : ///
254 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
255 0 : self.inner
256 0 : .get_or_try_init(|| self.load_inner(ctx))
257 0 : .await
258 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
259 0 : }
260 :
261 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
262 0 : let path = self.path();
263 :
264 0 : let loaded =
265 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
266 :
267 : // not production code
268 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
269 0 : let expected_layer_name = self.layer_desc().layer_name();
270 0 :
271 0 : if actual_layer_name != expected_layer_name {
272 0 : println!("warning: filename does not match what is expected from in-file summary");
273 0 : println!("actual: {:?}", actual_layer_name.to_string());
274 0 : println!("expected: {:?}", expected_layer_name.to_string());
275 0 : }
276 :
277 0 : Ok(loaded)
278 0 : }
279 :
280 : /// Create an ImageLayer struct representing an existing file on disk.
281 : ///
282 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
283 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
284 0 : let mut summary_buf = vec![0; PAGE_SZ];
285 0 : file.read_exact_at(&mut summary_buf, 0)?;
286 0 : let summary = Summary::des_prefix(&summary_buf)?;
287 0 : let metadata = file
288 0 : .metadata()
289 0 : .context("get file metadata to determine size")?;
290 :
291 : // This function is never used for constructing layers in a running pageserver,
292 : // so it does not need an accurate TenantShardId.
293 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
294 0 :
295 0 : Ok(ImageLayer {
296 0 : path: path.to_path_buf(),
297 0 : desc: PersistentLayerDesc::new_img(
298 0 : tenant_shard_id,
299 0 : summary.timeline_id,
300 0 : summary.key_range,
301 0 : summary.lsn,
302 0 : metadata.len(),
303 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
304 0 : lsn: summary.lsn,
305 0 : inner: OnceCell::new(),
306 0 : })
307 0 : }
308 :
309 0 : fn path(&self) -> Utf8PathBuf {
310 0 : self.path.clone()
311 0 : }
312 : }
313 :
/// Error returned by [`ImageLayer::rewrite_summary`].
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    /// The summary in block 0 does not carry `IMAGE_FILE_MAGIC`; presumably the
    /// file is not an image layer.
    #[error("magic mismatch")]
    MagicMismatch,
    /// Any other failure: opening, reading, (de)serializing or writing the file.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
321 :
322 : impl From<std::io::Error> for RewriteSummaryError {
323 0 : fn from(e: std::io::Error) -> Self {
324 0 : Self::Other(anyhow::anyhow!(e))
325 0 : }
326 : }
327 :
328 : impl ImageLayer {
329 0 : pub async fn rewrite_summary<F>(
330 0 : path: &Utf8Path,
331 0 : rewrite: F,
332 0 : ctx: &RequestContext,
333 0 : ) -> Result<(), RewriteSummaryError>
334 0 : where
335 0 : F: Fn(Summary) -> Summary,
336 0 : {
337 0 : let mut file = VirtualFile::open_with_options(
338 0 : path,
339 0 : virtual_file::OpenOptions::new().read(true).write(true),
340 0 : ctx,
341 0 : )
342 0 : .await
343 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
344 0 : let file_id = page_cache::next_file_id();
345 0 : let block_reader = FileBlockReader::new(&file, file_id);
346 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
347 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
348 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
349 0 : return Err(RewriteSummaryError::MagicMismatch);
350 0 : }
351 0 :
352 0 : let new_summary = rewrite(actual_summary);
353 0 :
354 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
355 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
356 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
357 0 : file.seek(SeekFrom::Start(0)).await?;
358 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
359 0 : res?;
360 0 : Ok(())
361 0 : }
362 : }
363 :
364 : impl ImageLayerInner {
365 26 : pub(crate) fn key_range(&self) -> &Range<Key> {
366 26 : &self.key_range
367 26 : }
368 :
369 26 : pub(crate) fn lsn(&self) -> Lsn {
370 26 : self.lsn
371 26 : }
372 :
373 98 : pub(super) async fn load(
374 98 : path: &Utf8Path,
375 98 : lsn: Lsn,
376 98 : summary: Option<Summary>,
377 98 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
378 98 : ctx: &RequestContext,
379 98 : ) -> anyhow::Result<Self> {
380 98 : let file = VirtualFile::open(path, ctx)
381 49 : .await
382 98 : .context("open layer file")?;
383 98 : let file_id = page_cache::next_file_id();
384 98 : let block_reader = FileBlockReader::new(&file, file_id);
385 98 : let summary_blk = block_reader
386 98 : .read_blk(0, ctx)
387 51 : .await
388 98 : .context("read first block")?;
389 :
390 : // length is the only way how this could fail, so it's not actually likely at all unless
391 : // read_blk returns wrong sized block.
392 : //
393 : // TODO: confirm and make this into assertion
394 98 : let actual_summary =
395 98 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
396 :
397 98 : if let Some(mut expected_summary) = summary {
398 : // production code path
399 98 : expected_summary.index_start_blk = actual_summary.index_start_blk;
400 98 : expected_summary.index_root_blk = actual_summary.index_root_blk;
401 98 : // mask out the timeline_id, but still require the layers to be from the same tenant
402 98 : expected_summary.timeline_id = actual_summary.timeline_id;
403 98 :
404 98 : if actual_summary != expected_summary {
405 0 : bail!(
406 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
407 0 : actual_summary,
408 0 : expected_summary
409 0 : );
410 98 : }
411 0 : }
412 :
413 98 : Ok(ImageLayerInner {
414 98 : index_start_blk: actual_summary.index_start_blk,
415 98 : index_root_blk: actual_summary.index_root_blk,
416 98 : lsn,
417 98 : file,
418 98 : file_id,
419 98 : max_vectored_read_bytes,
420 98 : key_range: actual_summary.key_range,
421 98 : })
422 98 : }
423 :
424 : // Look up the keys in the provided keyspace and update
425 : // the reconstruct state with whatever is found.
426 7738 : pub(super) async fn get_values_reconstruct_data(
427 7738 : &self,
428 7738 : keyspace: KeySpace,
429 7738 : reconstruct_state: &mut ValuesReconstructState,
430 7738 : ctx: &RequestContext,
431 7738 : ) -> Result<(), GetVectoredError> {
432 7738 : let reads = self
433 7738 : .plan_reads(keyspace, None, ctx)
434 724 : .await
435 7738 : .map_err(GetVectoredError::Other)?;
436 :
437 7738 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
438 4745 : .await;
439 :
440 7738 : reconstruct_state.on_image_layer_visited(&self.key_range);
441 7738 :
442 7738 : Ok(())
443 7738 : }
444 :
445 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
446 0 : pub(super) async fn load_key_values(
447 0 : &self,
448 0 : ctx: &RequestContext,
449 0 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
450 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
451 0 : let tree_reader =
452 0 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
453 0 : let mut result = Vec::new();
454 0 : let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
455 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
456 0 : let cursor = block_reader.block_cursor();
457 0 : while let Some(item) = stream.next().await {
458 : // TODO: dedup code with get_reconstruct_value
459 0 : let (raw_key, offset) = item?;
460 0 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
461 : // TODO: ctx handling and sharding
462 0 : let blob = cursor
463 0 : .read_blob(offset, ctx)
464 0 : .await
465 0 : .with_context(|| format!("failed to read value from offset {}", offset))?;
466 0 : let value = Bytes::from(blob);
467 0 : result.push((key, self.lsn, Value::Image(value)));
468 : }
469 0 : Ok(result)
470 0 : }
471 :
472 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
473 : /// and the keys in this layer.
474 : ///
475 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
476 : /// this shard.
477 7746 : async fn plan_reads(
478 7746 : &self,
479 7746 : keyspace: KeySpace,
480 7746 : shard_identity: Option<&ShardIdentity>,
481 7746 : ctx: &RequestContext,
482 7746 : ) -> anyhow::Result<Vec<VectoredRead>> {
483 7746 : let mut planner = VectoredReadPlanner::new(
484 7746 : self.max_vectored_read_bytes
485 7746 : .expect("Layer is loaded with max vectored bytes config")
486 7746 : .0
487 7746 : .into(),
488 7746 : );
489 7746 :
490 7746 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
491 7746 : let tree_reader =
492 7746 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
493 7746 :
494 7746 : let ctx = RequestContextBuilder::extend(ctx)
495 7746 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
496 7746 : .build();
497 :
498 29387 : for range in keyspace.ranges.iter() {
499 29387 : let mut range_end_handled = false;
500 29387 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
501 29387 : range.start.write_to_byte_slice(&mut search_key);
502 29387 :
503 29387 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
504 29387 : let mut index_stream = std::pin::pin!(index_stream);
505 :
506 1096073 : while let Some(index_entry) = index_stream.next().await {
507 1095882 : let (raw_key, offset) = index_entry?;
508 :
509 1095882 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
510 1095882 : assert!(key >= range.start);
511 :
512 1095882 : let flag = if let Some(shard_identity) = shard_identity {
513 1048576 : if shard_identity.is_key_disposable(&key) {
514 786432 : BlobFlag::Ignore
515 : } else {
516 262144 : BlobFlag::None
517 : }
518 : } else {
519 47306 : BlobFlag::None
520 : };
521 :
522 1095882 : if key >= range.end {
523 29196 : planner.handle_range_end(offset);
524 29196 : range_end_handled = true;
525 29196 : break;
526 1066686 : } else {
527 1066686 : planner.handle(key, self.lsn, offset, flag);
528 1066686 : }
529 : }
530 :
531 29387 : if !range_end_handled {
532 191 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
533 191 : planner.handle_range_end(payload_end);
534 29196 : }
535 : }
536 :
537 7746 : Ok(planner.finish())
538 7746 : }
539 :
540 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
541 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
542 8 : pub(super) async fn filter(
543 8 : &self,
544 8 : shard_identity: &ShardIdentity,
545 8 : writer: &mut ImageLayerWriter,
546 8 : ctx: &RequestContext,
547 8 : ) -> anyhow::Result<usize> {
548 : // Fragment the range into the regions owned by this ShardIdentity
549 8 : let plan = self
550 8 : .plan_reads(
551 8 : KeySpace {
552 8 : // If asked for the total key space, plan_reads will give us all the keys in the layer
553 8 : ranges: vec![Key::MIN..Key::MAX],
554 8 : },
555 8 : Some(shard_identity),
556 8 : ctx,
557 8 : )
558 469 : .await?;
559 :
560 8 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
561 8 : let mut key_count = 0;
562 16 : for read in plan.into_iter() {
563 16 : let buf_size = read.size();
564 16 :
565 16 : let buf = BytesMut::with_capacity(buf_size);
566 16 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
567 :
568 16 : let frozen_buf = blobs_buf.buf.freeze();
569 :
570 262144 : for meta in blobs_buf.blobs.iter() {
571 262144 : let img_buf = frozen_buf.slice(meta.start..meta.end);
572 262144 :
573 262144 : key_count += 1;
574 262144 : writer
575 262144 : .put_image(meta.meta.key, img_buf, ctx)
576 266240 : .await
577 262144 : .context(format!("Storing key {}", meta.meta.key))?;
578 : }
579 : }
580 :
581 8 : Ok(key_count)
582 8 : }
583 :
584 7738 : async fn do_reads_and_update_state(
585 7738 : &self,
586 7738 : reads: Vec<VectoredRead>,
587 7738 : reconstruct_state: &mut ValuesReconstructState,
588 7738 : ctx: &RequestContext,
589 7738 : ) {
590 7738 : let max_vectored_read_bytes = self
591 7738 : .max_vectored_read_bytes
592 7738 : .expect("Layer is loaded with max vectored bytes config")
593 7738 : .0
594 7738 : .into();
595 7738 :
596 7738 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
597 9385 : for read in reads.into_iter() {
598 9385 : let buf_size = read.size();
599 9385 :
600 9385 : if buf_size > max_vectored_read_bytes {
601 : // If the read is oversized, it should only contain one key.
602 0 : let offenders = read
603 0 : .blobs_at
604 0 : .as_slice()
605 0 : .iter()
606 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
607 0 : .join(", ");
608 0 : tracing::warn!(
609 0 : "Oversized vectored read ({} > {}) for keys {}",
610 : buf_size,
611 : max_vectored_read_bytes,
612 : offenders
613 : );
614 9385 : }
615 :
616 9385 : let buf = BytesMut::with_capacity(buf_size);
617 9385 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
618 :
619 9385 : match res {
620 9385 : Ok(blobs_buf) => {
621 9385 : let frozen_buf = blobs_buf.buf.freeze();
622 :
623 18110 : for meta in blobs_buf.blobs.iter() {
624 18110 : let img_buf = frozen_buf.slice(meta.start..meta.end);
625 18110 : reconstruct_state.update_key(
626 18110 : &meta.meta.key,
627 18110 : self.lsn,
628 18110 : Value::Image(img_buf),
629 18110 : );
630 18110 : }
631 : }
632 0 : Err(err) => {
633 0 : let kind = err.kind();
634 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
635 0 : reconstruct_state.on_key_error(
636 0 : blob_meta.key,
637 0 : PageReconstructError::from(anyhow!(
638 0 : "Failed to read blobs from virtual file {}: {}",
639 0 : self.file.path,
640 0 : kind
641 0 : )),
642 0 : );
643 0 : }
644 : }
645 : };
646 : }
647 7738 : }
648 :
649 82 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
650 82 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
651 82 : let tree_reader =
652 82 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
653 82 : ImageLayerIterator {
654 82 : image_layer: self,
655 82 : ctx,
656 82 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
657 82 : key_values_batch: VecDeque::new(),
658 82 : is_end: false,
659 82 : planner: StreamingVectoredReadPlanner::new(
660 82 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
661 82 : 1024, // The default value. Unit tests might use a different value
662 82 : ),
663 82 : }
664 82 : }
665 : }
666 :
/// A builder object for constructing a new image layer.
///
/// This is the state behind [`ImageLayerWriter`], which callers use. Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` for every key-value
/// pair in the key range.
///
/// 3. Call `finish`.
///
struct ImageLayerWriterInner {
    conf: &'static PageServerConf,
    // Temporary path being written; handed to `Layer::finish_creating` by `finish`.
    path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    // Every key passed to `put_image` must fall inside this range.
    key_range: Range<Key>,
    lsn: Lsn,

    // Total uncompressed bytes passed into put_image
    uncompressed_bytes: u64,

    // Like `uncompressed_bytes`,
    // but only of images we might consider for compression
    uncompressed_bytes_eligible: u64,

    // Like `uncompressed_bytes`, but only of images
    // where we have chosen their compressed form
    uncompressed_bytes_chosen: u64,

    // Number of keys in the layer.
    num_keys: usize,

    // Writes the 'values' part, starting right after the summary block.
    blob_writer: BlobWriter<false>,
    // B-tree index from key to value offset, built in memory and written out by `finish`.
    tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,

    // Only updated with the `testing` feature enabled; `finish` uses it to check
    // that no written key is at or beyond `end_key`.
    #[cfg_attr(not(feature = "testing"), allow(dead_code))]
    last_written_key: Key,
}
706 :
707 : impl ImageLayerWriterInner {
708 12 : fn size(&self) -> u64 {
709 12 : self.tree.borrow_writer().size() + self.blob_writer.size()
710 12 : }
711 :
712 : ///
713 : /// Start building a new image layer.
714 : ///
715 298 : async fn new(
716 298 : conf: &'static PageServerConf,
717 298 : timeline_id: TimelineId,
718 298 : tenant_shard_id: TenantShardId,
719 298 : key_range: &Range<Key>,
720 298 : lsn: Lsn,
721 298 : ctx: &RequestContext,
722 298 : ) -> anyhow::Result<Self> {
723 298 : // Create the file initially with a temporary filename.
724 298 : // We'll atomically rename it to the final name when we're done.
725 298 : let path = ImageLayer::temp_path_for(
726 298 : conf,
727 298 : timeline_id,
728 298 : tenant_shard_id,
729 298 : &ImageLayerName {
730 298 : key_range: key_range.clone(),
731 298 : lsn,
732 298 : },
733 298 : );
734 298 : trace!("creating image layer {}", path);
735 298 : let mut file = {
736 298 : VirtualFile::open_with_options(
737 298 : &path,
738 298 : virtual_file::OpenOptions::new()
739 298 : .write(true)
740 298 : .create_new(true),
741 298 : ctx,
742 298 : )
743 227 : .await?
744 : };
745 : // make room for the header block
746 298 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
747 298 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
748 298 :
749 298 : // Initialize the b-tree index builder
750 298 : let block_buf = BlockBuf::new();
751 298 : let tree_builder = DiskBtreeBuilder::new(block_buf);
752 298 :
753 298 : let writer = Self {
754 298 : conf,
755 298 : path,
756 298 : timeline_id,
757 298 : tenant_shard_id,
758 298 : key_range: key_range.clone(),
759 298 : lsn,
760 298 : tree: tree_builder,
761 298 : blob_writer,
762 298 : uncompressed_bytes: 0,
763 298 : uncompressed_bytes_eligible: 0,
764 298 : uncompressed_bytes_chosen: 0,
765 298 : num_keys: 0,
766 298 : last_written_key: Key::MIN,
767 298 : };
768 298 :
769 298 : Ok(writer)
770 298 : }
771 :
772 : ///
773 : /// Write next value to the file.
774 : ///
775 : /// The page versions must be appended in blknum order.
776 : ///
777 542178 : async fn put_image(
778 542178 : &mut self,
779 542178 : key: Key,
780 542178 : img: Bytes,
781 542178 : ctx: &RequestContext,
782 542178 : ) -> anyhow::Result<()> {
783 542178 : ensure!(self.key_range.contains(&key));
784 542178 : let compression = self.conf.image_compression;
785 542178 : let uncompressed_len = img.len() as u64;
786 542178 : self.uncompressed_bytes += uncompressed_len;
787 542178 : self.num_keys += 1;
788 542178 : let (_img, res) = self
789 542178 : .blob_writer
790 542178 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
791 550761 : .await;
792 : // TODO: re-use the buffer for `img` further upstack
793 542178 : let (off, compression_info) = res?;
794 542178 : if compression_info.compressed_size.is_some() {
795 4002 : // The image has been considered for compression at least
796 4002 : self.uncompressed_bytes_eligible += uncompressed_len;
797 538176 : }
798 542178 : if compression_info.written_compressed {
799 0 : // The image has been compressed
800 0 : self.uncompressed_bytes_chosen += uncompressed_len;
801 542178 : }
802 :
803 542178 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
804 542178 : key.write_to_byte_slice(&mut keybuf);
805 542178 : self.tree.append(&keybuf, off)?;
806 :
807 : #[cfg(feature = "testing")]
808 542178 : {
809 542178 : self.last_written_key = key;
810 542178 : }
811 542178 :
812 542178 : Ok(())
813 542178 : }
814 :
815 : ///
816 : /// Finish writing the image layer.
817 : ///
818 284 : async fn finish(
819 284 : self,
820 284 : timeline: &Arc<Timeline>,
821 284 : ctx: &RequestContext,
822 284 : end_key: Option<Key>,
823 284 : ) -> anyhow::Result<ResidentLayer> {
824 284 : let index_start_blk =
825 284 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
826 284 :
827 284 : // Calculate compression ratio
828 284 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
829 284 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
830 284 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
831 284 : .inc_by(self.uncompressed_bytes_eligible);
832 284 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
833 284 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
834 284 :
835 284 : let mut file = self.blob_writer.into_inner();
836 284 :
837 284 : // Write out the index
838 284 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
839 0 : .await?;
840 284 : let (index_root_blk, block_buf) = self.tree.finish()?;
841 1024 : for buf in block_buf.blocks {
842 740 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
843 740 : res?;
844 : }
845 :
846 : // Fill in the summary on blk 0
847 284 : let summary = Summary {
848 284 : magic: IMAGE_FILE_MAGIC,
849 284 : format_version: STORAGE_FORMAT_VERSION,
850 284 : tenant_id: self.tenant_shard_id.tenant_id,
851 284 : timeline_id: self.timeline_id,
852 284 : key_range: self.key_range.clone(),
853 284 : lsn: self.lsn,
854 284 : index_start_blk,
855 284 : index_root_blk,
856 284 : };
857 284 :
858 284 : let mut buf = Vec::with_capacity(PAGE_SZ);
859 284 : // TODO: could use smallvec here but it's a pain with Slice<T>
860 284 : Summary::ser_into(&summary, &mut buf)?;
861 284 : file.seek(SeekFrom::Start(0)).await?;
862 284 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
863 284 : res?;
864 :
865 284 : let metadata = file
866 284 : .metadata()
867 146 : .await
868 284 : .context("get metadata to determine file size")?;
869 :
870 284 : let desc = PersistentLayerDesc::new_img(
871 284 : self.tenant_shard_id,
872 284 : self.timeline_id,
873 284 : if let Some(end_key) = end_key {
874 14 : self.key_range.start..end_key
875 : } else {
876 270 : self.key_range.clone()
877 : },
878 284 : self.lsn,
879 284 : metadata.len(),
880 : );
881 :
882 : #[cfg(feature = "testing")]
883 284 : if let Some(end_key) = end_key {
884 14 : assert!(
885 14 : self.last_written_key < end_key,
886 0 : "written key violates end_key range"
887 : );
888 270 : }
889 :
890 : // Note: Because we open the file in write-only mode, we cannot
891 : // reuse the same VirtualFile for reading later. That's why we don't
892 : // set inner.file here. The first read will have to re-open it.
893 :
894 : // fsync the file
895 284 : file.sync_all().await?;
896 :
897 : // FIXME: why not carry the virtualfile here, it supports renaming?
898 284 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
899 :
900 284 : info!("created image layer {}", layer.local_path());
901 :
902 284 : Ok(layer)
903 284 : }
904 : }
905 :
/// A builder object for constructing a new image layer.
///
/// Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` for every key-value
/// pair in the key range.
///
/// 3. Call `finish`.
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// possible for the writer to drop before `finish` is actually called. So this
/// could lead to odd temporary files in the directory, exhausting file system.
/// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
/// implementation that cleans up the temporary file in failure. It's not
/// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
/// out some fields, making it impossible to implement `Drop`.
///
#[must_use]
pub struct ImageLayerWriter {
    // `Some` until `finish` takes it out; the other methods `unwrap` it.
    inner: Option<ImageLayerWriterInner>,
}
931 :
932 : impl ImageLayerWriter {
933 : ///
934 : /// Start building a new image layer.
935 : ///
936 298 : pub async fn new(
937 298 : conf: &'static PageServerConf,
938 298 : timeline_id: TimelineId,
939 298 : tenant_shard_id: TenantShardId,
940 298 : key_range: &Range<Key>,
941 298 : lsn: Lsn,
942 298 : ctx: &RequestContext,
943 298 : ) -> anyhow::Result<ImageLayerWriter> {
944 298 : Ok(Self {
945 298 : inner: Some(
946 298 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
947 227 : .await?,
948 : ),
949 : })
950 298 : }
951 :
952 : ///
953 : /// Write next value to the file.
954 : ///
955 : /// The page versions must be appended in blknum order.
956 : ///
957 542178 : pub async fn put_image(
958 542178 : &mut self,
959 542178 : key: Key,
960 542178 : img: Bytes,
961 542178 : ctx: &RequestContext,
962 542178 : ) -> anyhow::Result<()> {
963 550761 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
964 542178 : }
965 :
966 : #[cfg(test)]
967 : /// Estimated size of the image layer.
968 4000 : pub(crate) fn estimated_size(&self) -> u64 {
969 4000 : let inner = self.inner.as_ref().unwrap();
970 4000 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
971 4000 : }
972 :
973 : #[cfg(test)]
974 4006 : pub(crate) fn num_keys(&self) -> usize {
975 4006 : self.inner.as_ref().unwrap().num_keys
976 4006 : }
977 :
978 : ///
979 : /// Finish writing the image layer.
980 : ///
981 270 : pub(crate) async fn finish(
982 270 : mut self,
983 270 : timeline: &Arc<Timeline>,
984 270 : ctx: &RequestContext,
985 270 : ) -> anyhow::Result<super::ResidentLayer> {
986 785 : self.inner.take().unwrap().finish(timeline, ctx, None).await
987 270 : }
988 :
989 : #[cfg(test)]
990 : /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
991 14 : pub(super) async fn finish_with_end_key(
992 14 : mut self,
993 14 : timeline: &Arc<Timeline>,
994 14 : end_key: Key,
995 14 : ctx: &RequestContext,
996 14 : ) -> anyhow::Result<super::ResidentLayer> {
997 14 : self.inner
998 14 : .take()
999 14 : .unwrap()
1000 14 : .finish(timeline, ctx, Some(end_key))
1001 29 : .await
1002 14 : }
1003 :
1004 12 : pub(crate) fn size(&self) -> u64 {
1005 12 : self.inner.as_ref().unwrap().size()
1006 12 : }
1007 : }
1008 :
1009 : impl Drop for ImageLayerWriter {
1010 298 : fn drop(&mut self) {
1011 298 : if let Some(inner) = self.inner.take() {
1012 14 : inner.blob_writer.into_inner().remove();
1013 284 : }
1014 298 : }
1015 : }
1016 :
1017 : pub struct ImageLayerIterator<'a> {
1018 : image_layer: &'a ImageLayerInner,
1019 : ctx: &'a RequestContext,
1020 : planner: StreamingVectoredReadPlanner,
1021 : index_iter: DiskBtreeIterator<'a>,
1022 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1023 : is_end: bool,
1024 : }
1025 :
1026 : impl<'a> ImageLayerIterator<'a> {
1027 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1028 18968 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1029 18968 : assert!(self.key_values_batch.is_empty());
1030 18968 : assert!(!self.is_end);
1031 :
1032 18968 : let plan = loop {
1033 28676 : if let Some(res) = self.index_iter.next().await {
1034 28622 : let (raw_key, offset) = res?;
1035 28622 : if let Some(batch_plan) = self.planner.handle(
1036 28622 : Key::from_slice(&raw_key[..KEY_SIZE]),
1037 28622 : self.image_layer.lsn,
1038 28622 : offset,
1039 28622 : ) {
1040 18914 : break batch_plan;
1041 9708 : }
1042 : } else {
1043 54 : self.is_end = true;
1044 54 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1045 54 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1046 54 : break item;
1047 : } else {
1048 0 : return Ok(()); // TODO: a test case on empty iterator
1049 : }
1050 : }
1051 : };
1052 18968 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1053 18968 : let mut next_batch = std::collections::VecDeque::new();
1054 18968 : let buf_size = plan.size();
1055 18968 : let buf = BytesMut::with_capacity(buf_size);
1056 18968 : let blobs_buf = vectored_blob_reader
1057 18968 : .read_blobs(&plan, buf, self.ctx)
1058 9633 : .await?;
1059 18968 : let frozen_buf: Bytes = blobs_buf.buf.freeze();
1060 28594 : for meta in blobs_buf.blobs.iter() {
1061 28594 : let img_buf = frozen_buf.slice(meta.start..meta.end);
1062 28594 : next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
1063 28594 : }
1064 18968 : self.key_values_batch = next_batch;
1065 18968 : Ok(())
1066 18968 : }
1067 :
1068 28404 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1069 28404 : if self.key_values_batch.is_empty() {
1070 18964 : if self.is_end {
1071 80 : return Ok(None);
1072 18884 : }
1073 18884 : self.next_batch().await?;
1074 9440 : }
1075 28324 : Ok(Some(
1076 28324 : self.key_values_batch
1077 28324 : .pop_front()
1078 28324 : .expect("should not be empty"),
1079 28324 : ))
1080 28404 : }
1081 : }
1082 :
1083 : #[cfg(test)]
1084 : mod test {
1085 : use std::{sync::Arc, time::Duration};
1086 :
1087 : use bytes::Bytes;
1088 : use itertools::Itertools;
1089 : use pageserver_api::{
1090 : key::Key,
1091 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1092 : };
1093 : use utils::{
1094 : generation::Generation,
1095 : id::{TenantId, TimelineId},
1096 : lsn::Lsn,
1097 : };
1098 :
1099 : use crate::{
1100 : context::RequestContext,
1101 : repository::Value,
1102 : tenant::{
1103 : config::TenantConf,
1104 : harness::{TenantHarness, TIMELINE_ID},
1105 : storage_layer::ResidentLayer,
1106 : vectored_blob_io::StreamingVectoredReadPlanner,
1107 : Tenant, Timeline,
1108 : },
1109 : DEFAULT_PG_VERSION,
1110 : };
1111 :
1112 : use super::{ImageLayerIterator, ImageLayerWriter};
1113 :
1114 : #[tokio::test]
1115 2 : async fn image_layer_rewrite() {
1116 2 : let tenant_conf = TenantConf {
1117 2 : gc_period: Duration::ZERO,
1118 2 : compaction_period: Duration::ZERO,
1119 2 : ..TenantConf::default()
1120 2 : };
1121 2 : let tenant_id = TenantId::generate();
1122 2 : let mut gen = Generation::new(0xdead0001);
1123 10 : let mut get_next_gen = || {
1124 10 : let ret = gen;
1125 10 : gen = gen.next();
1126 10 : ret
1127 10 : };
1128 2 : // The LSN at which we will create an image layer to filter
1129 2 : let lsn = Lsn(0xdeadbeef0000);
1130 2 : let timeline_id = TimelineId::generate();
1131 2 :
1132 2 : //
1133 2 : // Create an unsharded parent with a layer.
1134 2 : //
1135 2 :
1136 2 : let harness = TenantHarness::create_custom(
1137 2 : "test_image_layer_rewrite--parent",
1138 2 : tenant_conf.clone(),
1139 2 : tenant_id,
1140 2 : ShardIdentity::unsharded(),
1141 2 : get_next_gen(),
1142 2 : )
1143 2 : .await
1144 2 : .unwrap();
1145 8 : let (tenant, ctx) = harness.load().await;
1146 2 : let timeline = tenant
1147 2 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1148 4 : .await
1149 2 : .unwrap();
1150 2 :
1151 2 : // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
1152 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1153 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1154 2 : let range = input_start..input_end;
1155 2 :
1156 2 : // Build an image layer to filter
1157 2 : let resident = {
1158 2 : let mut writer = ImageLayerWriter::new(
1159 2 : harness.conf,
1160 2 : timeline_id,
1161 2 : harness.tenant_shard_id,
1162 2 : &range,
1163 2 : lsn,
1164 2 : &ctx,
1165 2 : )
1166 2 : .await
1167 2 : .unwrap();
1168 2 :
1169 2 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1170 2 : let mut key = range.start;
1171 262146 : while key < range.end {
1172 266239 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1173 262144 :
1174 262144 : key = key.next();
1175 2 : }
1176 119 : writer.finish(&timeline, &ctx).await.unwrap()
1177 2 : };
1178 2 : let original_size = resident.metadata().file_size;
1179 2 :
1180 2 : //
1181 2 : // Create child shards and do the rewrite, exercising filter().
1182 2 : // TODO: abstraction in TenantHarness for splits.
1183 2 : //
1184 2 :
1185 2 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1186 2 : // range, middle of key range.
1187 2 : let shard_count = ShardCount::new(4);
1188 8 : for shard_number in 0..shard_count.count() {
1189 2 : //
1190 2 : // mimic the shard split
1191 2 : //
1192 8 : let shard_identity = ShardIdentity::new(
1193 8 : ShardNumber(shard_number),
1194 8 : shard_count,
1195 8 : ShardStripeSize(0x8000),
1196 8 : )
1197 8 : .unwrap();
1198 8 : let harness = TenantHarness::create_custom(
1199 8 : Box::leak(Box::new(format!(
1200 8 : "test_image_layer_rewrite--child{}",
1201 8 : shard_identity.shard_slug()
1202 8 : ))),
1203 8 : tenant_conf.clone(),
1204 8 : tenant_id,
1205 8 : shard_identity,
1206 8 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1207 8 : // But here, all we care about is that the gen number is unique.
1208 8 : get_next_gen(),
1209 8 : )
1210 2 : .await
1211 8 : .unwrap();
1212 32 : let (tenant, ctx) = harness.load().await;
1213 8 : let timeline = tenant
1214 8 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1215 12 : .await
1216 8 : .unwrap();
1217 2 :
1218 2 : //
1219 2 : // use filter() and make assertions
1220 2 : //
1221 2 :
1222 8 : let mut filtered_writer = ImageLayerWriter::new(
1223 8 : harness.conf,
1224 8 : timeline_id,
1225 8 : harness.tenant_shard_id,
1226 8 : &range,
1227 8 : lsn,
1228 8 : &ctx,
1229 8 : )
1230 4 : .await
1231 8 : .unwrap();
1232 2 :
1233 8 : let wrote_keys = resident
1234 8 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1235 266719 : .await
1236 8 : .unwrap();
1237 8 : let replacement = if wrote_keys > 0 {
1238 129 : Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
1239 2 : } else {
1240 2 : None
1241 2 : };
1242 2 :
1243 2 : // This exact size and those below will need updating as/when the layer encoding changes, but
1244 2 : // should be deterministic for a given version of the format, as we used no randomness generating the input.
1245 8 : assert_eq!(original_size, 1597440);
1246 2 :
1247 8 : match shard_number {
1248 2 : 0 => {
1249 2 : // We should have written out just one stripe for our shard identity
1250 2 : assert_eq!(wrote_keys, 0x8000);
1251 2 : let replacement = replacement.unwrap();
1252 2 :
1253 2 : // We should have dropped some of the data
1254 2 : assert!(replacement.metadata().file_size < original_size);
1255 2 : assert!(replacement.metadata().file_size > 0);
1256 2 :
1257 2 : // Assert that we dropped ~3/4 of the data.
1258 2 : assert_eq!(replacement.metadata().file_size, 417792);
1259 2 : }
1260 2 : 1 => {
1261 2 : // Shard 1 has no keys in our input range
1262 2 : assert_eq!(wrote_keys, 0x0);
1263 2 : assert!(replacement.is_none());
1264 2 : }
1265 2 : 2 => {
1266 2 : // Shard 2 has one stripes in the input range
1267 2 : assert_eq!(wrote_keys, 0x8000);
1268 2 : let replacement = replacement.unwrap();
1269 2 : assert!(replacement.metadata().file_size < original_size);
1270 2 : assert!(replacement.metadata().file_size > 0);
1271 2 : assert_eq!(replacement.metadata().file_size, 417792);
1272 2 : }
1273 2 : 3 => {
1274 2 : // Shard 3 has two stripes in the input range
1275 2 : assert_eq!(wrote_keys, 0x10000);
1276 2 : let replacement = replacement.unwrap();
1277 2 : assert!(replacement.metadata().file_size < original_size);
1278 2 : assert!(replacement.metadata().file_size > 0);
1279 2 : assert_eq!(replacement.metadata().file_size, 811008);
1280 2 : }
1281 2 : _ => unreachable!(),
1282 2 : }
1283 2 : }
1284 2 : }
1285 :
1286 2 : async fn produce_image_layer(
1287 2 : tenant: &Tenant,
1288 2 : tline: &Arc<Timeline>,
1289 2 : mut images: Vec<(Key, Bytes)>,
1290 2 : lsn: Lsn,
1291 2 : ctx: &RequestContext,
1292 2 : ) -> anyhow::Result<ResidentLayer> {
1293 2 : images.sort();
1294 2 : let (key_start, _) = images.first().unwrap();
1295 2 : let (key_last, _) = images.last().unwrap();
1296 2 : let key_end = key_last.next();
1297 2 : let key_range = *key_start..key_end;
1298 2 : let mut writer = ImageLayerWriter::new(
1299 2 : tenant.conf,
1300 2 : tline.timeline_id,
1301 2 : tenant.tenant_shard_id,
1302 2 : &key_range,
1303 2 : lsn,
1304 2 : ctx,
1305 2 : )
1306 1 : .await?;
1307 :
1308 2002 : for (key, img) in images {
1309 2031 : writer.put_image(key, img, ctx).await?;
1310 : }
1311 4 : let img_layer = writer.finish(tline, ctx).await?;
1312 :
1313 2 : Ok::<_, anyhow::Error>(img_layer)
1314 2 : }
1315 :
1316 28 : async fn assert_img_iter_equal(
1317 28 : img_iter: &mut ImageLayerIterator<'_>,
1318 28 : expect: &[(Key, Bytes)],
1319 28 : expect_lsn: Lsn,
1320 28 : ) {
1321 28 : let mut expect_iter = expect.iter();
1322 : loop {
1323 28028 : let o1 = img_iter.next().await.unwrap();
1324 28028 : let o2 = expect_iter.next();
1325 28028 : match (o1, o2) {
1326 28 : (None, None) => break,
1327 28000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1328 28000 : let Value::Image(i1) = v1 else {
1329 0 : panic!("expect Value::Image")
1330 : };
1331 28000 : assert_eq!(&k1, k2);
1332 28000 : assert_eq!(l1, expect_lsn);
1333 28000 : assert_eq!(&i1, i2);
1334 : }
1335 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1336 : }
1337 : }
1338 28 : }
1339 :
1340 : #[tokio::test]
1341 2 : async fn image_layer_iterator() {
1342 2 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1343 8 : let (tenant, ctx) = harness.load().await;
1344 2 :
1345 2 : let tline = tenant
1346 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1347 4 : .await
1348 2 : .unwrap();
1349 2 :
1350 2000 : fn get_key(id: u32) -> Key {
1351 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1352 2000 : key.field6 = id;
1353 2000 : key
1354 2000 : }
1355 2 : const N: usize = 1000;
1356 2 : let test_imgs = (0..N)
1357 2000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1358 2 : .collect_vec();
1359 2 : let resident_layer =
1360 2 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1361 2036 : .await
1362 2 : .unwrap();
1363 2 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1364 6 : for max_read_size in [1, 1024] {
1365 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1366 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1367 28 : // Test if the batch size is correctly determined
1368 28 : let mut iter = img_layer.iter(&ctx);
1369 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1370 28 : let mut num_items = 0;
1371 112 : for _ in 0..3 {
1372 84 : iter.next_batch().await.unwrap();
1373 84 : num_items += iter.key_values_batch.len();
1374 84 : if max_read_size == 1 {
1375 2 : // every key should be a batch b/c the value is larger than max_read_size
1376 42 : assert_eq!(iter.key_values_batch.len(), 1);
1377 2 : } else {
1378 42 : assert_eq!(iter.key_values_batch.len(), batch_size);
1379 2 : }
1380 84 : if num_items >= N {
1381 2 : break;
1382 84 : }
1383 84 : iter.key_values_batch.clear();
1384 2 : }
1385 2 : // Test if the result is correct
1386 28 : let mut iter = img_layer.iter(&ctx);
1387 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1388 9576 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1389 2 : }
1390 2 : }
1391 2 : }
1392 : }
|