Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls within the image layer's range
4 : //! but is not present in the layer does not exist at that LSN.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! the timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed-size header at the
22 : //! beginning of the file; it contains basic information about the
23 : //! layer and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
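//!
//! An illustrative sketch of the on-disk layout (PAGE_SZ-sized blocks; the exact
//! index position is recorded in the summary's 'index_start_blk' and
//! 'index_root_blk' fields, so the block numbers below are only for orientation):
//!
//! ```text
//! block 0                      summary header
//! blocks 1..index_start_blk    "values": blob-encoded page images
//! blocks index_start_blk..     "index": B-tree, root page at index_root_blk
//! ```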
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{
33 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
34 : };
35 : use crate::tenant::storage_layer::{
36 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
37 : };
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
41 : VectoredReadPlanner,
42 : };
43 : use crate::tenant::{PageReconstructError, Timeline};
44 : use crate::virtual_file::{self, VirtualFile};
45 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::{Bytes, BytesMut};
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use hex;
50 : use itertools::Itertools;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
53 : use rand::{distributions::Alphanumeric, Rng};
54 : use serde::{Deserialize, Serialize};
55 : use std::collections::VecDeque;
56 : use std::fs::File;
57 : use std::io::SeekFrom;
58 : use std::ops::Range;
59 : use std::os::unix::prelude::FileExt;
60 : use std::str::FromStr;
61 : use std::sync::Arc;
62 : use tokio::sync::OnceCell;
63 : use tokio_stream::StreamExt;
64 : use tracing::*;
65 :
66 : use utils::{
67 : bin_ser::BeSer,
68 : id::{TenantId, TimelineId},
69 : lsn::Lsn,
70 : };
71 :
72 : use super::layer_name::ImageLayerName;
73 : use super::{
74 : AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
75 : };
76 :
77 : ///
78 : /// Header stored at the beginning of the file
79 : ///
80 : /// After this comes the 'values' part, starting on block 1. After that,
81 : /// the 'index' starts at the block indicated by 'index_start_blk'
82 : ///
83 98 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
84 : pub struct Summary {
85 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
86 : pub magic: u16,
87 : pub format_version: u16,
88 :
89 : pub tenant_id: TenantId,
90 : pub timeline_id: TimelineId,
91 : pub key_range: Range<Key>,
92 : pub lsn: Lsn,
93 :
94 : /// Block number where the 'index' part of the file begins.
95 : pub index_start_blk: u32,
96 : /// Block within the 'index', where the B-tree root page is stored
97 : pub index_root_blk: u32,
98 : // the 'values' part starts after the summary header, on block 1.
99 : }
100 :
101 : impl From<&ImageLayer> for Summary {
102 0 : fn from(layer: &ImageLayer) -> Self {
103 0 : Self::expected(
104 0 : layer.desc.tenant_shard_id.tenant_id,
105 0 : layer.desc.timeline_id,
106 0 : layer.desc.key_range.clone(),
107 0 : layer.lsn,
108 0 : )
109 0 : }
110 : }
111 :
112 : impl Summary {
113 98 : pub(super) fn expected(
114 98 : tenant_id: TenantId,
115 98 : timeline_id: TimelineId,
116 98 : key_range: Range<Key>,
117 98 : lsn: Lsn,
118 98 : ) -> Self {
119 98 : Self {
120 98 : magic: IMAGE_FILE_MAGIC,
121 98 : format_version: STORAGE_FORMAT_VERSION,
122 98 : tenant_id,
123 98 : timeline_id,
124 98 : key_range,
125 98 : lsn,
126 98 :
127 98 : index_start_blk: 0,
128 98 : index_root_blk: 0,
129 98 : }
130 98 : }
131 : }
132 :
133 : /// This is used only from `pagectl`. Within pageserver, all layers are
134 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
135 : pub struct ImageLayer {
136 : path: Utf8PathBuf,
137 : pub desc: PersistentLayerDesc,
138 : // This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn
139 : pub lsn: Lsn,
140 : access_stats: LayerAccessStats,
141 : inner: OnceCell<ImageLayerInner>,
142 : }
143 :
144 : impl std::fmt::Debug for ImageLayer {
145 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 0 : use super::RangeDisplayDebug;
147 0 :
148 0 : f.debug_struct("ImageLayer")
149 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
150 0 : .field("file_size", &self.desc.file_size)
151 0 : .field("lsn", &self.lsn)
152 0 : .field("inner", &self.inner)
153 0 : .finish()
154 0 : }
155 : }
156 :
157 : /// ImageLayerInner is the in-memory data structure associated with an on-disk
158 : /// image layer file.
159 : pub struct ImageLayerInner {
160 : // values copied from summary
161 : index_start_blk: u32,
162 : index_root_blk: u32,
163 :
164 : key_range: Range<Key>,
165 : lsn: Lsn,
166 :
167 : file: VirtualFile,
168 : file_id: FileId,
169 :
170 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
171 : }
172 :
173 : impl std::fmt::Debug for ImageLayerInner {
174 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 0 : f.debug_struct("ImageLayerInner")
176 0 : .field("index_start_blk", &self.index_start_blk)
177 0 : .field("index_root_blk", &self.index_root_blk)
178 0 : .finish()
179 0 : }
180 : }
181 :
182 : impl ImageLayerInner {
183 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
184 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
185 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
186 0 : self.index_start_blk,
187 0 : self.index_root_blk,
188 0 : block_reader,
189 0 : );
190 0 :
191 0 : tree_reader.dump().await?;
192 :
193 0 : tree_reader
194 0 : .visit(
195 0 : &[0u8; KEY_SIZE],
196 0 : VisitDirection::Forwards,
197 0 : |key, value| {
198 0 : println!("key: {} offset {}", hex::encode(key), value);
199 0 : true
200 0 : },
201 0 : ctx,
202 0 : )
203 0 : .await?;
204 :
205 0 : Ok(())
206 0 : }
207 : }
208 :
209 : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
210 : impl std::fmt::Display for ImageLayer {
211 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 0 : write!(f, "{}", self.layer_desc().short_id())
213 0 : }
214 : }
215 :
216 : impl AsLayerDesc for ImageLayer {
217 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
218 0 : &self.desc
219 0 : }
220 : }
221 :
222 : impl ImageLayer {
223 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
224 0 : self.desc.dump();
225 0 :
226 0 : if !verbose {
227 0 : return Ok(());
228 0 : }
229 :
230 0 : let inner = self.load(ctx).await?;
231 :
232 0 : inner.dump(ctx).await?;
233 :
234 0 : Ok(())
235 0 : }
236 :
237 272 : fn temp_path_for(
238 272 : conf: &PageServerConf,
239 272 : timeline_id: TimelineId,
240 272 : tenant_shard_id: TenantShardId,
241 272 : fname: &ImageLayerName,
242 272 : ) -> Utf8PathBuf {
243 272 : let rand_string: String = rand::thread_rng()
244 272 : .sample_iter(&Alphanumeric)
245 272 : .take(8)
246 272 : .map(char::from)
247 272 : .collect();
248 272 :
249 272 : conf.timeline_path(&tenant_shard_id, &timeline_id)
250 272 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
251 272 : }
252 :
253 : ///
254 : /// Open the underlying file and read the metadata into memory, if it's
255 : /// not loaded already.
256 : ///
257 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
258 0 : self.access_stats.record_access(ctx);
259 0 : self.inner
260 0 : .get_or_try_init(|| self.load_inner(ctx))
261 0 : .await
262 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
263 0 : }
264 :
265 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
266 0 : let path = self.path();
267 :
268 0 : let loaded =
269 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
270 :
271 : // not production code
272 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
273 0 : let expected_layer_name = self.layer_desc().layer_name();
274 0 :
275 0 : if actual_layer_name != expected_layer_name {
276 0 : println!("warning: filename does not match what is expected from in-file summary");
277 0 : println!("actual: {:?}", actual_layer_name.to_string());
278 0 : println!("expected: {:?}", expected_layer_name.to_string());
279 0 : }
280 :
281 0 : Ok(loaded)
282 0 : }
283 :
284 : /// Create an ImageLayer struct representing an existing file on disk.
285 : ///
286 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
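/// A minimal sketch of how a debugging tool might use this (the path and `ctx`
/// are placeholders):
///
/// ```ignore
/// let file = std::fs::File::open(path.as_std_path())?;
/// let layer = ImageLayer::new_for_path(&path, file)?;
/// layer.dump(true, &ctx).await?;
/// ```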
287 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
288 0 : let mut summary_buf = vec![0; PAGE_SZ];
289 0 : file.read_exact_at(&mut summary_buf, 0)?;
290 0 : let summary = Summary::des_prefix(&summary_buf)?;
291 0 : let metadata = file
292 0 : .metadata()
293 0 : .context("get file metadata to determine size")?;
294 :
295 : // This function is never used for constructing layers in a running pageserver,
296 : // so it does not need an accurate TenantShardId.
297 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
298 0 :
299 0 : Ok(ImageLayer {
300 0 : path: path.to_path_buf(),
301 0 : desc: PersistentLayerDesc::new_img(
302 0 : tenant_shard_id,
303 0 : summary.timeline_id,
304 0 : summary.key_range,
305 0 : summary.lsn,
306 0 : metadata.len(),
307 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
308 0 : lsn: summary.lsn,
309 0 : access_stats: Default::default(),
310 0 : inner: OnceCell::new(),
311 0 : })
312 0 : }
313 :
314 0 : fn path(&self) -> Utf8PathBuf {
315 0 : self.path.clone()
316 0 : }
317 : }
318 :
319 0 : #[derive(thiserror::Error, Debug)]
320 : pub enum RewriteSummaryError {
321 : #[error("magic mismatch")]
322 : MagicMismatch,
323 : #[error(transparent)]
324 : Other(#[from] anyhow::Error),
325 : }
326 :
327 : impl From<std::io::Error> for RewriteSummaryError {
328 0 : fn from(e: std::io::Error) -> Self {
329 0 : Self::Other(anyhow::anyhow!(e))
330 0 : }
331 : }
332 :
333 : impl ImageLayer {
334 0 : pub async fn rewrite_summary<F>(
335 0 : path: &Utf8Path,
336 0 : rewrite: F,
337 0 : ctx: &RequestContext,
338 0 : ) -> Result<(), RewriteSummaryError>
339 0 : where
340 0 : F: Fn(Summary) -> Summary,
341 0 : {
342 0 : let mut file = VirtualFile::open_with_options(
343 0 : path,
344 0 : virtual_file::OpenOptions::new().read(true).write(true),
345 0 : ctx,
346 0 : )
347 0 : .await
348 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
349 0 : let file_id = page_cache::next_file_id();
350 0 : let block_reader = FileBlockReader::new(&file, file_id);
351 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
352 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
353 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
354 0 : return Err(RewriteSummaryError::MagicMismatch);
355 0 : }
356 0 :
357 0 : let new_summary = rewrite(actual_summary);
358 0 :
359 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
360 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
361 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
362 0 : file.seek(SeekFrom::Start(0)).await?;
363 0 : let (_buf, res) = file.write_all(buf, ctx).await;
364 0 : res?;
365 0 : Ok(())
366 0 : }
367 : }
368 :
369 : impl ImageLayerInner {
370 22 : pub(crate) fn key_range(&self) -> &Range<Key> {
371 22 : &self.key_range
372 22 : }
373 :
374 22 : pub(crate) fn lsn(&self) -> Lsn {
375 22 : self.lsn
376 22 : }
377 :
378 : /// Open the layer file, read the summary block, and construct the in-memory
379 : /// handle. If an expected [`Summary`] is provided, it is checked against the
380 : /// summary stored in the file (index offsets and timeline_id are masked out).
381 98 : pub(super) async fn load(
382 98 : path: &Utf8Path,
383 98 : lsn: Lsn,
384 98 : summary: Option<Summary>,
385 98 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
386 98 : ctx: &RequestContext,
387 98 : ) -> anyhow::Result<Self> {
388 98 : let file = VirtualFile::open(path, ctx)
389 49 : .await
390 98 : .context("open layer file")?;
391 98 : let file_id = page_cache::next_file_id();
392 98 : let block_reader = FileBlockReader::new(&file, file_id);
393 98 : let summary_blk = block_reader
394 98 : .read_blk(0, ctx)
395 51 : .await
396 98 : .context("read first block")?;
397 :
398 : // A length mismatch is the only way this could fail, which is unlikely unless
399 : // read_blk returns a wrong-sized block.
400 : //
401 : // TODO: confirm and turn this into an assertion
402 98 : let actual_summary =
403 98 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
404 :
405 98 : if let Some(mut expected_summary) = summary {
406 : // production code path
407 98 : expected_summary.index_start_blk = actual_summary.index_start_blk;
408 98 : expected_summary.index_root_blk = actual_summary.index_root_blk;
409 98 : // mask out the timeline_id, but still require the layers to be from the same tenant
410 98 : expected_summary.timeline_id = actual_summary.timeline_id;
411 98 :
412 98 : if actual_summary != expected_summary {
413 0 : bail!(
414 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
415 0 : actual_summary,
416 0 : expected_summary
417 0 : );
418 98 : }
419 0 : }
420 :
421 98 : Ok(ImageLayerInner {
422 98 : index_start_blk: actual_summary.index_start_blk,
423 98 : index_root_blk: actual_summary.index_root_blk,
424 98 : lsn,
425 98 : file,
426 98 : file_id,
427 98 : max_vectored_read_bytes,
428 98 : key_range: actual_summary.key_range,
429 98 : })
430 98 : }
431 :
432 4 : pub(super) async fn get_value_reconstruct_data(
433 4 : &self,
434 4 : key: Key,
435 4 : reconstruct_state: &mut ValueReconstructState,
436 4 : ctx: &RequestContext,
437 4 : ) -> anyhow::Result<ValueReconstructResult> {
438 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
439 4 : let tree_reader =
440 4 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
441 4 :
442 4 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
443 4 : key.write_to_byte_slice(&mut keybuf);
444 4 : if let Some(offset) = tree_reader
445 4 : .get(
446 4 : &keybuf,
447 4 : &RequestContextBuilder::extend(ctx)
448 4 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
449 4 : .build(),
450 4 : )
451 2 : .await?
452 : {
453 4 : let blob = block_reader
454 4 : .block_cursor()
455 4 : .read_blob(
456 4 : offset,
457 4 : &RequestContextBuilder::extend(ctx)
458 4 : .page_content_kind(PageContentKind::ImageLayerValue)
459 4 : .build(),
460 4 : )
461 2 : .await
462 4 : .with_context(|| format!("failed to read value from offset {}", offset))?;
463 4 : let value = Bytes::from(blob);
464 4 :
465 4 : reconstruct_state.img = Some((self.lsn, value));
466 4 : Ok(ValueReconstructResult::Complete)
467 : } else {
468 0 : Ok(ValueReconstructResult::Missing)
469 : }
470 4 : }
471 :
472 : // Look up the keys in the provided keyspace and update
473 : // the reconstruct state with whatever is found.
474 7606 : pub(super) async fn get_values_reconstruct_data(
475 7606 : &self,
476 7606 : keyspace: KeySpace,
477 7606 : reconstruct_state: &mut ValuesReconstructState,
478 7606 : ctx: &RequestContext,
479 7606 : ) -> Result<(), GetVectoredError> {
480 7606 : let reads = self
481 7606 : .plan_reads(keyspace, None, ctx)
482 730 : .await
483 7606 : .map_err(GetVectoredError::Other)?;
484 :
485 7606 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
486 4633 : .await;
487 :
488 7606 : reconstruct_state.on_image_layer_visited(&self.key_range);
489 7606 :
490 7606 : Ok(())
491 7606 : }
492 :
493 : /// Load all key-value pairs in the image layer; this should be replaced by an iterator-based interface in the future.
494 0 : pub(super) async fn load_key_values(
495 0 : &self,
496 0 : ctx: &RequestContext,
497 0 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
498 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
499 0 : let tree_reader =
500 0 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
501 0 : let mut result = Vec::new();
502 0 : let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
503 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
504 0 : let cursor = block_reader.block_cursor();
505 0 : while let Some(item) = stream.next().await {
506 : // TODO: dedup code with get_reconstruct_value
507 0 : let (raw_key, offset) = item?;
508 0 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
509 : // TODO: ctx handling and sharding
510 0 : let blob = cursor
511 0 : .read_blob(offset, ctx)
512 0 : .await
513 0 : .with_context(|| format!("failed to read value from offset {}", offset))?;
514 0 : let value = Bytes::from(blob);
515 0 : result.push((key, self.lsn, Value::Image(value)));
516 : }
517 0 : Ok(result)
518 0 : }
519 :
520 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
521 : /// and the keys in this layer.
522 : ///
523 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
524 : /// this shard.
525 7614 : async fn plan_reads(
526 7614 : &self,
527 7614 : keyspace: KeySpace,
528 7614 : shard_identity: Option<&ShardIdentity>,
529 7614 : ctx: &RequestContext,
530 7614 : ) -> anyhow::Result<Vec<VectoredRead>> {
531 7614 : let mut planner = VectoredReadPlanner::new(
532 7614 : self.max_vectored_read_bytes
533 7614 : .expect("Layer is loaded with max vectored bytes config")
534 7614 : .0
535 7614 : .into(),
536 7614 : );
537 7614 :
538 7614 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
539 7614 : let tree_reader =
540 7614 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
541 7614 :
542 7614 : let ctx = RequestContextBuilder::extend(ctx)
543 7614 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
544 7614 : .build();
545 :
546 29312 : for range in keyspace.ranges.iter() {
547 29312 : let mut range_end_handled = false;
548 29312 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
549 29312 : range.start.write_to_byte_slice(&mut search_key);
550 29312 :
551 29312 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
552 29312 : let mut index_stream = std::pin::pin!(index_stream);
553 :
554 1095809 : while let Some(index_entry) = index_stream.next().await {
555 1095626 : let (raw_key, offset) = index_entry?;
556 :
557 1095626 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
558 1095626 : assert!(key >= range.start);
559 :
560 1095626 : let flag = if let Some(shard_identity) = shard_identity {
561 1048576 : if shard_identity.is_key_disposable(&key) {
562 786432 : BlobFlag::Ignore
563 : } else {
564 262144 : BlobFlag::None
565 : }
566 : } else {
567 47050 : BlobFlag::None
568 : };
569 :
570 1095626 : if key >= range.end {
571 29129 : planner.handle_range_end(offset);
572 29129 : range_end_handled = true;
573 29129 : break;
574 1066497 : } else {
575 1066497 : planner.handle(key, self.lsn, offset, flag);
576 1066497 : }
577 : }
578 :
579 29312 : if !range_end_handled {
580 183 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
581 183 : planner.handle_range_end(payload_end);
582 29129 : }
583 : }
584 :
585 7614 : Ok(planner.finish())
586 7614 : }
587 :
588 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
589 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
590 8 : pub(super) async fn filter(
591 8 : &self,
592 8 : shard_identity: &ShardIdentity,
593 8 : writer: &mut ImageLayerWriter,
594 8 : ctx: &RequestContext,
595 8 : ) -> anyhow::Result<usize> {
596 : // Fragment the range into the regions owned by this ShardIdentity
597 8 : let plan = self
598 8 : .plan_reads(
599 8 : KeySpace {
600 8 : // If asked for the total key space, plan_reads will give us all the keys in the layer
601 8 : ranges: vec![Key::MIN..Key::MAX],
602 8 : },
603 8 : Some(shard_identity),
604 8 : ctx,
605 8 : )
606 469 : .await?;
607 :
608 8 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
609 8 : let mut key_count = 0;
610 16 : for read in plan.into_iter() {
611 16 : let buf_size = read.size();
612 16 :
613 16 : let buf = BytesMut::with_capacity(buf_size);
614 16 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
615 :
616 16 : let frozen_buf = blobs_buf.buf.freeze();
617 :
618 262144 : for meta in blobs_buf.blobs.iter() {
619 262144 : let img_buf = frozen_buf.slice(meta.start..meta.end);
620 262144 :
621 262144 : key_count += 1;
622 262144 : writer
623 262144 : .put_image(meta.meta.key, img_buf, ctx)
624 266240 : .await
625 262144 : .context(format!("Storing key {}", meta.meta.key))?;
626 : }
627 : }
628 :
629 8 : Ok(key_count)
630 8 : }
631 :
632 7606 : async fn do_reads_and_update_state(
633 7606 : &self,
634 7606 : reads: Vec<VectoredRead>,
635 7606 : reconstruct_state: &mut ValuesReconstructState,
636 7606 : ctx: &RequestContext,
637 7606 : ) {
638 7606 : let max_vectored_read_bytes = self
639 7606 : .max_vectored_read_bytes
640 7606 : .expect("Layer is loaded with max vectored bytes config")
641 7606 : .0
642 7606 : .into();
643 7606 :
644 7606 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
645 9217 : for read in reads.into_iter() {
646 9217 : let buf_size = read.size();
647 9217 :
648 9217 : if buf_size > max_vectored_read_bytes {
649 : // If the read is oversized, it should only contain one key.
650 0 : let offenders = read
651 0 : .blobs_at
652 0 : .as_slice()
653 0 : .iter()
654 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
655 0 : .join(", ");
656 0 : tracing::warn!(
657 0 : "Oversized vectored read ({} > {}) for keys {}",
658 : buf_size,
659 : max_vectored_read_bytes,
660 : offenders
661 : );
662 9217 : }
663 :
664 9217 : let buf = BytesMut::with_capacity(buf_size);
665 9217 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
666 :
667 9217 : match res {
668 9217 : Ok(blobs_buf) => {
669 9217 : let frozen_buf = blobs_buf.buf.freeze();
670 :
671 17921 : for meta in blobs_buf.blobs.iter() {
672 17921 : let img_buf = frozen_buf.slice(meta.start..meta.end);
673 17921 : reconstruct_state.update_key(
674 17921 : &meta.meta.key,
675 17921 : self.lsn,
676 17921 : Value::Image(img_buf),
677 17921 : );
678 17921 : }
679 : }
680 0 : Err(err) => {
681 0 : let kind = err.kind();
682 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
683 0 : reconstruct_state.on_key_error(
684 0 : blob_meta.key,
685 0 : PageReconstructError::from(anyhow!(
686 0 : "Failed to read blobs from virtual file {}: {}",
687 0 : self.file.path,
688 0 : kind
689 0 : )),
690 0 : );
691 0 : }
692 : }
693 : };
694 : }
695 7606 : }
696 :
697 78 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
698 78 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
699 78 : let tree_reader =
700 78 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
701 78 : ImageLayerIterator {
702 78 : image_layer: self,
703 78 : ctx,
704 78 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
705 78 : key_values_batch: VecDeque::new(),
706 78 : is_end: false,
707 78 : planner: StreamingVectoredReadPlanner::new(
708 78 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
709 78 : 1024, // The default value. Unit tests might use a different value
710 78 : ),
711 78 : }
712 78 : }
713 : }
714 :
715 : /// A builder object for constructing a new image layer.
716 : ///
717 : /// Usage:
718 : ///
719 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
720 : ///
721 : /// 2. Write the contents by calling `put_image` for every key-value
722 : /// pair in the key range.
723 : ///
724 : /// 3. Call `finish`.
725 : ///
726 : struct ImageLayerWriterInner {
727 : conf: &'static PageServerConf,
728 : path: Utf8PathBuf,
729 : timeline_id: TimelineId,
730 : tenant_shard_id: TenantShardId,
731 : key_range: Range<Key>,
732 : lsn: Lsn,
733 :
734 : // Total uncompressed bytes passed into put_image
735 : uncompressed_bytes: u64,
736 :
737 : // Like `uncompressed_bytes`,
738 : // but only of images we might consider for compression
739 : uncompressed_bytes_eligible: u64,
740 :
741 : // Like `uncompressed_bytes`, but only of images
742 : // where we have chosen their compressed form
743 : uncompressed_bytes_chosen: u64,
744 :
745 : blob_writer: BlobWriter<false>,
746 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
747 : }
748 :
749 : impl ImageLayerWriterInner {
750 : ///
751 : /// Start building a new image layer.
752 : ///
753 272 : async fn new(
754 272 : conf: &'static PageServerConf,
755 272 : timeline_id: TimelineId,
756 272 : tenant_shard_id: TenantShardId,
757 272 : key_range: &Range<Key>,
758 272 : lsn: Lsn,
759 272 : ctx: &RequestContext,
760 272 : ) -> anyhow::Result<Self> {
761 272 : // Create the file initially with a temporary filename.
762 272 : // We'll atomically rename it to the final name when we're done.
763 272 : let path = ImageLayer::temp_path_for(
764 272 : conf,
765 272 : timeline_id,
766 272 : tenant_shard_id,
767 272 : &ImageLayerName {
768 272 : key_range: key_range.clone(),
769 272 : lsn,
770 272 : },
771 272 : );
772 272 : trace!("creating image layer {}", path);
773 272 : let mut file = {
774 272 : VirtualFile::open_with_options(
775 272 : &path,
776 272 : virtual_file::OpenOptions::new()
777 272 : .write(true)
778 272 : .create_new(true),
779 272 : ctx,
780 272 : )
781 210 : .await?
782 : };
783 : // make room for the header block
784 272 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
785 272 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
786 272 :
787 272 : // Initialize the b-tree index builder
788 272 : let block_buf = BlockBuf::new();
789 272 : let tree_builder = DiskBtreeBuilder::new(block_buf);
790 272 :
791 272 : let writer = Self {
792 272 : conf,
793 272 : path,
794 272 : timeline_id,
795 272 : tenant_shard_id,
796 272 : key_range: key_range.clone(),
797 272 : lsn,
798 272 : tree: tree_builder,
799 272 : blob_writer,
800 272 : uncompressed_bytes: 0,
801 272 : uncompressed_bytes_eligible: 0,
802 272 : uncompressed_bytes_chosen: 0,
803 272 : };
804 272 :
805 272 : Ok(writer)
806 272 : }
807 :
808 : ///
809 : /// Write next value to the file.
810 : ///
811 : /// The page versions must be appended in blknum order.
812 : ///
813 538070 : async fn put_image(
814 538070 : &mut self,
815 538070 : key: Key,
816 538070 : img: Bytes,
817 538070 : ctx: &RequestContext,
818 538070 : ) -> anyhow::Result<()> {
819 538070 : ensure!(self.key_range.contains(&key));
820 538070 : let compression = self.conf.image_compression;
821 538070 : let uncompressed_len = img.len() as u64;
822 538070 : self.uncompressed_bytes += uncompressed_len;
823 538070 : let (_img, res) = self
824 538070 : .blob_writer
825 538070 : .write_blob_maybe_compressed(img, ctx, compression)
826 546563 : .await;
827 : // TODO: re-use the buffer for `img` further upstack
828 538070 : let (off, compression_info) = res?;
829 538070 : if compression_info.compressed_size.is_some() {
830 0 : // The image has been considered for compression at least
831 0 : self.uncompressed_bytes_eligible += uncompressed_len;
832 538070 : }
833 538070 : if compression_info.written_compressed {
834 0 : // The image has been compressed
835 0 : self.uncompressed_bytes_chosen += uncompressed_len;
836 538070 : }
837 :
838 538070 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
839 538070 : key.write_to_byte_slice(&mut keybuf);
840 538070 : self.tree.append(&keybuf, off)?;
841 :
842 538070 : Ok(())
843 538070 : }
844 :
845 : ///
846 : /// Finish writing the image layer.
847 : ///
848 260 : async fn finish(
849 260 : self,
850 260 : timeline: &Arc<Timeline>,
851 260 : ctx: &RequestContext,
852 260 : ) -> anyhow::Result<ResidentLayer> {
853 260 : let index_start_blk =
854 260 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
855 260 :
856 260 : // Calculate compression ratio
857 260 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
858 260 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
859 260 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
860 260 : .inc_by(self.uncompressed_bytes_eligible);
861 260 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
862 260 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
863 260 :
864 260 : let mut file = self.blob_writer.into_inner();
865 260 :
866 260 : // Write out the index
867 260 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
868 0 : .await?;
869 260 : let (index_root_blk, block_buf) = self.tree.finish()?;
870 976 : for buf in block_buf.blocks {
871 716 : let (_buf, res) = file.write_all(buf, ctx).await;
872 716 : res?;
873 : }
874 :
875 : // Fill in the summary on blk 0
876 260 : let summary = Summary {
877 260 : magic: IMAGE_FILE_MAGIC,
878 260 : format_version: STORAGE_FORMAT_VERSION,
879 260 : tenant_id: self.tenant_shard_id.tenant_id,
880 260 : timeline_id: self.timeline_id,
881 260 : key_range: self.key_range.clone(),
882 260 : lsn: self.lsn,
883 260 : index_start_blk,
884 260 : index_root_blk,
885 260 : };
886 260 :
887 260 : let mut buf = Vec::with_capacity(PAGE_SZ);
888 260 : // TODO: could use smallvec here but it's a pain with Slice<T>
889 260 : Summary::ser_into(&summary, &mut buf)?;
890 260 : file.seek(SeekFrom::Start(0)).await?;
891 260 : let (_buf, res) = file.write_all(buf, ctx).await;
892 260 : res?;
893 :
894 260 : let metadata = file
895 260 : .metadata()
896 131 : .await
897 260 : .context("get metadata to determine file size")?;
898 :
899 260 : let desc = PersistentLayerDesc::new_img(
900 260 : self.tenant_shard_id,
901 260 : self.timeline_id,
902 260 : self.key_range.clone(),
903 260 : self.lsn,
904 260 : metadata.len(),
905 260 : );
906 260 :
907 260 : // Note: Because we open the file in write-only mode, we cannot
908 260 : // reuse the same VirtualFile for reading later. That's why we don't
909 260 : // set inner.file here. The first read will have to re-open it.
910 260 :
911 260 : // fsync the file
912 260 : file.sync_all().await?;
913 :
914 : // FIXME: why not carry the virtualfile here, it supports renaming?
915 260 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
916 :
917 260 : info!("created image layer {}", layer.local_path());
918 :
919 260 : Ok(layer)
920 260 : }
921 : }
922 :
923 : /// A builder object for constructing a new image layer.
924 : ///
925 : /// Usage:
926 : ///
927 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
928 : ///
929 : /// 2. Write the contents by calling `put_image` for every key-value
930 : /// pair in the key range.
931 : ///
932 : /// 3. Call `finish`.
933 : ///
934 : /// # Note
935 : ///
936 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
937 : /// possible for the writer to drop before `finish` is actually called. So this
938 : /// could leave odd temporary files in the directory, eventually exhausting the file system.
939 : /// This structure wraps `ImageLayerWriterInner` and also provides a `Drop`
940 : /// implementation that cleans up the temporary file on failure. It's not
941 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
942 : /// out some fields, making it impossible to implement `Drop`.
943 : ///
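/// # Example
///
/// A minimal usage sketch, assuming a pageserver `conf`, an open `timeline`, and a
/// `ctx: RequestContext` are already available (all names here are placeholders):
///
/// ```ignore
/// let mut writer =
///     ImageLayerWriter::new(conf, timeline_id, tenant_shard_id, &key_range, lsn, &ctx).await?;
/// for (key, img) in images {
///     // keys must fall inside `key_range` and be appended in order
///     writer.put_image(key, img, &ctx).await?;
/// }
/// let resident_layer = writer.finish(&timeline, &ctx).await?;
/// ```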
944 : #[must_use]
945 : pub struct ImageLayerWriter {
946 : inner: Option<ImageLayerWriterInner>,
947 : }
948 :
949 : impl ImageLayerWriter {
950 : ///
951 : /// Start building a new image layer.
952 : ///
953 272 : pub async fn new(
954 272 : conf: &'static PageServerConf,
955 272 : timeline_id: TimelineId,
956 272 : tenant_shard_id: TenantShardId,
957 272 : key_range: &Range<Key>,
958 272 : lsn: Lsn,
959 272 : ctx: &RequestContext,
960 272 : ) -> anyhow::Result<ImageLayerWriter> {
961 272 : Ok(Self {
962 272 : inner: Some(
963 272 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
964 210 : .await?,
965 : ),
966 : })
967 272 : }
968 :
969 : ///
970 : /// Write next value to the file.
971 : ///
972 : /// The page versions must be appended in blknum order.
973 : ///
974 538070 : pub async fn put_image(
975 538070 : &mut self,
976 538070 : key: Key,
977 538070 : img: Bytes,
978 538070 : ctx: &RequestContext,
979 538070 : ) -> anyhow::Result<()> {
980 546563 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
981 538070 : }
982 :
983 : ///
984 : /// Finish writing the image layer.
985 : ///
986 260 : pub(crate) async fn finish(
987 260 : mut self,
988 260 : timeline: &Arc<Timeline>,
989 260 : ctx: &RequestContext,
990 260 : ) -> anyhow::Result<super::ResidentLayer> {
991 756 : self.inner.take().unwrap().finish(timeline, ctx).await
992 260 : }
993 : }
994 :
995 : impl Drop for ImageLayerWriter {
996 272 : fn drop(&mut self) {
997 272 : if let Some(inner) = self.inner.take() {
998 12 : inner.blob_writer.into_inner().remove();
999 260 : }
1000 272 : }
1001 : }
1002 :
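/// Iterator over all key-value pairs stored in an image layer, in key order.
///
/// A minimal usage sketch, assuming `inner: &ImageLayerInner` and a
/// `ctx: RequestContext` are available (placeholder names):
///
/// ```ignore
/// let mut iter = inner.iter(&ctx);
/// while let Some((key, lsn, value)) = iter.next().await? {
///     // `value` is always a `Value::Image`, and `lsn` is the layer's LSN
/// }
/// ```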
1003 : pub struct ImageLayerIterator<'a> {
1004 : image_layer: &'a ImageLayerInner,
1005 : ctx: &'a RequestContext,
1006 : planner: StreamingVectoredReadPlanner,
1007 : index_iter: DiskBtreeIterator<'a>,
1008 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1009 : is_end: bool,
1010 : }
1011 :
1012 : impl<'a> ImageLayerIterator<'a> {
1013 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1014 18964 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1015 18964 : assert!(self.key_values_batch.is_empty());
1016 18964 : assert!(!self.is_end);
1017 :
1018 18964 : let plan = loop {
1019 28636 : if let Some(res) = self.index_iter.next().await {
1020 28586 : let (raw_key, offset) = res?;
1021 28586 : if let Some(batch_plan) = self.planner.handle(
1022 28586 : Key::from_slice(&raw_key[..KEY_SIZE]),
1023 28586 : self.image_layer.lsn,
1024 28586 : offset,
1025 28586 : ) {
1026 18914 : break batch_plan;
1027 9672 : }
1028 : } else {
1029 50 : self.is_end = true;
1030 50 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1031 50 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1032 50 : break item;
1033 : } else {
1034 0 : return Ok(()); // TODO: a test case on empty iterator
1035 : }
1036 : }
1037 : };
1038 18964 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1039 18964 : let mut next_batch = std::collections::VecDeque::new();
1040 18964 : let buf_size = plan.size();
1041 18964 : let buf = BytesMut::with_capacity(buf_size);
1042 18964 : let blobs_buf = vectored_blob_reader
1043 18964 : .read_blobs(&plan, buf, self.ctx)
1044 9631 : .await?;
1045 18964 : let frozen_buf: Bytes = blobs_buf.buf.freeze();
1046 28558 : for meta in blobs_buf.blobs.iter() {
1047 28558 : let img_buf = frozen_buf.slice(meta.start..meta.end);
1048 28558 : next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
1049 28558 : }
1050 18964 : self.key_values_batch = next_batch;
1051 18964 : Ok(())
1052 18964 : }
1053 :
1054 28360 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1055 28360 : if self.key_values_batch.is_empty() {
1056 18952 : if self.is_end {
1057 72 : return Ok(None);
1058 18880 : }
1059 18880 : self.next_batch().await?;
1060 9408 : }
1061 28288 : Ok(Some(
1062 28288 : self.key_values_batch
1063 28288 : .pop_front()
1064 28288 : .expect("should not be empty"),
1065 28288 : ))
1066 28360 : }
1067 : }
1068 :
1069 : #[cfg(test)]
1070 : mod test {
1071 : use std::{sync::Arc, time::Duration};
1072 :
1073 : use bytes::Bytes;
1074 : use itertools::Itertools;
1075 : use pageserver_api::{
1076 : key::Key,
1077 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1078 : };
1079 : use utils::{
1080 : generation::Generation,
1081 : id::{TenantId, TimelineId},
1082 : lsn::Lsn,
1083 : };
1084 :
1085 : use crate::{
1086 : context::RequestContext,
1087 : repository::Value,
1088 : tenant::{
1089 : config::TenantConf,
1090 : harness::{TenantHarness, TIMELINE_ID},
1091 : storage_layer::ResidentLayer,
1092 : vectored_blob_io::StreamingVectoredReadPlanner,
1093 : Tenant, Timeline,
1094 : },
1095 : DEFAULT_PG_VERSION,
1096 : };
1097 :
1098 : use super::{ImageLayerIterator, ImageLayerWriter};
1099 :
1100 : #[tokio::test]
1101 2 : async fn image_layer_rewrite() {
1102 2 : let tenant_conf = TenantConf {
1103 2 : gc_period: Duration::ZERO,
1104 2 : compaction_period: Duration::ZERO,
1105 2 : ..TenantConf::default()
1106 2 : };
1107 2 : let tenant_id = TenantId::generate();
1108 2 : let mut gen = Generation::new(0xdead0001);
1109 10 : let mut get_next_gen = || {
1110 10 : let ret = gen;
1111 10 : gen = gen.next();
1112 10 : ret
1113 10 : };
1114 2 : // The LSN at which we will create an image layer to filter
1115 2 : let lsn = Lsn(0xdeadbeef0000);
1116 2 : let timeline_id = TimelineId::generate();
1117 2 :
1118 2 : //
1119 2 : // Create an unsharded parent with a layer.
1120 2 : //
1121 2 :
1122 2 : let harness = TenantHarness::create_custom(
1123 2 : "test_image_layer_rewrite--parent",
1124 2 : tenant_conf.clone(),
1125 2 : tenant_id,
1126 2 : ShardIdentity::unsharded(),
1127 2 : get_next_gen(),
1128 2 : )
1129 2 : .await
1130 2 : .unwrap();
1131 8 : let (tenant, ctx) = harness.load().await;
1132 2 : let timeline = tenant
1133 2 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1134 4 : .await
1135 2 : .unwrap();
1136 2 :
1137 2 : // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
1138 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1139 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1140 2 : let range = input_start..input_end;
1141 2 :
1142 2 : // Build an image layer to filter
1143 2 : let resident = {
1144 2 : let mut writer = ImageLayerWriter::new(
1145 2 : harness.conf,
1146 2 : timeline_id,
1147 2 : harness.tenant_shard_id,
1148 2 : &range,
1149 2 : lsn,
1150 2 : &ctx,
1151 2 : )
1152 2 : .await
1153 2 : .unwrap();
1154 2 :
1155 2 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1156 2 : let mut key = range.start;
1157 262146 : while key < range.end {
1158 266239 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1159 262144 :
1160 262144 : key = key.next();
1161 2 : }
1162 119 : writer.finish(&timeline, &ctx).await.unwrap()
1163 2 : };
1164 2 : let original_size = resident.metadata().file_size;
1165 2 :
1166 2 : //
1167 2 : // Create child shards and do the rewrite, exercising filter().
1168 2 : // TODO: abstraction in TenantHarness for splits.
1169 2 : //
1170 2 :
1171 2 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1172 2 : // range, middle of key range.
1173 2 : let shard_count = ShardCount::new(4);
1174 8 : for shard_number in 0..shard_count.count() {
1175 2 : //
1176 2 : // mimic the shard split
1177 2 : //
1178 8 : let shard_identity = ShardIdentity::new(
1179 8 : ShardNumber(shard_number),
1180 8 : shard_count,
1181 8 : ShardStripeSize(0x8000),
1182 8 : )
1183 8 : .unwrap();
1184 8 : let harness = TenantHarness::create_custom(
1185 8 : Box::leak(Box::new(format!(
1186 8 : "test_image_layer_rewrite--child{}",
1187 8 : shard_identity.shard_slug()
1188 8 : ))),
1189 8 : tenant_conf.clone(),
1190 8 : tenant_id,
1191 8 : shard_identity,
1192 8 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1193 8 : // But here, all we care about is that the gen number is unique.
1194 8 : get_next_gen(),
1195 8 : )
1196 2 : .await
1197 8 : .unwrap();
1198 31 : let (tenant, ctx) = harness.load().await;
1199 8 : let timeline = tenant
1200 8 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1201 12 : .await
1202 8 : .unwrap();
1203 2 :
1204 2 : //
1205 2 : // use filter() and make assertions
1206 2 : //
1207 2 :
1208 8 : let mut filtered_writer = ImageLayerWriter::new(
1209 8 : harness.conf,
1210 8 : timeline_id,
1211 8 : harness.tenant_shard_id,
1212 8 : &range,
1213 8 : lsn,
1214 8 : &ctx,
1215 8 : )
1216 4 : .await
1217 8 : .unwrap();
1218 2 :
1219 8 : let wrote_keys = resident
1220 8 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1221 266719 : .await
1222 8 : .unwrap();
1223 8 : let replacement = if wrote_keys > 0 {
1224 129 : Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
1225 2 : } else {
1226 2 : None
1227 2 : };
1228 2 :
1229 2 : // This exact size and those below will need updating as/when the layer encoding changes, but
1230 2 : // should be deterministic for a given version of the format, as we used no randomness generating the input.
1231 8 : assert_eq!(original_size, 1597440);
1232 2 :
1233 8 : match shard_number {
1234 2 : 0 => {
1235 2 : // We should have written out just one stripe for our shard identity
1236 2 : assert_eq!(wrote_keys, 0x8000);
1237 2 : let replacement = replacement.unwrap();
1238 2 :
1239 2 : // We should have dropped some of the data
1240 2 : assert!(replacement.metadata().file_size < original_size);
1241 2 : assert!(replacement.metadata().file_size > 0);
1242 2 :
1243 2 : // Assert that we dropped ~3/4 of the data.
1244 2 : assert_eq!(replacement.metadata().file_size, 417792);
1245 2 : }
1246 2 : 1 => {
1247 2 : // Shard 1 has no keys in our input range
1248 2 : assert_eq!(wrote_keys, 0x0);
1249 2 : assert!(replacement.is_none());
1250 2 : }
1251 2 : 2 => {
1252 2 : // Shard 2 has one stripe in the input range
1253 2 : assert_eq!(wrote_keys, 0x8000);
1254 2 : let replacement = replacement.unwrap();
1255 2 : assert!(replacement.metadata().file_size < original_size);
1256 2 : assert!(replacement.metadata().file_size > 0);
1257 2 : assert_eq!(replacement.metadata().file_size, 417792);
1258 2 : }
1259 2 : 3 => {
1260 2 : // Shard 3 has two stripes in the input range
1261 2 : assert_eq!(wrote_keys, 0x10000);
1262 2 : let replacement = replacement.unwrap();
1263 2 : assert!(replacement.metadata().file_size < original_size);
1264 2 : assert!(replacement.metadata().file_size > 0);
1265 2 : assert_eq!(replacement.metadata().file_size, 811008);
1266 2 : }
1267 2 : _ => unreachable!(),
1268 2 : }
1269 2 : }
1270 2 : }
1271 :
1272 2 : async fn produce_image_layer(
1273 2 : tenant: &Tenant,
1274 2 : tline: &Arc<Timeline>,
1275 2 : mut images: Vec<(Key, Bytes)>,
1276 2 : lsn: Lsn,
1277 2 : ctx: &RequestContext,
1278 2 : ) -> anyhow::Result<ResidentLayer> {
1279 2 : images.sort();
1280 2 : let (key_start, _) = images.first().unwrap();
1281 2 : let (key_last, _) = images.last().unwrap();
1282 2 : let key_end = key_last.next();
1283 2 : let key_range = *key_start..key_end;
1284 2 : let mut writer = ImageLayerWriter::new(
1285 2 : tenant.conf,
1286 2 : tline.timeline_id,
1287 2 : tenant.tenant_shard_id,
1288 2 : &key_range,
1289 2 : lsn,
1290 2 : ctx,
1291 2 : )
1292 1 : .await?;
1293 :
1294 2002 : for (key, img) in images {
1295 2031 : writer.put_image(key, img, ctx).await?;
1296 : }
1297 4 : let img_layer = writer.finish(tline, ctx).await?;
1298 :
1299 2 : Ok::<_, anyhow::Error>(img_layer)
1300 2 : }
1301 :
1302 28 : async fn assert_img_iter_equal(
1303 28 : img_iter: &mut ImageLayerIterator<'_>,
1304 28 : expect: &[(Key, Bytes)],
1305 28 : expect_lsn: Lsn,
1306 28 : ) {
1307 28 : let mut expect_iter = expect.iter();
1308 : loop {
1309 28028 : let o1 = img_iter.next().await.unwrap();
1310 28028 : let o2 = expect_iter.next();
1311 28028 : match (o1, o2) {
1312 28 : (None, None) => break,
1313 28000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1314 28000 : let Value::Image(i1) = v1 else {
1315 0 : panic!("expect Value::Image")
1316 : };
1317 28000 : assert_eq!(&k1, k2);
1318 28000 : assert_eq!(l1, expect_lsn);
1319 28000 : assert_eq!(&i1, i2);
1320 : }
1321 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1322 : }
1323 : }
1324 28 : }
1325 :
1326 : #[tokio::test]
1327 2 : async fn image_layer_iterator() {
1328 2 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1329 8 : let (tenant, ctx) = harness.load().await;
1330 2 :
1331 2 : let tline = tenant
1332 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1333 4 : .await
1334 2 : .unwrap();
1335 2 :
1336 2000 : fn get_key(id: u32) -> Key {
1337 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1338 2000 : key.field6 = id;
1339 2000 : key
1340 2000 : }
1341 2 : const N: usize = 1000;
1342 2 : let test_imgs = (0..N)
1343 2000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1344 2 : .collect_vec();
1345 2 : let resident_layer =
1346 2 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1347 2036 : .await
1348 2 : .unwrap();
1349 2 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1350 6 : for max_read_size in [1, 1024] {
1351 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1352 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1353 28 : // Test if the batch size is correctly determined
1354 28 : let mut iter = img_layer.iter(&ctx);
1355 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1356 28 : let mut num_items = 0;
1357 112 : for _ in 0..3 {
1358 84 : iter.next_batch().await.unwrap();
1359 84 : num_items += iter.key_values_batch.len();
1360 84 : if max_read_size == 1 {
1361 2 : // every key should be a batch b/c the value is larger than max_read_size
1362 42 : assert_eq!(iter.key_values_batch.len(), 1);
1363 2 : } else {
1364 42 : assert_eq!(iter.key_values_batch.len(), batch_size);
1365 2 : }
1366 84 : if num_items >= N {
1367 2 : break;
1368 84 : }
1369 84 : iter.key_values_batch.clear();
1370 2 : }
1371 2 : // Test if the result is correct
1372 28 : let mut iter = img_layer.iter(&ctx);
1373 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1374 9576 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1375 2 : }
1376 2 : }
1377 2 : }
1378 : }