Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN.
3 : //!
4 : //! It contains an image of all key-value pairs in its key-range. Any key
5 : //! that falls into the image layer's range but does not exist in the layer,
6 : //! does not exist.
7 : //!
8 : //! An image layer is stored in a file on disk. The file is stored in
9 : //! timelines/<timeline_id> directory. Currently, there are no
10 : //! subdirectories, and each image layer file is named like this:
11 : //!
12 : //! ```text
13 : //! <key start>-<key end>__<LSN>
14 : //! ```
15 : //!
16 : //! For example:
17 : //!
18 : //! ```text
19 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
20 : //! ```
21 : //!
22 : //! Every image layer file consists of three parts: "summary",
23 : //! "index", and "values". The summary is a fixed size header at the
24 : //! beginning of the file, and it contains basic information about the
25 : //! layer, and offsets to the other parts. The "index" is a B-tree,
26 : //! mapping from Key to an offset in the "values" part. The
27 : //! actual page images are stored in the "values" part.
28 : use crate::config::PageServerConf;
29 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
30 : use crate::page_cache::{self, FileId, PAGE_SZ};
31 : use crate::tenant::blob_io::BlobWriter;
32 : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
33 : use crate::tenant::disk_btree::{
34 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::vectored_blob_io::{
38 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
39 : VectoredReadPlanner,
40 : };
41 : use crate::tenant::PageReconstructError;
42 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
43 : use crate::virtual_file::IoBufferMut;
44 : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
45 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::Bytes;
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use hex;
50 : use itertools::Itertools;
51 : use pageserver_api::config::MaxVectoredReadBytes;
52 : use pageserver_api::key::DBDIR_KEY;
53 : use pageserver_api::key::{Key, KEY_SIZE};
54 : use pageserver_api::keyspace::KeySpace;
55 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
56 : use pageserver_api::value::Value;
57 : use rand::{distributions::Alphanumeric, Rng};
58 : use serde::{Deserialize, Serialize};
59 : use std::collections::VecDeque;
60 : use std::fs::File;
61 : use std::io::SeekFrom;
62 : use std::ops::Range;
63 : use std::os::unix::prelude::FileExt;
64 : use std::str::FromStr;
65 : use tokio::sync::OnceCell;
66 : use tokio_stream::StreamExt;
67 : use tracing::*;
68 :
69 : use utils::{
70 : bin_ser::BeSer,
71 : id::{TenantId, TimelineId},
72 : lsn::Lsn,
73 : };
74 :
75 : use super::layer_name::ImageLayerName;
76 : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
77 :
78 : ///
79 : /// Header stored in the beginning of the file
80 : ///
81 : /// After this comes the 'values' part, starting on block 1. After that,
82 : /// the 'index' starts at the block indicated by 'index_start_blk'
83 : ///
84 122 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
85 : pub struct Summary {
86 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
87 : pub magic: u16,
88 : pub format_version: u16,
89 :
90 : pub tenant_id: TenantId,
91 : pub timeline_id: TimelineId,
92 : pub key_range: Range<Key>,
93 : pub lsn: Lsn,
94 :
95 : /// Block number where the 'index' part of the file begins.
96 : pub index_start_blk: u32,
97 : /// Block within the 'index', where the B-tree root page is stored
98 : pub index_root_blk: u32,
99 : // the 'values' part starts after the summary header, on block 1.
100 : }
101 :
102 : impl From<&ImageLayer> for Summary {
103 0 : fn from(layer: &ImageLayer) -> Self {
104 0 : Self::expected(
105 0 : layer.desc.tenant_shard_id.tenant_id,
106 0 : layer.desc.timeline_id,
107 0 : layer.desc.key_range.clone(),
108 0 : layer.lsn,
109 0 : )
110 0 : }
111 : }
112 :
113 : impl Summary {
114 122 : pub(super) fn expected(
115 122 : tenant_id: TenantId,
116 122 : timeline_id: TimelineId,
117 122 : key_range: Range<Key>,
118 122 : lsn: Lsn,
119 122 : ) -> Self {
120 122 : Self {
121 122 : magic: IMAGE_FILE_MAGIC,
122 122 : format_version: STORAGE_FORMAT_VERSION,
123 122 : tenant_id,
124 122 : timeline_id,
125 122 : key_range,
126 122 : lsn,
127 122 :
128 122 : index_start_blk: 0,
129 122 : index_root_blk: 0,
130 122 : }
131 122 : }
132 : }
133 :
134 : /// This is used only from `pagectl`. Within pageserver, all layers are
135 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
136 : pub struct ImageLayer {
137 : path: Utf8PathBuf,
138 : pub desc: PersistentLayerDesc,
139 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
140 : pub lsn: Lsn,
141 : inner: OnceCell<ImageLayerInner>,
142 : }
143 :
144 : impl std::fmt::Debug for ImageLayer {
145 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 : use super::RangeDisplayDebug;
147 :
148 0 : f.debug_struct("ImageLayer")
149 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
150 0 : .field("file_size", &self.desc.file_size)
151 0 : .field("lsn", &self.lsn)
152 0 : .field("inner", &self.inner)
153 0 : .finish()
154 0 : }
155 : }
156 :
157 : /// ImageLayer is the in-memory data structure associated with an on-disk image
158 : /// file.
159 : pub struct ImageLayerInner {
160 : // values copied from summary
161 : index_start_blk: u32,
162 : index_root_blk: u32,
163 :
164 : key_range: Range<Key>,
165 : lsn: Lsn,
166 :
167 : file: VirtualFile,
168 : file_id: FileId,
169 :
170 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
171 : }
172 :
173 : impl ImageLayerInner {
174 0 : pub(crate) fn layer_dbg_info(&self) -> String {
175 0 : format!(
176 0 : "image {}..{} {}",
177 0 : self.key_range().start,
178 0 : self.key_range().end,
179 0 : self.lsn()
180 0 : )
181 0 : }
182 : }
183 :
184 : impl std::fmt::Debug for ImageLayerInner {
185 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 0 : f.debug_struct("ImageLayerInner")
187 0 : .field("index_start_blk", &self.index_start_blk)
188 0 : .field("index_root_blk", &self.index_root_blk)
189 0 : .finish()
190 0 : }
191 : }
192 :
193 : impl ImageLayerInner {
194 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
195 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
196 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
197 0 : self.index_start_blk,
198 0 : self.index_root_blk,
199 0 : block_reader,
200 0 : );
201 0 :
202 0 : tree_reader.dump().await?;
203 :
204 0 : tree_reader
205 0 : .visit(
206 0 : &[0u8; KEY_SIZE],
207 0 : VisitDirection::Forwards,
208 0 : |key, value| {
209 0 : println!("key: {} offset {}", hex::encode(key), value);
210 0 : true
211 0 : },
212 0 : ctx,
213 0 : )
214 0 : .await?;
215 :
216 0 : Ok(())
217 0 : }
218 : }
219 :
220 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
221 : impl std::fmt::Display for ImageLayer {
222 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 0 : write!(f, "{}", self.layer_desc().short_id())
224 0 : }
225 : }
226 :
227 : impl AsLayerDesc for ImageLayer {
228 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
229 0 : &self.desc
230 0 : }
231 : }
232 :
233 : impl ImageLayer {
234 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
235 0 : self.desc.dump();
236 0 :
237 0 : if !verbose {
238 0 : return Ok(());
239 0 : }
240 :
241 0 : let inner = self.load(ctx).await?;
242 :
243 0 : inner.dump(ctx).await?;
244 :
245 0 : Ok(())
246 0 : }
247 :
248 518 : fn temp_path_for(
249 518 : conf: &PageServerConf,
250 518 : timeline_id: TimelineId,
251 518 : tenant_shard_id: TenantShardId,
252 518 : fname: &ImageLayerName,
253 518 : ) -> Utf8PathBuf {
254 518 : let rand_string: String = rand::thread_rng()
255 518 : .sample_iter(&Alphanumeric)
256 518 : .take(8)
257 518 : .map(char::from)
258 518 : .collect();
259 518 :
260 518 : conf.timeline_path(&tenant_shard_id, &timeline_id)
261 518 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
262 518 : }
263 :
264 : ///
265 : /// Open the underlying file and read the metadata into memory, if it's
266 : /// not loaded already.
267 : ///
268 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
269 0 : self.inner
270 0 : .get_or_try_init(|| self.load_inner(ctx))
271 0 : .await
272 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
273 0 : }
274 :
275 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
276 0 : let path = self.path();
277 :
278 0 : let loaded =
279 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
280 :
281 : // not production code
282 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
283 0 : let expected_layer_name = self.layer_desc().layer_name();
284 0 :
285 0 : if actual_layer_name != expected_layer_name {
286 0 : println!("warning: filename does not match what is expected from in-file summary");
287 0 : println!("actual: {:?}", actual_layer_name.to_string());
288 0 : println!("expected: {:?}", expected_layer_name.to_string());
289 0 : }
290 :
291 0 : Ok(loaded)
292 0 : }
293 :
294 : /// Create an ImageLayer struct representing an existing file on disk.
295 : ///
296 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
297 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
298 0 : let mut summary_buf = vec![0; PAGE_SZ];
299 0 : file.read_exact_at(&mut summary_buf, 0)?;
300 0 : let summary = Summary::des_prefix(&summary_buf)?;
301 0 : let metadata = file
302 0 : .metadata()
303 0 : .context("get file metadata to determine size")?;
304 :
305 : // This function is never used for constructing layers in a running pageserver,
306 : // so it does not need an accurate TenantShardId.
307 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
308 0 :
309 0 : Ok(ImageLayer {
310 0 : path: path.to_path_buf(),
311 0 : desc: PersistentLayerDesc::new_img(
312 0 : tenant_shard_id,
313 0 : summary.timeline_id,
314 0 : summary.key_range,
315 0 : summary.lsn,
316 0 : metadata.len(),
317 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
318 0 : lsn: summary.lsn,
319 0 : inner: OnceCell::new(),
320 0 : })
321 0 : }
322 :
323 0 : fn path(&self) -> Utf8PathBuf {
324 0 : self.path.clone()
325 0 : }
326 : }
327 :
328 0 : #[derive(thiserror::Error, Debug)]
329 : pub enum RewriteSummaryError {
330 : #[error("magic mismatch")]
331 : MagicMismatch,
332 : #[error(transparent)]
333 : Other(#[from] anyhow::Error),
334 : }
335 :
336 : impl From<std::io::Error> for RewriteSummaryError {
337 0 : fn from(e: std::io::Error) -> Self {
338 0 : Self::Other(anyhow::anyhow!(e))
339 0 : }
340 : }
341 :
342 : impl ImageLayer {
343 0 : pub async fn rewrite_summary<F>(
344 0 : path: &Utf8Path,
345 0 : rewrite: F,
346 0 : ctx: &RequestContext,
347 0 : ) -> Result<(), RewriteSummaryError>
348 0 : where
349 0 : F: Fn(Summary) -> Summary,
350 0 : {
351 0 : let mut file = VirtualFile::open_with_options(
352 0 : path,
353 0 : virtual_file::OpenOptions::new().read(true).write(true),
354 0 : ctx,
355 0 : )
356 0 : .await
357 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
358 0 : let file_id = page_cache::next_file_id();
359 0 : let block_reader = FileBlockReader::new(&file, file_id);
360 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
361 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
362 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
363 0 : return Err(RewriteSummaryError::MagicMismatch);
364 0 : }
365 0 :
366 0 : let new_summary = rewrite(actual_summary);
367 0 :
368 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
369 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
370 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
371 0 : file.seek(SeekFrom::Start(0)).await?;
372 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
373 0 : res?;
374 0 : Ok(())
375 0 : }
376 : }
377 :
378 : impl ImageLayerInner {
379 116 : pub(crate) fn key_range(&self) -> &Range<Key> {
380 116 : &self.key_range
381 116 : }
382 :
383 116 : pub(crate) fn lsn(&self) -> Lsn {
384 116 : self.lsn
385 116 : }
386 :
387 122 : pub(super) async fn load(
388 122 : path: &Utf8Path,
389 122 : lsn: Lsn,
390 122 : summary: Option<Summary>,
391 122 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
392 122 : ctx: &RequestContext,
393 122 : ) -> anyhow::Result<Self> {
394 122 : let file = VirtualFile::open_v2(path, ctx)
395 61 : .await
396 122 : .context("open layer file")?;
397 122 : let file_id = page_cache::next_file_id();
398 122 : let block_reader = FileBlockReader::new(&file, file_id);
399 122 : let summary_blk = block_reader
400 122 : .read_blk(0, ctx)
401 62 : .await
402 122 : .context("read first block")?;
403 :
404 : // length is the only way how this could fail, so it's not actually likely at all unless
405 : // read_blk returns wrong sized block.
406 : //
407 : // TODO: confirm and make this into assertion
408 122 : let actual_summary =
409 122 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
410 :
411 122 : if let Some(mut expected_summary) = summary {
412 : // production code path
413 122 : expected_summary.index_start_blk = actual_summary.index_start_blk;
414 122 : expected_summary.index_root_blk = actual_summary.index_root_blk;
415 122 : // mask out the timeline_id, but still require the layers to be from the same tenant
416 122 : expected_summary.timeline_id = actual_summary.timeline_id;
417 122 :
418 122 : if actual_summary != expected_summary {
419 0 : bail!(
420 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
421 0 : actual_summary,
422 0 : expected_summary
423 0 : );
424 122 : }
425 0 : }
426 :
427 122 : Ok(ImageLayerInner {
428 122 : index_start_blk: actual_summary.index_start_blk,
429 122 : index_root_blk: actual_summary.index_root_blk,
430 122 : lsn,
431 122 : file,
432 122 : file_id,
433 122 : max_vectored_read_bytes,
434 122 : key_range: actual_summary.key_range,
435 122 : })
436 122 : }
437 :
438 : // Look up the keys in the provided keyspace and update
439 : // the reconstruct state with whatever is found.
440 21820 : pub(super) async fn get_values_reconstruct_data(
441 21820 : &self,
442 21820 : keyspace: KeySpace,
443 21820 : reconstruct_state: &mut ValuesReconstructState,
444 21820 : ctx: &RequestContext,
445 21820 : ) -> Result<(), GetVectoredError> {
446 21820 : let reads = self
447 21820 : .plan_reads(keyspace, None, ctx)
448 492 : .await
449 21820 : .map_err(GetVectoredError::Other)?;
450 :
451 21820 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
452 11117 : .await;
453 :
454 21820 : reconstruct_state.on_image_layer_visited(&self.key_range);
455 21820 :
456 21820 : Ok(())
457 21820 : }
458 :
459 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
460 : /// and the keys in this layer.
461 : ///
462 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
463 : /// this shard.
464 21828 : async fn plan_reads(
465 21828 : &self,
466 21828 : keyspace: KeySpace,
467 21828 : shard_identity: Option<&ShardIdentity>,
468 21828 : ctx: &RequestContext,
469 21828 : ) -> anyhow::Result<Vec<VectoredRead>> {
470 21828 : let mut planner = VectoredReadPlanner::new(
471 21828 : self.max_vectored_read_bytes
472 21828 : .expect("Layer is loaded with max vectored bytes config")
473 21828 : .0
474 21828 : .into(),
475 21828 : );
476 21828 :
477 21828 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
478 21828 : let tree_reader =
479 21828 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
480 21828 :
481 21828 : let ctx = RequestContextBuilder::extend(ctx)
482 21828 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
483 21828 : .build();
484 :
485 21836 : for range in keyspace.ranges.iter() {
486 21836 : let mut range_end_handled = false;
487 21836 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
488 21836 : range.start.write_to_byte_slice(&mut search_key);
489 21836 :
490 21836 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
491 21836 : let mut index_stream = std::pin::pin!(index_stream);
492 :
493 1124260 : while let Some(index_entry) = index_stream.next().await {
494 1124032 : let (raw_key, offset) = index_entry?;
495 :
496 1124032 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
497 1124032 : assert!(key >= range.start);
498 :
499 1124032 : let flag = if let Some(shard_identity) = shard_identity {
500 1048576 : if shard_identity.is_key_disposable(&key) {
501 786432 : BlobFlag::Ignore
502 : } else {
503 262144 : BlobFlag::None
504 : }
505 : } else {
506 75456 : BlobFlag::None
507 : };
508 :
509 1124032 : if key >= range.end {
510 21608 : planner.handle_range_end(offset);
511 21608 : range_end_handled = true;
512 21608 : break;
513 1102424 : } else {
514 1102424 : planner.handle(key, self.lsn, offset, flag);
515 1102424 : }
516 : }
517 :
518 21836 : if !range_end_handled {
519 228 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
520 228 : planner.handle_range_end(payload_end);
521 21608 : }
522 : }
523 :
524 21828 : Ok(planner.finish())
525 21828 : }
526 :
527 : /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
528 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
529 8 : pub(super) async fn filter(
530 8 : &self,
531 8 : shard_identity: &ShardIdentity,
532 8 : writer: &mut ImageLayerWriter,
533 8 : ctx: &RequestContext,
534 8 : ) -> anyhow::Result<usize> {
535 : // Fragment the range into the regions owned by this ShardIdentity
536 8 : let plan = self
537 8 : .plan_reads(
538 8 : KeySpace {
539 8 : // If asked for the total key space, plan_reads will give us all the keys in the layer
540 8 : ranges: vec![Key::MIN..Key::MAX],
541 8 : },
542 8 : Some(shard_identity),
543 8 : ctx,
544 8 : )
545 469 : .await?;
546 :
547 8 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
548 8 : let mut key_count = 0;
549 16 : for read in plan.into_iter() {
550 16 : let buf_size = read.size();
551 16 :
552 16 : let buf = IoBufferMut::with_capacity(buf_size);
553 16 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
554 :
555 16 : let view = BufView::new_slice(&blobs_buf.buf);
556 :
557 262144 : for meta in blobs_buf.blobs.iter() {
558 262144 : let img_buf = meta.read(&view).await?;
559 :
560 262144 : key_count += 1;
561 262144 : writer
562 262144 : .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
563 266240 : .await
564 262144 : .context(format!("Storing key {}", meta.meta.key))?;
565 : }
566 : }
567 :
568 8 : Ok(key_count)
569 8 : }
570 :
571 21820 : async fn do_reads_and_update_state(
572 21820 : &self,
573 21820 : reads: Vec<VectoredRead>,
574 21820 : reconstruct_state: &mut ValuesReconstructState,
575 21820 : ctx: &RequestContext,
576 21820 : ) {
577 21820 : let max_vectored_read_bytes = self
578 21820 : .max_vectored_read_bytes
579 21820 : .expect("Layer is loaded with max vectored bytes config")
580 21820 : .0
581 21820 : .into();
582 21820 :
583 21820 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
584 21820 : for read in reads.into_iter() {
585 21796 : let buf_size = read.size();
586 21796 :
587 21796 : if buf_size > max_vectored_read_bytes {
588 : // If the read is oversized, it should only contain one key.
589 0 : let offenders = read
590 0 : .blobs_at
591 0 : .as_slice()
592 0 : .iter()
593 0 : .filter_map(|(_, blob_meta)| {
594 0 : if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
595 : // The size of values for these keys is unbounded and can
596 : // grow very large in pathological cases.
597 0 : None
598 : } else {
599 0 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
600 : }
601 0 : })
602 0 : .join(", ");
603 0 :
604 0 : if !offenders.is_empty() {
605 0 : tracing::warn!(
606 0 : "Oversized vectored read ({} > {}) for keys {}",
607 : buf_size,
608 : max_vectored_read_bytes,
609 : offenders
610 : );
611 0 : }
612 21796 : }
613 :
614 21796 : let buf = IoBufferMut::with_capacity(buf_size);
615 21796 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
616 :
617 21796 : match res {
618 21796 : Ok(blobs_buf) => {
619 21796 : let view = BufView::new_slice(&blobs_buf.buf);
620 53848 : for meta in blobs_buf.blobs.iter() {
621 53848 : let img_buf = meta.read(&view).await;
622 :
623 53848 : let img_buf = match img_buf {
624 53848 : Ok(img_buf) => img_buf,
625 0 : Err(e) => {
626 0 : reconstruct_state.on_key_error(
627 0 : meta.meta.key,
628 0 : PageReconstructError::Other(anyhow!(e).context(format!(
629 0 : "Failed to decompress blob from virtual file {}",
630 0 : self.file.path(),
631 0 : ))),
632 0 : );
633 0 :
634 0 : continue;
635 : }
636 : };
637 53848 : reconstruct_state.update_key(
638 53848 : &meta.meta.key,
639 53848 : self.lsn,
640 53848 : Value::Image(img_buf.into_bytes()),
641 53848 : );
642 : }
643 : }
644 0 : Err(err) => {
645 0 : let kind = err.kind();
646 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
647 0 : reconstruct_state.on_key_error(
648 0 : blob_meta.key,
649 0 : PageReconstructError::from(anyhow!(
650 0 : "Failed to read blobs from virtual file {}: {}",
651 0 : self.file.path(),
652 0 : kind
653 0 : )),
654 0 : );
655 0 : }
656 : }
657 : };
658 : }
659 21820 : }
660 :
661 114 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
662 114 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
663 114 : let tree_reader =
664 114 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
665 114 : ImageLayerIterator {
666 114 : image_layer: self,
667 114 : ctx,
668 114 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
669 114 : key_values_batch: VecDeque::new(),
670 114 : is_end: false,
671 114 : planner: StreamingVectoredReadPlanner::new(
672 114 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
673 114 : 1024, // The default value. Unit tests might use a different value
674 114 : ),
675 114 : }
676 114 : }
677 :
678 : /// NB: not super efficient, but not terrible either. Should prob be an iterator.
679 : //
680 : // We're reusing the index traversal logical in plan_reads; would be nice to
681 : // factor that out.
682 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
683 0 : let plan = self
684 0 : .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
685 0 : .await?;
686 0 : Ok(plan
687 0 : .into_iter()
688 0 : .flat_map(|read| read.blobs_at)
689 0 : .map(|(_, blob_meta)| blob_meta.key)
690 0 : .collect())
691 0 : }
692 : }
693 :
694 : /// A builder object for constructing a new image layer.
695 : ///
696 : /// Usage:
697 : ///
698 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
699 : ///
700 : /// 2. Write the contents by calling `put_page_image` for every key-value
701 : /// pair in the key range.
702 : ///
703 : /// 3. Call `finish`.
704 : ///
705 : struct ImageLayerWriterInner {
706 : conf: &'static PageServerConf,
707 : path: Utf8PathBuf,
708 : timeline_id: TimelineId,
709 : tenant_shard_id: TenantShardId,
710 : key_range: Range<Key>,
711 : lsn: Lsn,
712 :
713 : // Total uncompressed bytes passed into put_image
714 : uncompressed_bytes: u64,
715 :
716 : // Like `uncompressed_bytes`,
717 : // but only of images we might consider for compression
718 : uncompressed_bytes_eligible: u64,
719 :
720 : // Like `uncompressed_bytes`, but only of images
721 : // where we have chosen their compressed form
722 : uncompressed_bytes_chosen: u64,
723 :
724 : // Number of keys in the layer.
725 : num_keys: usize,
726 :
727 : blob_writer: BlobWriter<false>,
728 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
729 :
730 : #[cfg(feature = "testing")]
731 : last_written_key: Key,
732 : }
733 :
734 : impl ImageLayerWriterInner {
735 : ///
736 : /// Start building a new image layer.
737 : ///
738 518 : async fn new(
739 518 : conf: &'static PageServerConf,
740 518 : timeline_id: TimelineId,
741 518 : tenant_shard_id: TenantShardId,
742 518 : key_range: &Range<Key>,
743 518 : lsn: Lsn,
744 518 : ctx: &RequestContext,
745 518 : ) -> anyhow::Result<Self> {
746 518 : // Create the file initially with a temporary filename.
747 518 : // We'll atomically rename it to the final name when we're done.
748 518 : let path = ImageLayer::temp_path_for(
749 518 : conf,
750 518 : timeline_id,
751 518 : tenant_shard_id,
752 518 : &ImageLayerName {
753 518 : key_range: key_range.clone(),
754 518 : lsn,
755 518 : },
756 518 : );
757 518 : trace!("creating image layer {}", path);
758 518 : let mut file = {
759 518 : VirtualFile::open_with_options(
760 518 : &path,
761 518 : virtual_file::OpenOptions::new()
762 518 : .write(true)
763 518 : .create_new(true),
764 518 : ctx,
765 518 : )
766 346 : .await?
767 : };
768 : // make room for the header block
769 518 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
770 518 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
771 518 :
772 518 : // Initialize the b-tree index builder
773 518 : let block_buf = BlockBuf::new();
774 518 : let tree_builder = DiskBtreeBuilder::new(block_buf);
775 518 :
776 518 : let writer = Self {
777 518 : conf,
778 518 : path,
779 518 : timeline_id,
780 518 : tenant_shard_id,
781 518 : key_range: key_range.clone(),
782 518 : lsn,
783 518 : tree: tree_builder,
784 518 : blob_writer,
785 518 : uncompressed_bytes: 0,
786 518 : uncompressed_bytes_eligible: 0,
787 518 : uncompressed_bytes_chosen: 0,
788 518 : num_keys: 0,
789 518 : #[cfg(feature = "testing")]
790 518 : last_written_key: Key::MIN,
791 518 : };
792 518 :
793 518 : Ok(writer)
794 518 : }
795 :
796 : ///
797 : /// Write next value to the file.
798 : ///
799 : /// The page versions must be appended in blknum order.
800 : ///
801 546322 : async fn put_image(
802 546322 : &mut self,
803 546322 : key: Key,
804 546322 : img: Bytes,
805 546322 : ctx: &RequestContext,
806 546322 : ) -> anyhow::Result<()> {
807 546322 : ensure!(self.key_range.contains(&key));
808 546322 : let compression = self.conf.image_compression;
809 546322 : let uncompressed_len = img.len() as u64;
810 546322 : self.uncompressed_bytes += uncompressed_len;
811 546322 : self.num_keys += 1;
812 546322 : let (_img, res) = self
813 546322 : .blob_writer
814 546322 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
815 554980 : .await;
816 : // TODO: re-use the buffer for `img` further upstack
817 546322 : let (off, compression_info) = res?;
818 546322 : if compression_info.compressed_size.is_some() {
819 8002 : // The image has been considered for compression at least
820 8002 : self.uncompressed_bytes_eligible += uncompressed_len;
821 538320 : }
822 546322 : if compression_info.written_compressed {
823 0 : // The image has been compressed
824 0 : self.uncompressed_bytes_chosen += uncompressed_len;
825 546322 : }
826 :
827 546322 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
828 546322 : key.write_to_byte_slice(&mut keybuf);
829 546322 : self.tree.append(&keybuf, off)?;
830 :
831 : #[cfg(feature = "testing")]
832 546322 : {
833 546322 : self.last_written_key = key;
834 546322 : }
835 546322 :
836 546322 : Ok(())
837 546322 : }
838 :
839 : ///
840 : /// Finish writing the image layer.
841 : ///
842 316 : async fn finish(
843 316 : self,
844 316 : ctx: &RequestContext,
845 316 : end_key: Option<Key>,
846 316 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
847 316 : let temp_path = self.path.clone();
848 875 : let result = self.finish0(ctx, end_key).await;
849 316 : if let Err(ref e) = result {
850 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
851 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
852 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
853 0 : }
854 316 : }
855 316 : result
856 316 : }
857 :
858 : ///
859 : /// Finish writing the image layer.
860 : ///
861 316 : async fn finish0(
862 316 : self,
863 316 : ctx: &RequestContext,
864 316 : end_key: Option<Key>,
865 316 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
866 316 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
867 316 :
868 316 : // Calculate compression ratio
869 316 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
870 316 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
871 316 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
872 316 : .inc_by(self.uncompressed_bytes_eligible);
873 316 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
874 316 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
875 316 :
876 316 : let mut file = self.blob_writer.into_inner();
877 316 :
878 316 : // Write out the index
879 316 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
880 0 : .await?;
881 316 : let (index_root_blk, block_buf) = self.tree.finish()?;
882 1088 : for buf in block_buf.blocks {
883 772 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
884 772 : res?;
885 : }
886 :
887 316 : let final_key_range = if let Some(end_key) = end_key {
888 40 : self.key_range.start..end_key
889 : } else {
890 276 : self.key_range.clone()
891 : };
892 :
893 : // Fill in the summary on blk 0
894 316 : let summary = Summary {
895 316 : magic: IMAGE_FILE_MAGIC,
896 316 : format_version: STORAGE_FORMAT_VERSION,
897 316 : tenant_id: self.tenant_shard_id.tenant_id,
898 316 : timeline_id: self.timeline_id,
899 316 : key_range: final_key_range.clone(),
900 316 : lsn: self.lsn,
901 316 : index_start_blk,
902 316 : index_root_blk,
903 316 : };
904 316 :
905 316 : let mut buf = Vec::with_capacity(PAGE_SZ);
906 316 : // TODO: could use smallvec here but it's a pain with Slice<T>
907 316 : Summary::ser_into(&summary, &mut buf)?;
908 316 : file.seek(SeekFrom::Start(0)).await?;
909 316 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
910 316 : res?;
911 :
912 316 : let metadata = file
913 316 : .metadata()
914 161 : .await
915 316 : .context("get metadata to determine file size")?;
916 :
917 316 : let desc = PersistentLayerDesc::new_img(
918 316 : self.tenant_shard_id,
919 316 : self.timeline_id,
920 316 : final_key_range,
921 316 : self.lsn,
922 316 : metadata.len(),
923 316 : );
924 :
925 : #[cfg(feature = "testing")]
926 316 : if let Some(end_key) = end_key {
927 40 : assert!(
928 40 : self.last_written_key < end_key,
929 0 : "written key violates end_key range"
930 : );
931 276 : }
932 :
933 : // Note: Because we open the file in write-only mode, we cannot
934 : // reuse the same VirtualFile for reading later. That's why we don't
935 : // set inner.file here. The first read will have to re-open it.
936 :
937 : // fsync the file
938 316 : file.sync_all()
939 160 : .await
940 316 : .maybe_fatal_err("image_layer sync_all")?;
941 :
942 316 : trace!("created image layer {}", self.path);
943 :
944 316 : Ok((desc, self.path))
945 316 : }
946 : }
947 :
948 : /// A builder object for constructing a new image layer.
949 : ///
950 : /// Usage:
951 : ///
952 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
953 : ///
954 : /// 2. Write the contents by calling `put_page_image` for every key-value
955 : /// pair in the key range.
956 : ///
957 : /// 3. Call `finish`.
958 : ///
959 : /// # Note
960 : ///
961 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
962 : /// possible for the writer to drop before `finish` is actually called. So this
963 : /// could lead to odd temporary files in the directory, exhausting file system.
964 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
965 : /// implementation that cleans up the temporary file in failure. It's not
966 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
967 : /// out some fields, making it impossible to implement `Drop`.
968 : ///
969 : #[must_use]
970 : pub struct ImageLayerWriter {
971 : inner: Option<ImageLayerWriterInner>,
972 : }
973 :
974 : impl ImageLayerWriter {
975 : ///
976 : /// Start building a new image layer.
977 : ///
978 518 : pub async fn new(
979 518 : conf: &'static PageServerConf,
980 518 : timeline_id: TimelineId,
981 518 : tenant_shard_id: TenantShardId,
982 518 : key_range: &Range<Key>,
983 518 : lsn: Lsn,
984 518 : ctx: &RequestContext,
985 518 : ) -> anyhow::Result<ImageLayerWriter> {
986 518 : Ok(Self {
987 518 : inner: Some(
988 518 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
989 346 : .await?,
990 : ),
991 : })
992 518 : }
993 :
994 : ///
995 : /// Write next value to the file.
996 : ///
997 : /// The page versions must be appended in blknum order.
998 : ///
999 546322 : pub async fn put_image(
1000 546322 : &mut self,
1001 546322 : key: Key,
1002 546322 : img: Bytes,
1003 546322 : ctx: &RequestContext,
1004 546322 : ) -> anyhow::Result<()> {
1005 554980 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
1006 546322 : }
1007 :
1008 : /// Estimated size of the image layer.
1009 8476 : pub(crate) fn estimated_size(&self) -> u64 {
1010 8476 : let inner = self.inner.as_ref().unwrap();
1011 8476 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
1012 8476 : }
1013 :
1014 8564 : pub(crate) fn num_keys(&self) -> usize {
1015 8564 : self.inner.as_ref().unwrap().num_keys
1016 8564 : }
1017 :
1018 : ///
1019 : /// Finish writing the image layer.
1020 : ///
1021 276 : pub(crate) async fn finish(
1022 276 : mut self,
1023 276 : ctx: &RequestContext,
1024 276 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1025 795 : self.inner.take().unwrap().finish(ctx, None).await
1026 276 : }
1027 :
1028 : /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
1029 40 : pub(super) async fn finish_with_end_key(
1030 40 : mut self,
1031 40 : end_key: Key,
1032 40 : ctx: &RequestContext,
1033 40 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1034 80 : self.inner.take().unwrap().finish(ctx, Some(end_key)).await
1035 40 : }
1036 : }
1037 :
1038 : impl Drop for ImageLayerWriter {
1039 518 : fn drop(&mut self) {
1040 518 : if let Some(inner) = self.inner.take() {
1041 202 : inner.blob_writer.into_inner().remove();
1042 316 : }
1043 518 : }
1044 : }
1045 :
1046 : pub struct ImageLayerIterator<'a> {
1047 : image_layer: &'a ImageLayerInner,
1048 : ctx: &'a RequestContext,
1049 : planner: StreamingVectoredReadPlanner,
1050 : index_iter: DiskBtreeIterator<'a>,
1051 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1052 : is_end: bool,
1053 : }
1054 :
1055 : impl<'a> ImageLayerIterator<'a> {
1056 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1057 0 : self.image_layer.layer_dbg_info()
1058 0 : }
1059 :
1060 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1061 19118 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1062 19118 : assert!(self.key_values_batch.is_empty());
1063 19118 : assert!(!self.is_end);
1064 :
1065 19118 : let plan = loop {
1066 28960 : if let Some(res) = self.index_iter.next().await {
1067 28874 : let (raw_key, offset) = res?;
1068 28874 : if let Some(batch_plan) = self.planner.handle(
1069 28874 : Key::from_slice(&raw_key[..KEY_SIZE]),
1070 28874 : self.image_layer.lsn,
1071 28874 : offset,
1072 28874 : ) {
1073 19032 : break batch_plan;
1074 9842 : }
1075 : } else {
1076 86 : self.is_end = true;
1077 86 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1078 86 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1079 86 : break item;
1080 : } else {
1081 0 : return Ok(()); // TODO: a test case on empty iterator
1082 : }
1083 : }
1084 : };
1085 19118 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1086 19118 : let mut next_batch = std::collections::VecDeque::new();
1087 19118 : let buf_size = plan.size();
1088 19118 : let buf = IoBufferMut::with_capacity(buf_size);
1089 19118 : let blobs_buf = vectored_blob_reader
1090 19118 : .read_blobs(&plan, buf, self.ctx)
1091 9707 : .await?;
1092 19118 : let view = BufView::new_slice(&blobs_buf.buf);
1093 28846 : for meta in blobs_buf.blobs.iter() {
1094 28846 : let img_buf = meta.read(&view).await?;
1095 28846 : next_batch.push_back((
1096 28846 : meta.meta.key,
1097 28846 : self.image_layer.lsn,
1098 28846 : Value::Image(img_buf.into_bytes()),
1099 28846 : ));
1100 : }
1101 19118 : self.key_values_batch = next_batch;
1102 19118 : Ok(())
1103 19118 : }
1104 :
1105 28720 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1106 28720 : if self.key_values_batch.is_empty() {
1107 19178 : if self.is_end {
1108 144 : return Ok(None);
1109 19034 : }
1110 19034 : self.next_batch().await?;
1111 9542 : }
1112 28576 : Ok(Some(
1113 28576 : self.key_values_batch
1114 28576 : .pop_front()
1115 28576 : .expect("should not be empty"),
1116 28576 : ))
1117 28720 : }
1118 : }
1119 :
1120 : #[cfg(test)]
1121 : mod test {
1122 : use std::{sync::Arc, time::Duration};
1123 :
1124 : use bytes::Bytes;
1125 : use itertools::Itertools;
1126 : use pageserver_api::{
1127 : key::Key,
1128 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1129 : value::Value,
1130 : };
1131 : use utils::{
1132 : generation::Generation,
1133 : id::{TenantId, TimelineId},
1134 : lsn::Lsn,
1135 : };
1136 :
1137 : use crate::{
1138 : context::RequestContext,
1139 : tenant::{
1140 : config::TenantConf,
1141 : harness::{TenantHarness, TIMELINE_ID},
1142 : storage_layer::{Layer, ResidentLayer},
1143 : vectored_blob_io::StreamingVectoredReadPlanner,
1144 : Tenant, Timeline,
1145 : },
1146 : DEFAULT_PG_VERSION,
1147 : };
1148 :
1149 : use super::{ImageLayerIterator, ImageLayerWriter};
1150 :
1151 : #[tokio::test]
1152 2 : async fn image_layer_rewrite() {
1153 2 : let tenant_conf = TenantConf {
1154 2 : gc_period: Duration::ZERO,
1155 2 : compaction_period: Duration::ZERO,
1156 2 : ..TenantConf::default()
1157 2 : };
1158 2 : let tenant_id = TenantId::generate();
1159 2 : let mut gen = Generation::new(0xdead0001);
1160 10 : let mut get_next_gen = || {
1161 10 : let ret = gen;
1162 10 : gen = gen.next();
1163 10 : ret
1164 10 : };
1165 2 : // The LSN at which we will create an image layer to filter
1166 2 : let lsn = Lsn(0xdeadbeef0000);
1167 2 : let timeline_id = TimelineId::generate();
1168 2 :
1169 2 : //
1170 2 : // Create an unsharded parent with a layer.
1171 2 : //
1172 2 :
1173 2 : let harness = TenantHarness::create_custom(
1174 2 : "test_image_layer_rewrite--parent",
1175 2 : tenant_conf.clone(),
1176 2 : tenant_id,
1177 2 : ShardIdentity::unsharded(),
1178 2 : get_next_gen(),
1179 2 : )
1180 2 : .await
1181 2 : .unwrap();
1182 20 : let (tenant, ctx) = harness.load().await;
1183 2 : let timeline = tenant
1184 2 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1185 6 : .await
1186 2 : .unwrap();
1187 2 :
1188 2 : // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
1189 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1190 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1191 2 : let range = input_start..input_end;
1192 2 :
1193 2 : // Build an image layer to filter
1194 2 : let resident = {
1195 2 : let mut writer = ImageLayerWriter::new(
1196 2 : harness.conf,
1197 2 : timeline_id,
1198 2 : harness.tenant_shard_id,
1199 2 : &range,
1200 2 : lsn,
1201 2 : &ctx,
1202 2 : )
1203 2 : .await
1204 2 : .unwrap();
1205 2 :
1206 2 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1207 2 : let mut key = range.start;
1208 262146 : while key < range.end {
1209 266239 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1210 262144 :
1211 262144 : key = key.next();
1212 2 : }
1213 119 : let (desc, path) = writer.finish(&ctx).await.unwrap();
1214 2 : Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
1215 2 : };
1216 2 : let original_size = resident.metadata().file_size;
1217 2 :
1218 2 : //
1219 2 : // Create child shards and do the rewrite, exercising filter().
1220 2 : // TODO: abstraction in TenantHarness for splits.
1221 2 : //
1222 2 :
1223 2 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1224 2 : // range, middle of key range.
1225 2 : let shard_count = ShardCount::new(4);
1226 8 : for shard_number in 0..shard_count.count() {
1227 2 : //
1228 2 : // mimic the shard split
1229 2 : //
1230 8 : let shard_identity = ShardIdentity::new(
1231 8 : ShardNumber(shard_number),
1232 8 : shard_count,
1233 8 : ShardStripeSize(0x8000),
1234 8 : )
1235 8 : .unwrap();
1236 8 : let harness = TenantHarness::create_custom(
1237 8 : Box::leak(Box::new(format!(
1238 8 : "test_image_layer_rewrite--child{}",
1239 8 : shard_identity.shard_slug()
1240 8 : ))),
1241 8 : tenant_conf.clone(),
1242 8 : tenant_id,
1243 8 : shard_identity,
1244 8 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1245 8 : // But here, all we care about is that the gen number is unique.
1246 8 : get_next_gen(),
1247 8 : )
1248 2 : .await
1249 8 : .unwrap();
1250 80 : let (tenant, ctx) = harness.load().await;
1251 8 : let timeline = tenant
1252 8 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1253 20 : .await
1254 8 : .unwrap();
1255 2 :
1256 2 : //
1257 2 : // use filter() and make assertions
1258 2 : //
1259 2 :
1260 8 : let mut filtered_writer = ImageLayerWriter::new(
1261 8 : harness.conf,
1262 8 : timeline_id,
1263 8 : harness.tenant_shard_id,
1264 8 : &range,
1265 8 : lsn,
1266 8 : &ctx,
1267 8 : )
1268 4 : .await
1269 8 : .unwrap();
1270 2 :
1271 8 : let wrote_keys = resident
1272 8 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1273 266719 : .await
1274 8 : .unwrap();
1275 8 : let replacement = if wrote_keys > 0 {
1276 129 : let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
1277 6 : let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
1278 6 : Some(resident)
1279 2 : } else {
1280 2 : None
1281 2 : };
1282 2 :
1283 2 : // This exact size and those below will need updating as/when the layer encoding changes, but
1284 2 : // should be deterministic for a given version of the format, as we used no randomness generating the input.
1285 8 : assert_eq!(original_size, 1597440);
1286 2 :
1287 8 : match shard_number {
1288 2 : 0 => {
1289 2 : // We should have written out just one stripe for our shard identity
1290 2 : assert_eq!(wrote_keys, 0x8000);
1291 2 : let replacement = replacement.unwrap();
1292 2 :
1293 2 : // We should have dropped some of the data
1294 2 : assert!(replacement.metadata().file_size < original_size);
1295 2 : assert!(replacement.metadata().file_size > 0);
1296 2 :
1297 2 : // Assert that we dropped ~3/4 of the data.
1298 2 : assert_eq!(replacement.metadata().file_size, 417792);
1299 2 : }
1300 2 : 1 => {
1301 2 : // Shard 1 has no keys in our input range
1302 2 : assert_eq!(wrote_keys, 0x0);
1303 2 : assert!(replacement.is_none());
1304 2 : }
1305 2 : 2 => {
1306 2 : // Shard 2 has one stripes in the input range
1307 2 : assert_eq!(wrote_keys, 0x8000);
1308 2 : let replacement = replacement.unwrap();
1309 2 : assert!(replacement.metadata().file_size < original_size);
1310 2 : assert!(replacement.metadata().file_size > 0);
1311 2 : assert_eq!(replacement.metadata().file_size, 417792);
1312 2 : }
1313 2 : 3 => {
1314 2 : // Shard 3 has two stripes in the input range
1315 2 : assert_eq!(wrote_keys, 0x10000);
1316 2 : let replacement = replacement.unwrap();
1317 2 : assert!(replacement.metadata().file_size < original_size);
1318 2 : assert!(replacement.metadata().file_size > 0);
1319 2 : assert_eq!(replacement.metadata().file_size, 811008);
1320 2 : }
1321 2 : _ => unreachable!(),
1322 2 : }
1323 2 : }
1324 2 : }
1325 :
1326 2 : async fn produce_image_layer(
1327 2 : tenant: &Tenant,
1328 2 : tline: &Arc<Timeline>,
1329 2 : mut images: Vec<(Key, Bytes)>,
1330 2 : lsn: Lsn,
1331 2 : ctx: &RequestContext,
1332 2 : ) -> anyhow::Result<ResidentLayer> {
1333 2 : images.sort();
1334 2 : let (key_start, _) = images.first().unwrap();
1335 2 : let (key_last, _) = images.last().unwrap();
1336 2 : let key_end = key_last.next();
1337 2 : let key_range = *key_start..key_end;
1338 2 : let mut writer = ImageLayerWriter::new(
1339 2 : tenant.conf,
1340 2 : tline.timeline_id,
1341 2 : tenant.tenant_shard_id,
1342 2 : &key_range,
1343 2 : lsn,
1344 2 : ctx,
1345 2 : )
1346 1 : .await?;
1347 :
1348 2002 : for (key, img) in images {
1349 2031 : writer.put_image(key, img, ctx).await?;
1350 : }
1351 4 : let (desc, path) = writer.finish(ctx).await?;
1352 2 : let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
1353 :
1354 2 : Ok::<_, anyhow::Error>(img_layer)
1355 2 : }
1356 :
1357 28 : async fn assert_img_iter_equal(
1358 28 : img_iter: &mut ImageLayerIterator<'_>,
1359 28 : expect: &[(Key, Bytes)],
1360 28 : expect_lsn: Lsn,
1361 28 : ) {
1362 28 : let mut expect_iter = expect.iter();
1363 : loop {
1364 28028 : let o1 = img_iter.next().await.unwrap();
1365 28028 : let o2 = expect_iter.next();
1366 28028 : match (o1, o2) {
1367 28 : (None, None) => break,
1368 28000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1369 28000 : let Value::Image(i1) = v1 else {
1370 0 : panic!("expect Value::Image")
1371 : };
1372 28000 : assert_eq!(&k1, k2);
1373 28000 : assert_eq!(l1, expect_lsn);
1374 28000 : assert_eq!(&i1, i2);
1375 : }
1376 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1377 : }
1378 : }
1379 28 : }
1380 :
1381 : #[tokio::test]
1382 2 : async fn image_layer_iterator() {
1383 2 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1384 20 : let (tenant, ctx) = harness.load().await;
1385 2 :
1386 2 : let tline = tenant
1387 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1388 6 : .await
1389 2 : .unwrap();
1390 2 :
1391 2000 : fn get_key(id: u32) -> Key {
1392 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1393 2000 : key.field6 = id;
1394 2000 : key
1395 2000 : }
1396 2 : const N: usize = 1000;
1397 2 : let test_imgs = (0..N)
1398 2000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1399 2 : .collect_vec();
1400 2 : let resident_layer =
1401 2 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1402 2036 : .await
1403 2 : .unwrap();
1404 2 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1405 6 : for max_read_size in [1, 1024] {
1406 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1407 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1408 28 : // Test if the batch size is correctly determined
1409 28 : let mut iter = img_layer.iter(&ctx);
1410 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1411 28 : let mut num_items = 0;
1412 112 : for _ in 0..3 {
1413 84 : iter.next_batch().await.unwrap();
1414 84 : num_items += iter.key_values_batch.len();
1415 84 : if max_read_size == 1 {
1416 2 : // every key should be a batch b/c the value is larger than max_read_size
1417 42 : assert_eq!(iter.key_values_batch.len(), 1);
1418 2 : } else {
1419 42 : assert!(iter.key_values_batch.len() <= batch_size);
1420 2 : }
1421 84 : if num_items >= N {
1422 2 : break;
1423 84 : }
1424 84 : iter.key_values_batch.clear();
1425 2 : }
1426 2 : // Test if the result is correct
1427 28 : let mut iter = img_layer.iter(&ctx);
1428 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1429 9635 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1430 2 : }
1431 2 : }
1432 2 : }
1433 : }
|