Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN.
3 : //!
4 : //! It contains an image of all key-value pairs in its key-range. Any key
5 : //! that falls within the image layer's range but is not present in the
6 : //! layer does not exist (as of the layer's LSN).
7 : //!
8 : //! An image layer is stored in a file on disk. The file is stored in the
9 : //! `timelines/<timeline_id>` directory. Currently, there are no
10 : //! subdirectories, and each image layer file is named like this:
11 : //!
12 : //! ```text
13 : //! <key start>-<key end>__<LSN>
14 : //! ```
15 : //!
16 : //! For example:
17 : //!
18 : //! ```text
19 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
20 : //! ```
21 : //!
22 : //! Every image layer file consists of three parts: "summary",
23 : //! "index", and "values". The summary is a fixed size header at the
24 : //! beginning of the file, and it contains basic information about the
25 : //! layer, and offsets to the other parts. The "index" is a B-tree,
26 : //! mapping from Key to an offset in the "values" part. The
27 : //! actual page images are stored in the "values" part.
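 : //!
 : //! A rough sketch of the physical layout (blocks are PAGE_SZ-sized; the
 : //! "values" part always starts on block 1, right after the one-block
 : //! summary, and 'index_start_blk' is recorded in the summary):
 : //!
 : //! ```text
 : //! block 0      block 1 ..              index_start_blk ..
 : //! +------------+------------------------+----------------------+
 : //! | summary    | values (page images)   | index (B-tree pages) |
 : //! +------------+------------------------+----------------------+
 : //! ```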
28 : use crate::config::PageServerConf;
29 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
30 : use crate::page_cache::{self, FileId, PAGE_SZ};
31 : use crate::tenant::blob_io::BlobWriter;
32 : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
33 : use crate::tenant::disk_btree::{
34 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::vectored_blob_io::{
38 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
39 : VectoredReadPlanner,
40 : };
41 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
42 : use crate::virtual_file::IoBufferMut;
43 : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
44 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
45 : use anyhow::{bail, ensure, Context, Result};
46 : use bytes::Bytes;
47 : use camino::{Utf8Path, Utf8PathBuf};
48 : use hex;
49 : use itertools::Itertools;
50 : use pageserver_api::config::MaxVectoredReadBytes;
51 : use pageserver_api::key::{Key, DBDIR_KEY, KEY_SIZE};
52 : use pageserver_api::keyspace::KeySpace;
53 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
54 : use pageserver_api::value::Value;
55 : use rand::{distributions::Alphanumeric, Rng};
56 : use serde::{Deserialize, Serialize};
57 : use std::collections::{HashMap, VecDeque};
58 : use std::fs::File;
59 : use std::io::SeekFrom;
60 : use std::ops::Range;
61 : use std::os::unix::prelude::FileExt;
62 : use std::str::FromStr;
63 : use std::sync::Arc;
64 : use tokio::sync::OnceCell;
65 : use tokio_stream::StreamExt;
66 : use tracing::*;
67 :
68 : use utils::{
69 : bin_ser::BeSer,
70 : id::{TenantId, TimelineId},
71 : lsn::Lsn,
72 : };
73 :
74 : use super::layer_name::ImageLayerName;
75 : use super::{
76 : AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
77 : ValuesReconstructState,
78 : };
79 :
80 : ///
81 : /// Header stored at the beginning of the file
82 : ///
83 : /// After this comes the 'values' part, starting on block 1. After that,
84 : /// the 'index' starts at the block indicated by 'index_start_blk'.
85 : ///
86 0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
87 : pub struct Summary {
88 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
89 : pub magic: u16,
90 : pub format_version: u16,
91 :
92 : pub tenant_id: TenantId,
93 : pub timeline_id: TimelineId,
94 : pub key_range: Range<Key>,
95 : pub lsn: Lsn,
96 :
97 : /// Block number where the 'index' part of the file begins.
98 : pub index_start_blk: u32,
99 : /// Block within the 'index', where the B-tree root page is stored
100 : pub index_root_blk: u32,
101 : // the 'values' part starts after the summary header, on block 1.
102 : }
103 :
104 : impl From<&ImageLayer> for Summary {
105 0 : fn from(layer: &ImageLayer) -> Self {
106 0 : Self::expected(
107 0 : layer.desc.tenant_shard_id.tenant_id,
108 0 : layer.desc.timeline_id,
109 0 : layer.desc.key_range.clone(),
110 0 : layer.lsn,
111 0 : )
112 0 : }
113 : }
114 :
115 : impl Summary {
116 268 : pub(super) fn expected(
117 268 : tenant_id: TenantId,
118 268 : timeline_id: TimelineId,
119 268 : key_range: Range<Key>,
120 268 : lsn: Lsn,
121 268 : ) -> Self {
122 268 : Self {
123 268 : magic: IMAGE_FILE_MAGIC,
124 268 : format_version: STORAGE_FORMAT_VERSION,
125 268 : tenant_id,
126 268 : timeline_id,
127 268 : key_range,
128 268 : lsn,
129 268 :
130 268 : index_start_blk: 0,
131 268 : index_root_blk: 0,
132 268 : }
133 268 : }
134 : }
135 :
136 : /// This is used only from `pagectl`. Within pageserver, all layers are
137 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
138 : pub struct ImageLayer {
139 : path: Utf8PathBuf,
140 : pub desc: PersistentLayerDesc,
141 : /// This entry contains an image of all pages as of this LSN; it should be the same as `desc.lsn`.
142 : pub lsn: Lsn,
143 : inner: OnceCell<ImageLayerInner>,
144 : }
145 :
146 : impl std::fmt::Debug for ImageLayer {
147 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 : use super::RangeDisplayDebug;
149 :
150 0 : f.debug_struct("ImageLayer")
151 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
152 0 : .field("file_size", &self.desc.file_size)
153 0 : .field("lsn", &self.lsn)
154 0 : .field("inner", &self.inner)
155 0 : .finish()
156 0 : }
157 : }
158 :
159 : /// ImageLayerInner is the in-memory data structure associated with an on-disk image
160 : /// file.
161 : pub struct ImageLayerInner {
162 : // values copied from summary
163 : index_start_blk: u32,
164 : index_root_blk: u32,
165 :
166 : key_range: Range<Key>,
167 : lsn: Lsn,
168 :
169 : file: Arc<VirtualFile>,
170 : file_id: FileId,
171 :
172 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
173 : }
174 :
175 : impl ImageLayerInner {
176 0 : pub(crate) fn layer_dbg_info(&self) -> String {
177 0 : format!(
178 0 : "image {}..{} {}",
179 0 : self.key_range().start,
180 0 : self.key_range().end,
181 0 : self.lsn()
182 0 : )
183 0 : }
184 : }
185 :
186 : impl std::fmt::Debug for ImageLayerInner {
187 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 0 : f.debug_struct("ImageLayerInner")
189 0 : .field("index_start_blk", &self.index_start_blk)
190 0 : .field("index_root_blk", &self.index_root_blk)
191 0 : .finish()
192 0 : }
193 : }
194 :
195 : impl ImageLayerInner {
196 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
197 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
198 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
199 0 : self.index_start_blk,
200 0 : self.index_root_blk,
201 0 : block_reader,
202 0 : );
203 0 :
204 0 : tree_reader.dump().await?;
205 :
206 0 : tree_reader
207 0 : .visit(
208 0 : &[0u8; KEY_SIZE],
209 0 : VisitDirection::Forwards,
210 0 : |key, value| {
211 0 : println!("key: {} offset {}", hex::encode(key), value);
212 0 : true
213 0 : },
214 0 : ctx,
215 0 : )
216 0 : .await?;
217 :
218 0 : Ok(())
219 0 : }
220 : }
221 :
222 : /// Boilerplate to implement the Layer trait; always use `layer_desc` for persistent layers.
223 : impl std::fmt::Display for ImageLayer {
224 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 0 : write!(f, "{}", self.layer_desc().short_id())
226 0 : }
227 : }
228 :
229 : impl AsLayerDesc for ImageLayer {
230 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
231 0 : &self.desc
232 0 : }
233 : }
234 :
235 : impl ImageLayer {
236 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
237 0 : self.desc.dump();
238 0 :
239 0 : if !verbose {
240 0 : return Ok(());
241 0 : }
242 :
243 0 : let inner = self.load(ctx).await?;
244 :
245 0 : inner.dump(ctx).await?;
246 :
247 0 : Ok(())
248 0 : }
249 :
250 1180 : fn temp_path_for(
251 1180 : conf: &PageServerConf,
252 1180 : timeline_id: TimelineId,
253 1180 : tenant_shard_id: TenantShardId,
254 1180 : fname: &ImageLayerName,
255 1180 : ) -> Utf8PathBuf {
256 1180 : let rand_string: String = rand::thread_rng()
257 1180 : .sample_iter(&Alphanumeric)
258 1180 : .take(8)
259 1180 : .map(char::from)
260 1180 : .collect();
261 1180 :
262 1180 : conf.timeline_path(&tenant_shard_id, &timeline_id)
263 1180 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
264 1180 : }
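 :
 : // A hedged example of the shape this produces (the 8-character random
 : // alphanumeric suffix below is purely illustrative):
 : //
 : //     timelines/<timeline_id>/<key start>-<key end>__<LSN>.aB3xQ9kZ.<TEMP_FILE_SUFFIX>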
265 :
266 : ///
267 : /// Open the underlying file and read the metadata into memory, if it's
268 : /// not loaded already.
269 : ///
270 0 : async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
271 0 : self.inner
272 0 : .get_or_try_init(|| self.load_inner(ctx))
273 0 : .await
274 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
275 0 : }
276 :
277 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
278 0 : let path = self.path();
279 :
280 0 : let loaded =
281 0 : ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
282 :
283 : // not production code
284 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
285 0 : let expected_layer_name = self.layer_desc().layer_name();
286 0 :
287 0 : if actual_layer_name != expected_layer_name {
288 0 : println!("warning: filename does not match what is expected from in-file summary");
289 0 : println!("actual: {:?}", actual_layer_name.to_string());
290 0 : println!("expected: {:?}", expected_layer_name.to_string());
291 0 : }
292 :
293 0 : Ok(loaded)
294 0 : }
295 :
296 : /// Create an ImageLayer struct representing an existing file on disk.
297 : ///
298 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
299 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
300 0 : let mut summary_buf = vec![0; PAGE_SZ];
301 0 : file.read_exact_at(&mut summary_buf, 0)?;
302 0 : let summary = Summary::des_prefix(&summary_buf)?;
303 0 : let metadata = file
304 0 : .metadata()
305 0 : .context("get file metadata to determine size")?;
306 :
307 : // This function is never used for constructing layers in a running pageserver,
308 : // so it does not need an accurate TenantShardId.
309 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
310 0 :
311 0 : Ok(ImageLayer {
312 0 : path: path.to_path_buf(),
313 0 : desc: PersistentLayerDesc::new_img(
314 0 : tenant_shard_id,
315 0 : summary.timeline_id,
316 0 : summary.key_range,
317 0 : summary.lsn,
318 0 : metadata.len(),
319 0 : ), // For now, we assume the image layer ALWAYS covers the full range. This may change in the future.
320 0 : lsn: summary.lsn,
321 0 : inner: OnceCell::new(),
322 0 : })
323 0 : }
324 :
325 0 : fn path(&self) -> Utf8PathBuf {
326 0 : self.path.clone()
327 0 : }
328 : }
329 :
330 : #[derive(thiserror::Error, Debug)]
331 : pub enum RewriteSummaryError {
332 : #[error("magic mismatch")]
333 : MagicMismatch,
334 : #[error(transparent)]
335 : Other(#[from] anyhow::Error),
336 : }
337 :
338 : impl From<std::io::Error> for RewriteSummaryError {
339 0 : fn from(e: std::io::Error) -> Self {
340 0 : Self::Other(anyhow::anyhow!(e))
341 0 : }
342 : }
343 :
344 : impl ImageLayer {
345 0 : pub async fn rewrite_summary<F>(
346 0 : path: &Utf8Path,
347 0 : rewrite: F,
348 0 : ctx: &RequestContext,
349 0 : ) -> Result<(), RewriteSummaryError>
350 0 : where
351 0 : F: Fn(Summary) -> Summary,
352 0 : {
353 0 : let mut file = VirtualFile::open_with_options(
354 0 : path,
355 0 : virtual_file::OpenOptions::new().read(true).write(true),
356 0 : ctx,
357 0 : )
358 0 : .await
359 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
360 0 : let file_id = page_cache::next_file_id();
361 0 : let block_reader = FileBlockReader::new(&file, file_id);
362 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
363 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
364 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
365 0 : return Err(RewriteSummaryError::MagicMismatch);
366 0 : }
367 0 :
368 0 : let new_summary = rewrite(actual_summary);
369 0 :
370 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
371 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
372 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
373 0 : file.seek(SeekFrom::Start(0)).await?;
374 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
375 0 : res?;
376 0 : Ok(())
377 0 : }
378 : }
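 :
 : // A minimal usage sketch for `rewrite_summary` (hypothetical: `path`, `ctx`,
 : // and `new_timeline_id` are assumed to be in scope); this is the kind of call
 : // a repair or debugging tool could make to retarget a layer file's summary:
 : //
 : //     ImageLayer::rewrite_summary(
 : //         path,
 : //         |mut summary| {
 : //             summary.timeline_id = new_timeline_id;
 : //             summary
 : //         },
 : //         ctx,
 : //     )
 : //     .await?;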
379 :
380 : impl ImageLayerInner {
381 264 : pub(crate) fn key_range(&self) -> &Range<Key> {
382 264 : &self.key_range
383 264 : }
384 :
385 264 : pub(crate) fn lsn(&self) -> Lsn {
386 264 : self.lsn
387 264 : }
388 :
389 268 : pub(super) async fn load(
390 268 : path: &Utf8Path,
391 268 : lsn: Lsn,
392 268 : summary: Option<Summary>,
393 268 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
394 268 : ctx: &RequestContext,
395 268 : ) -> anyhow::Result<Self> {
396 268 : let file = Arc::new(
397 268 : VirtualFile::open_v2(path, ctx)
398 268 : .await
399 268 : .context("open layer file")?,
400 : );
401 268 : let file_id = page_cache::next_file_id();
402 268 : let block_reader = FileBlockReader::new(&file, file_id);
403 268 : let summary_blk = block_reader
404 268 : .read_blk(0, ctx)
405 268 : .await
406 268 : .context("read first block")?;
407 :
408 : // A wrong length is the only way this could fail, so it's not actually likely
409 : // unless read_blk returns a wrong-sized block.
410 : //
411 : // TODO: confirm this and turn it into an assertion
412 268 : let actual_summary =
413 268 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
414 :
415 268 : if let Some(mut expected_summary) = summary {
416 : // production code path
417 268 : expected_summary.index_start_blk = actual_summary.index_start_blk;
418 268 : expected_summary.index_root_blk = actual_summary.index_root_blk;
419 268 : // mask out the timeline_id, but still require the layers to be from the same tenant
420 268 : expected_summary.timeline_id = actual_summary.timeline_id;
421 268 :
422 268 : if actual_summary != expected_summary {
423 0 : bail!(
424 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
425 0 : actual_summary,
426 0 : expected_summary
427 0 : );
428 268 : }
429 0 : }
430 :
431 268 : Ok(ImageLayerInner {
432 268 : index_start_blk: actual_summary.index_start_blk,
433 268 : index_root_blk: actual_summary.index_root_blk,
434 268 : lsn,
435 268 : file,
436 268 : file_id,
437 268 : max_vectored_read_bytes,
438 268 : key_range: actual_summary.key_range,
439 268 : })
440 268 : }
441 :
442 : // Look up the keys in the provided keyspace and update
443 : // the reconstruct state with whatever is found.
444 45116 : pub(super) async fn get_values_reconstruct_data(
445 45116 : &self,
446 45116 : this: ResidentLayer,
447 45116 : keyspace: KeySpace,
448 45116 : reconstruct_state: &mut ValuesReconstructState,
449 45116 : ctx: &RequestContext,
450 45116 : ) -> Result<(), GetVectoredError> {
451 45116 : let reads = self
452 45116 : .plan_reads(keyspace, None, ctx)
453 45116 : .await
454 45116 : .map_err(GetVectoredError::Other)?;
455 :
456 45116 : self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
457 45116 : .await;
458 :
459 45116 : reconstruct_state.on_image_layer_visited(&self.key_range);
460 45116 :
461 45116 : Ok(())
462 45116 : }
463 :
464 : /// Traverse the layer's index to build read operations on the overlap of the input keyspace
465 : /// and the keys in this layer.
466 : ///
467 : /// If shard_identity is provided, it will be used to filter keys down to those stored on
468 : /// this shard.
469 45132 : async fn plan_reads(
470 45132 : &self,
471 45132 : keyspace: KeySpace,
472 45132 : shard_identity: Option<&ShardIdentity>,
473 45132 : ctx: &RequestContext,
474 45132 : ) -> anyhow::Result<Vec<VectoredRead>> {
475 45132 : let mut planner = VectoredReadPlanner::new(
476 45132 : self.max_vectored_read_bytes
477 45132 : .expect("Layer is loaded with max vectored bytes config")
478 45132 : .0
479 45132 : .into(),
480 45132 : );
481 45132 :
482 45132 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
483 45132 : let tree_reader =
484 45132 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
485 45132 :
486 45132 : let ctx = RequestContextBuilder::extend(ctx)
487 45132 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
488 45132 : .build();
489 :
490 45160 : for range in keyspace.ranges.iter() {
491 45160 : let mut range_end_handled = false;
492 45160 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
493 45160 : range.start.write_to_byte_slice(&mut search_key);
494 45160 :
495 45160 : let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
496 45160 : let mut index_stream = std::pin::pin!(index_stream);
497 :
498 2251480 : while let Some(index_entry) = index_stream.next().await {
499 2250860 : let (raw_key, offset) = index_entry?;
500 :
501 2250860 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
502 2250860 : assert!(key >= range.start);
503 :
504 2250860 : let flag = if let Some(shard_identity) = shard_identity {
505 2097152 : if shard_identity.is_key_disposable(&key) {
506 1572864 : BlobFlag::Ignore
507 : } else {
508 524288 : BlobFlag::None
509 : }
510 : } else {
511 153708 : BlobFlag::None
512 : };
513 :
514 2250860 : if key >= range.end {
515 44540 : planner.handle_range_end(offset);
516 44540 : range_end_handled = true;
517 44540 : break;
518 2206320 : } else {
519 2206320 : planner.handle(key, self.lsn, offset, flag);
520 2206320 : }
521 : }
522 :
523 45160 : if !range_end_handled {
524 620 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
525 620 : planner.handle_range_end(payload_end);
526 44540 : }
527 : }
528 :
529 45132 : Ok(planner.finish())
530 45132 : }
531 :
532 : /// Select the parts of the layer's key range that should be retained by the ShardIdentity,
533 : /// then execute vectored GET operations, passing the results of all read keys into the writer.
534 16 : pub(super) async fn filter(
535 16 : &self,
536 16 : shard_identity: &ShardIdentity,
537 16 : writer: &mut ImageLayerWriter,
538 16 : ctx: &RequestContext,
539 16 : ) -> anyhow::Result<usize> {
540 : // Fragment the range into the regions owned by this ShardIdentity
541 16 : let plan = self
542 16 : .plan_reads(
543 16 : KeySpace {
544 16 : // If asked for the total key space, plan_reads will give us all the keys in the layer
545 16 : ranges: vec![Key::MIN..Key::MAX],
546 16 : },
547 16 : Some(shard_identity),
548 16 : ctx,
549 16 : )
550 16 : .await?;
551 :
552 16 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
553 16 : let mut key_count = 0;
554 32 : for read in plan.into_iter() {
555 32 : let buf_size = read.size();
556 32 :
557 32 : let buf = IoBufferMut::with_capacity(buf_size);
558 32 : let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
559 :
560 32 : let view = BufView::new_slice(&blobs_buf.buf);
561 :
562 524288 : for meta in blobs_buf.blobs.iter() {
563 524288 : let img_buf = meta.read(&view).await?;
564 :
565 524288 : key_count += 1;
566 524288 : writer
567 524288 : .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
568 524288 : .await
569 524288 : .context(format!("Storing key {}", meta.meta.key))?;
570 : }
571 : }
572 :
573 16 : Ok(key_count)
574 16 : }
575 :
576 45116 : async fn do_reads_and_update_state(
577 45116 : &self,
578 45116 : this: ResidentLayer,
579 45116 : reads: Vec<VectoredRead>,
580 45116 : reconstruct_state: &mut ValuesReconstructState,
581 45116 : ctx: &RequestContext,
582 45116 : ) {
583 45116 : let max_vectored_read_bytes = self
584 45116 : .max_vectored_read_bytes
585 45116 : .expect("Layer is loaded with max vectored bytes config")
586 45116 : .0
587 45116 : .into();
588 :
589 45116 : for read in reads.into_iter() {
590 45040 : let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
591 109168 : for (_, blob_meta) in read.blobs_at.as_slice() {
592 109168 : let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
593 109168 : ios.insert((blob_meta.key, blob_meta.lsn), io);
594 109168 : }
595 :
596 45040 : let buf_size = read.size();
597 45040 :
598 45040 : if buf_size > max_vectored_read_bytes {
599 : // If the read is oversized, it should only contain one key.
600 0 : let offenders = read
601 0 : .blobs_at
602 0 : .as_slice()
603 0 : .iter()
604 0 : .filter_map(|(_, blob_meta)| {
605 0 : if blob_meta.key.is_rel_dir_key()
606 0 : || blob_meta.key == DBDIR_KEY
607 0 : || blob_meta.key.is_aux_file_key()
608 : {
609 : // The size of values for these keys is unbounded and can
610 : // grow very large in pathological cases.
611 0 : None
612 : } else {
613 0 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
614 : }
615 0 : })
616 0 : .join(", ");
617 0 :
618 0 : if !offenders.is_empty() {
619 0 : tracing::warn!(
620 0 : "Oversized vectored read ({} > {}) for keys {}",
621 : buf_size,
622 : max_vectored_read_bytes,
623 : offenders
624 : );
625 0 : }
626 45040 : }
627 :
628 45040 : let read_extend_residency = this.clone();
629 45040 : let read_from = self.file.clone();
630 45040 : let read_ctx = ctx.attached_child();
631 45040 : reconstruct_state
632 45040 : .spawn_io(async move {
633 45040 : let buf = IoBufferMut::with_capacity(buf_size);
634 45040 : let vectored_blob_reader = VectoredBlobReader::new(&read_from);
635 45040 : let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
636 :
637 45040 : match res {
638 45040 : Ok(blobs_buf) => {
639 45040 : let view = BufView::new_slice(&blobs_buf.buf);
640 109168 : for meta in blobs_buf.blobs.iter() {
641 109168 : let io: OnDiskValueIo =
642 109168 : ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
643 109168 : let img_buf = meta.read(&view).await;
644 :
645 109168 : let img_buf = match img_buf {
646 109168 : Ok(img_buf) => img_buf,
647 0 : Err(e) => {
648 0 : io.complete(Err(e));
649 0 : continue;
650 : }
651 : };
652 :
653 109168 : io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
654 : }
655 :
656 45040 : assert!(ios.is_empty());
657 : }
658 0 : Err(err) => {
659 0 : for (_, io) in ios {
660 0 : io.complete(Err(std::io::Error::new(
661 0 : err.kind(),
662 0 : "vec read failed",
663 0 : )));
664 0 : }
665 : }
666 : }
667 :
668 : // keep layer resident until this IO is done; this spawned IO future generally outlives the
669 : // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
670 45040 : drop(read_extend_residency);
671 45040 : })
672 45040 : .await;
673 : }
674 45116 : }
675 :
676 244 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
677 244 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
678 244 : let tree_reader =
679 244 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
680 244 : ImageLayerIterator {
681 244 : image_layer: self,
682 244 : ctx,
683 244 : index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
684 244 : key_values_batch: VecDeque::new(),
685 244 : is_end: false,
686 244 : planner: StreamingVectoredReadPlanner::new(
687 244 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
688 244 : 1024, // The default value. Unit tests might use a different value
689 244 : ),
690 244 : }
691 244 : }
692 :
693 : /// NB: not super efficient, but not terrible either. Should probably be an iterator.
694 : //
695 : // We're reusing the index traversal logic in plan_reads; it would be nice to
696 : // factor that out.
697 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
698 0 : let plan = self
699 0 : .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
700 0 : .await?;
701 0 : Ok(plan
702 0 : .into_iter()
703 0 : .flat_map(|read| read.blobs_at)
704 0 : .map(|(_, blob_meta)| blob_meta.key)
705 0 : .collect())
706 0 : }
707 : }
708 :
709 : /// A builder object for constructing a new image layer.
710 : ///
711 : /// Usage:
712 : ///
713 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
714 : ///
715 : /// 2. Write the contents by calling `put_image` for every key-value
716 : /// pair in the key range.
717 : ///
718 : /// 3. Call `finish`.
719 : ///
720 : struct ImageLayerWriterInner {
721 : conf: &'static PageServerConf,
722 : path: Utf8PathBuf,
723 : timeline_id: TimelineId,
724 : tenant_shard_id: TenantShardId,
725 : key_range: Range<Key>,
726 : lsn: Lsn,
727 :
728 : // Total uncompressed bytes passed into put_image
729 : uncompressed_bytes: u64,
730 :
731 : // Like `uncompressed_bytes`,
732 : // but only of images we might consider for compression
733 : uncompressed_bytes_eligible: u64,
734 :
735 : // Like `uncompressed_bytes`, but only of images
736 : // where we have chosen their compressed form
737 : uncompressed_bytes_chosen: u64,
738 :
739 : // Number of keys in the layer.
740 : num_keys: usize,
741 :
742 : blob_writer: BlobWriter<false>,
743 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
744 :
745 : #[cfg(feature = "testing")]
746 : last_written_key: Key,
747 : }
748 :
749 : impl ImageLayerWriterInner {
750 : ///
751 : /// Start building a new image layer.
752 : ///
753 1180 : async fn new(
754 1180 : conf: &'static PageServerConf,
755 1180 : timeline_id: TimelineId,
756 1180 : tenant_shard_id: TenantShardId,
757 1180 : key_range: &Range<Key>,
758 1180 : lsn: Lsn,
759 1180 : ctx: &RequestContext,
760 1180 : ) -> anyhow::Result<Self> {
761 1180 : // Create the file initially with a temporary filename.
762 1180 : // We'll atomically rename it to the final name when we're done.
763 1180 : let path = ImageLayer::temp_path_for(
764 1180 : conf,
765 1180 : timeline_id,
766 1180 : tenant_shard_id,
767 1180 : &ImageLayerName {
768 1180 : key_range: key_range.clone(),
769 1180 : lsn,
770 1180 : },
771 1180 : );
772 1180 : trace!("creating image layer {}", path);
773 1180 : let mut file = {
774 1180 : VirtualFile::open_with_options(
775 1180 : &path,
776 1180 : virtual_file::OpenOptions::new()
777 1180 : .write(true)
778 1180 : .create_new(true),
779 1180 : ctx,
780 1180 : )
781 1180 : .await?
782 : };
783 : // make room for the header block
784 1180 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
785 1180 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
786 1180 :
787 1180 : // Initialize the b-tree index builder
788 1180 : let block_buf = BlockBuf::new();
789 1180 : let tree_builder = DiskBtreeBuilder::new(block_buf);
790 1180 :
791 1180 : let writer = Self {
792 1180 : conf,
793 1180 : path,
794 1180 : timeline_id,
795 1180 : tenant_shard_id,
796 1180 : key_range: key_range.clone(),
797 1180 : lsn,
798 1180 : tree: tree_builder,
799 1180 : blob_writer,
800 1180 : uncompressed_bytes: 0,
801 1180 : uncompressed_bytes_eligible: 0,
802 1180 : uncompressed_bytes_chosen: 0,
803 1180 : num_keys: 0,
804 1180 : #[cfg(feature = "testing")]
805 1180 : last_written_key: Key::MIN,
806 1180 : };
807 1180 :
808 1180 : Ok(writer)
809 1180 : }
810 :
811 : ///
812 : /// Write next value to the file.
813 : ///
814 : /// The page versions must be appended in blknum order.
815 : ///
816 1093276 : async fn put_image(
817 1093276 : &mut self,
818 1093276 : key: Key,
819 1093276 : img: Bytes,
820 1093276 : ctx: &RequestContext,
821 1093276 : ) -> anyhow::Result<()> {
822 1093276 : ensure!(self.key_range.contains(&key));
823 1093276 : let compression = self.conf.image_compression;
824 1093276 : let uncompressed_len = img.len() as u64;
825 1093276 : self.uncompressed_bytes += uncompressed_len;
826 1093276 : self.num_keys += 1;
827 1093276 : let (_img, res) = self
828 1093276 : .blob_writer
829 1093276 : .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
830 1093276 : .await;
831 : // TODO: re-use the buffer for `img` further upstack
832 1093276 : let (off, compression_info) = res?;
833 1093276 : if compression_info.compressed_size.is_some() {
834 16004 : // The image has at least been considered for compression
835 16004 : self.uncompressed_bytes_eligible += uncompressed_len;
836 1077272 : }
837 1093276 : if compression_info.written_compressed {
838 0 : // The image has been compressed
839 0 : self.uncompressed_bytes_chosen += uncompressed_len;
840 1093276 : }
841 :
842 1093276 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
843 1093276 : key.write_to_byte_slice(&mut keybuf);
844 1093276 : self.tree.append(&keybuf, off)?;
845 :
846 : #[cfg(feature = "testing")]
847 1093276 : {
848 1093276 : self.last_written_key = key;
849 1093276 : }
850 1093276 :
851 1093276 : Ok(())
852 1093276 : }
853 :
854 : ///
855 : /// Finish writing the image layer.
856 : ///
857 716 : async fn finish(
858 716 : self,
859 716 : ctx: &RequestContext,
860 716 : end_key: Option<Key>,
861 716 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
862 716 : let temp_path = self.path.clone();
863 716 : let result = self.finish0(ctx, end_key).await;
864 716 : if let Err(ref e) = result {
865 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
866 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
867 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
868 0 : }
869 716 : }
870 716 : result
871 716 : }
872 :
873 : ///
874 : /// Finish writing the image layer.
875 : ///
876 716 : async fn finish0(
877 716 : self,
878 716 : ctx: &RequestContext,
879 716 : end_key: Option<Key>,
880 716 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
881 716 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
882 716 :
883 716 : // Calculate compression ratio
884 716 : let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
885 716 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
886 716 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
887 716 : .inc_by(self.uncompressed_bytes_eligible);
888 716 : crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
889 716 : crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
890 716 :
891 716 : let mut file = self.blob_writer.into_inner();
892 716 :
893 716 : // Write out the index
894 716 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
895 716 : .await?;
896 716 : let (index_root_blk, block_buf) = self.tree.finish()?;
897 2344 : for buf in block_buf.blocks {
898 1628 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
899 1628 : res?;
900 : }
901 :
902 716 : let final_key_range = if let Some(end_key) = end_key {
903 576 : self.key_range.start..end_key
904 : } else {
905 140 : self.key_range.clone()
906 : };
907 :
908 : // Fill in the summary on blk 0
909 716 : let summary = Summary {
910 716 : magic: IMAGE_FILE_MAGIC,
911 716 : format_version: STORAGE_FORMAT_VERSION,
912 716 : tenant_id: self.tenant_shard_id.tenant_id,
913 716 : timeline_id: self.timeline_id,
914 716 : key_range: final_key_range.clone(),
915 716 : lsn: self.lsn,
916 716 : index_start_blk,
917 716 : index_root_blk,
918 716 : };
919 716 :
920 716 : let mut buf = Vec::with_capacity(PAGE_SZ);
921 716 : // TODO: could use smallvec here but it's a pain with Slice<T>
922 716 : Summary::ser_into(&summary, &mut buf)?;
923 716 : file.seek(SeekFrom::Start(0)).await?;
924 716 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
925 716 : res?;
926 :
927 716 : let metadata = file
928 716 : .metadata()
929 716 : .await
930 716 : .context("get metadata to determine file size")?;
931 :
932 716 : let desc = PersistentLayerDesc::new_img(
933 716 : self.tenant_shard_id,
934 716 : self.timeline_id,
935 716 : final_key_range,
936 716 : self.lsn,
937 716 : metadata.len(),
938 716 : );
939 :
940 : #[cfg(feature = "testing")]
941 716 : if let Some(end_key) = end_key {
942 576 : assert!(
943 576 : self.last_written_key < end_key,
944 0 : "written key violates end_key range"
945 : );
946 140 : }
947 :
948 : // Note: Because we open the file in write-only mode, we cannot
949 : // reuse the same VirtualFile for reading later. That's why we don't
950 : // set inner.file here. The first read will have to re-open it.
951 :
952 : // fsync the file
953 716 : file.sync_all()
954 716 : .await
955 716 : .maybe_fatal_err("image_layer sync_all")?;
956 :
957 716 : trace!("created image layer {}", self.path);
958 :
959 716 : Ok((desc, self.path))
960 716 : }
961 : }
962 :
963 : /// A builder object for constructing a new image layer.
964 : ///
965 : /// Usage:
966 : ///
967 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
968 : ///
969 : /// 2. Write the contents by calling `put_image` for every key-value
970 : /// pair in the key range.
971 : ///
972 : /// 3. Call `finish`.
973 : ///
974 : /// # Note
975 : ///
976 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
977 : /// possible for the writer to drop before `finish` is actually called. So this
978 : /// could lead to stray temporary files in the directory, exhausting file
979 : /// system space. This structure wraps `ImageLayerWriterInner` and also contains
980 : /// a `Drop` implementation that cleans up the temporary file on failure. It's not
981 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
982 : /// out some fields, making it impossible to implement `Drop`.
983 : ///
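 : /// A minimal usage sketch (hedged: `conf`, `timeline_id`, `tenant_shard_id`,
 : /// `key_range`, `lsn`, `ctx`, and the `images` iterator are assumed to be in
 : /// scope; the tests in this module follow the same pattern):
 : ///
 : /// ```ignore
 : /// let mut writer =
 : ///     ImageLayerWriter::new(conf, timeline_id, tenant_shard_id, &key_range, lsn, ctx).await?;
 : /// for (key, img) in images {
 : ///     writer.put_image(key, img, ctx).await?;
 : /// }
 : /// let (desc, path) = writer.finish(ctx).await?;
 : /// ```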
984 : #[must_use]
985 : pub struct ImageLayerWriter {
986 : inner: Option<ImageLayerWriterInner>,
987 : }
988 :
989 : impl ImageLayerWriter {
990 : ///
991 : /// Start building a new image layer.
992 : ///
993 1180 : pub async fn new(
994 1180 : conf: &'static PageServerConf,
995 1180 : timeline_id: TimelineId,
996 1180 : tenant_shard_id: TenantShardId,
997 1180 : key_range: &Range<Key>,
998 1180 : lsn: Lsn,
999 1180 : ctx: &RequestContext,
1000 1180 : ) -> anyhow::Result<ImageLayerWriter> {
1001 1180 : Ok(Self {
1002 1180 : inner: Some(
1003 1180 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
1004 1180 : .await?,
1005 : ),
1006 : })
1007 1180 : }
1008 :
1009 : ///
1010 : /// Write next value to the file.
1011 : ///
1012 : /// The page versions must be appended in blknum order.
1013 : ///
1014 1093276 : pub async fn put_image(
1015 1093276 : &mut self,
1016 1093276 : key: Key,
1017 1093276 : img: Bytes,
1018 1093276 : ctx: &RequestContext,
1019 1093276 : ) -> anyhow::Result<()> {
1020 1093276 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
1021 1093276 : }
1022 :
1023 : /// Estimated size of the image layer.
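 : /// This is the sum of the bytes written to the "values" part so far, the
 : /// current size of the in-progress b-tree index, and one PAGE_SZ block for
 : /// the summary header.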
1024 17080 : pub(crate) fn estimated_size(&self) -> u64 {
1025 17080 : let inner = self.inner.as_ref().unwrap();
1026 17080 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
1027 17080 : }
1028 :
1029 17272 : pub(crate) fn num_keys(&self) -> usize {
1030 17272 : self.inner.as_ref().unwrap().num_keys
1031 17272 : }
1032 :
1033 : ///
1034 : /// Finish writing the image layer.
1035 : ///
1036 140 : pub(crate) async fn finish(
1037 140 : mut self,
1038 140 : ctx: &RequestContext,
1039 140 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1040 140 : self.inner.take().unwrap().finish(ctx, None).await
1041 140 : }
1042 :
1043 : /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
1044 576 : pub(super) async fn finish_with_end_key(
1045 576 : mut self,
1046 576 : end_key: Key,
1047 576 : ctx: &RequestContext,
1048 576 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
1049 576 : self.inner.take().unwrap().finish(ctx, Some(end_key)).await
1050 576 : }
1051 : }
1052 :
1053 : impl Drop for ImageLayerWriter {
1054 1180 : fn drop(&mut self) {
1055 1180 : if let Some(inner) = self.inner.take() {
1056 464 : inner.blob_writer.into_inner().remove();
1057 716 : }
1058 1180 : }
1059 : }
1060 :
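 : /// Streaming iterator over all key-value pairs of an image layer, in key
 : /// order. A hedged usage sketch (assumes an `ImageLayerInner` named
 : /// `img_layer` and a `ctx` RequestContext are in scope):
 : ///
 : /// ```ignore
 : /// let mut iter = img_layer.iter(&ctx);
 : /// while let Some((key, lsn, value)) = iter.next().await? {
 : ///     // an image layer always yields `Value::Image` values
 : /// }
 : /// ```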
1061 : pub struct ImageLayerIterator<'a> {
1062 : image_layer: &'a ImageLayerInner,
1063 : ctx: &'a RequestContext,
1064 : planner: StreamingVectoredReadPlanner,
1065 : index_iter: DiskBtreeIterator<'a>,
1066 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1067 : is_end: bool,
1068 : }
1069 :
1070 : impl ImageLayerIterator<'_> {
1071 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1072 0 : self.image_layer.layer_dbg_info()
1073 0 : }
1074 :
1075 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1076 38252 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1077 38252 : assert!(self.key_values_batch.is_empty());
1078 38252 : assert!(!self.is_end);
1079 :
1080 38252 : let plan = loop {
1081 58072 : if let Some(res) = self.index_iter.next().await {
1082 57884 : let (raw_key, offset) = res?;
1083 57884 : if let Some(batch_plan) = self.planner.handle(
1084 57884 : Key::from_slice(&raw_key[..KEY_SIZE]),
1085 57884 : self.image_layer.lsn,
1086 57884 : offset,
1087 57884 : true,
1088 57884 : ) {
1089 38064 : break batch_plan;
1090 19820 : }
1091 : } else {
1092 188 : self.is_end = true;
1093 188 : let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
1094 188 : if let Some(item) = self.planner.handle_range_end(payload_end) {
1095 188 : break item;
1096 : } else {
1097 0 : return Ok(()); // TODO: a test case on empty iterator
1098 : }
1099 : }
1100 : };
1101 38252 : let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
1102 38252 : let mut next_batch = std::collections::VecDeque::new();
1103 38252 : let buf_size = plan.size();
1104 38252 : let buf = IoBufferMut::with_capacity(buf_size);
1105 38252 : let blobs_buf = vectored_blob_reader
1106 38252 : .read_blobs(&plan, buf, self.ctx)
1107 38252 : .await?;
1108 38252 : let view = BufView::new_slice(&blobs_buf.buf);
1109 57828 : for meta in blobs_buf.blobs.iter() {
1110 57828 : let img_buf = meta.read(&view).await?;
1111 57828 : next_batch.push_back((
1112 57828 : meta.meta.key,
1113 57828 : self.image_layer.lsn,
1114 57828 : Value::Image(img_buf.into_bytes()),
1115 57828 : ));
1116 : }
1117 38252 : self.key_values_batch = next_batch;
1118 38252 : Ok(())
1119 38252 : }
1120 :
1121 57608 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1122 57608 : if self.key_values_batch.is_empty() {
1123 38404 : if self.is_end {
1124 320 : return Ok(None);
1125 38084 : }
1126 38084 : self.next_batch().await?;
1127 19204 : }
1128 57288 : Ok(Some(
1129 57288 : self.key_values_batch
1130 57288 : .pop_front()
1131 57288 : .expect("should not be empty"),
1132 57288 : ))
1133 57608 : }
1134 : }
1135 :
1136 : #[cfg(test)]
1137 : mod test {
1138 : use std::{sync::Arc, time::Duration};
1139 :
1140 : use bytes::Bytes;
1141 : use itertools::Itertools;
1142 : use pageserver_api::{
1143 : key::Key,
1144 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
1145 : value::Value,
1146 : };
1147 : use utils::{
1148 : generation::Generation,
1149 : id::{TenantId, TimelineId},
1150 : lsn::Lsn,
1151 : };
1152 :
1153 : use crate::{
1154 : context::RequestContext,
1155 : tenant::{
1156 : config::TenantConf,
1157 : harness::{TenantHarness, TIMELINE_ID},
1158 : storage_layer::{Layer, ResidentLayer},
1159 : vectored_blob_io::StreamingVectoredReadPlanner,
1160 : Tenant, Timeline,
1161 : },
1162 : DEFAULT_PG_VERSION,
1163 : };
1164 :
1165 : use super::{ImageLayerIterator, ImageLayerWriter};
1166 :
1167 : #[tokio::test]
1168 4 : async fn image_layer_rewrite() {
1169 4 : let tenant_conf = TenantConf {
1170 4 : gc_period: Duration::ZERO,
1171 4 : compaction_period: Duration::ZERO,
1172 4 : ..TenantConf::default()
1173 4 : };
1174 4 : let tenant_id = TenantId::generate();
1175 4 : let mut gen = Generation::new(0xdead0001);
1176 20 : let mut get_next_gen = || {
1177 20 : let ret = gen;
1178 20 : gen = gen.next();
1179 20 : ret
1180 20 : };
1181 4 : // The LSN at which we will create an image layer to filter
1182 4 : let lsn = Lsn(0xdeadbeef0000);
1183 4 : let timeline_id = TimelineId::generate();
1184 4 :
1185 4 : //
1186 4 : // Create an unsharded parent with a layer.
1187 4 : //
1188 4 :
1189 4 : let harness = TenantHarness::create_custom(
1190 4 : "test_image_layer_rewrite--parent",
1191 4 : tenant_conf.clone(),
1192 4 : tenant_id,
1193 4 : ShardIdentity::unsharded(),
1194 4 : get_next_gen(),
1195 4 : )
1196 4 : .await
1197 4 : .unwrap();
1198 4 : let (tenant, ctx) = harness.load().await;
1199 4 : let timeline = tenant
1200 4 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1201 4 : .await
1202 4 : .unwrap();
1203 4 :
1204 4 : // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
1205 4 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1206 4 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1207 4 : let range = input_start..input_end;
1208 4 :
1209 4 : // Build an image layer to filter
1210 4 : let resident = {
1211 4 : let mut writer = ImageLayerWriter::new(
1212 4 : harness.conf,
1213 4 : timeline_id,
1214 4 : harness.tenant_shard_id,
1215 4 : &range,
1216 4 : lsn,
1217 4 : &ctx,
1218 4 : )
1219 4 : .await
1220 4 : .unwrap();
1221 4 :
1222 4 : let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
1223 4 : let mut key = range.start;
1224 524292 : while key < range.end {
1225 524288 : writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
1226 524288 :
1227 524288 : key = key.next();
1228 4 : }
1229 4 : let (desc, path) = writer.finish(&ctx).await.unwrap();
1230 4 : Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
1231 4 : };
1232 4 : let original_size = resident.metadata().file_size;
1233 4 :
1234 4 : //
1235 4 : // Create child shards and do the rewrite, exercising filter().
1236 4 : // TODO: abstraction in TenantHarness for splits.
1237 4 : //
1238 4 :
1239 4 : // Filter for various shards: this exercises cases like values at start of key range, end of key
1240 4 : // range, middle of key range.
1241 4 : let shard_count = ShardCount::new(4);
1242 16 : for shard_number in 0..shard_count.count() {
1243 4 : //
1244 4 : // mimic the shard split
1245 4 : //
1246 16 : let shard_identity = ShardIdentity::new(
1247 16 : ShardNumber(shard_number),
1248 16 : shard_count,
1249 16 : ShardStripeSize(0x8000),
1250 16 : )
1251 16 : .unwrap();
1252 16 : let harness = TenantHarness::create_custom(
1253 16 : Box::leak(Box::new(format!(
1254 16 : "test_image_layer_rewrite--child{}",
1255 16 : shard_identity.shard_slug()
1256 16 : ))),
1257 16 : tenant_conf.clone(),
1258 16 : tenant_id,
1259 16 : shard_identity,
1260 16 : // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
1261 16 : // But here, all we care about is that the gen number is unique.
1262 16 : get_next_gen(),
1263 16 : )
1264 16 : .await
1265 16 : .unwrap();
1266 16 : let (tenant, ctx) = harness.load().await;
1267 16 : let timeline = tenant
1268 16 : .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
1269 16 : .await
1270 16 : .unwrap();
1271 4 :
1272 4 : //
1273 4 : // use filter() and make assertions
1274 4 : //
1275 4 :
1276 16 : let mut filtered_writer = ImageLayerWriter::new(
1277 16 : harness.conf,
1278 16 : timeline_id,
1279 16 : harness.tenant_shard_id,
1280 16 : &range,
1281 16 : lsn,
1282 16 : &ctx,
1283 16 : )
1284 16 : .await
1285 16 : .unwrap();
1286 4 :
1287 16 : let wrote_keys = resident
1288 16 : .filter(&shard_identity, &mut filtered_writer, &ctx)
1289 16 : .await
1290 16 : .unwrap();
1291 16 : let replacement = if wrote_keys > 0 {
1292 12 : let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
1293 12 : let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
1294 12 : Some(resident)
1295 4 : } else {
1296 4 : None
1297 4 : };
1298 4 :
1299 4 : // This exact size and those below will need updating as/when the layer encoding changes, but
1300 4 : // should be deterministic for a given version of the format, as we used no randomness in generating the input.
1301 16 : assert_eq!(original_size, 1597440);
1302 4 :
1303 16 : match shard_number {
1304 4 : 0 => {
1305 4 : // We should have written out just one stripe for our shard identity
1306 4 : assert_eq!(wrote_keys, 0x8000);
1307 4 : let replacement = replacement.unwrap();
1308 4 :
1309 4 : // We should have dropped some of the data
1310 4 : assert!(replacement.metadata().file_size < original_size);
1311 4 : assert!(replacement.metadata().file_size > 0);
1312 4 :
1313 4 : // Assert that we dropped ~3/4 of the data.
1314 4 : assert_eq!(replacement.metadata().file_size, 417792);
1315 4 : }
1316 4 : 1 => {
1317 4 : // Shard 1 has no keys in our input range
1318 4 : assert_eq!(wrote_keys, 0x0);
1319 4 : assert!(replacement.is_none());
1320 4 : }
1321 4 : 2 => {
1322 4 : // Shard 2 has one stripe in the input range
1323 4 : assert_eq!(wrote_keys, 0x8000);
1324 4 : let replacement = replacement.unwrap();
1325 4 : assert!(replacement.metadata().file_size < original_size);
1326 4 : assert!(replacement.metadata().file_size > 0);
1327 4 : assert_eq!(replacement.metadata().file_size, 417792);
1328 4 : }
1329 4 : 3 => {
1330 4 : // Shard 3 has two stripes in the input range
1331 4 : assert_eq!(wrote_keys, 0x10000);
1332 4 : let replacement = replacement.unwrap();
1333 4 : assert!(replacement.metadata().file_size < original_size);
1334 4 : assert!(replacement.metadata().file_size > 0);
1335 4 : assert_eq!(replacement.metadata().file_size, 811008);
1336 4 : }
1337 4 : _ => unreachable!(),
1338 4 : }
1339 4 : }
1340 4 : }
1341 :
1342 4 : async fn produce_image_layer(
1343 4 : tenant: &Tenant,
1344 4 : tline: &Arc<Timeline>,
1345 4 : mut images: Vec<(Key, Bytes)>,
1346 4 : lsn: Lsn,
1347 4 : ctx: &RequestContext,
1348 4 : ) -> anyhow::Result<ResidentLayer> {
1349 4 : images.sort();
1350 4 : let (key_start, _) = images.first().unwrap();
1351 4 : let (key_last, _) = images.last().unwrap();
1352 4 : let key_end = key_last.next();
1353 4 : let key_range = *key_start..key_end;
1354 4 : let mut writer = ImageLayerWriter::new(
1355 4 : tenant.conf,
1356 4 : tline.timeline_id,
1357 4 : tenant.tenant_shard_id,
1358 4 : &key_range,
1359 4 : lsn,
1360 4 : ctx,
1361 4 : )
1362 4 : .await?;
1363 :
1364 4004 : for (key, img) in images {
1365 4000 : writer.put_image(key, img, ctx).await?;
1366 : }
1367 4 : let (desc, path) = writer.finish(ctx).await?;
1368 4 : let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
1369 :
1370 4 : Ok::<_, anyhow::Error>(img_layer)
1371 4 : }
1372 :
1373 56 : async fn assert_img_iter_equal(
1374 56 : img_iter: &mut ImageLayerIterator<'_>,
1375 56 : expect: &[(Key, Bytes)],
1376 56 : expect_lsn: Lsn,
1377 56 : ) {
1378 56 : let mut expect_iter = expect.iter();
1379 : loop {
1380 56056 : let o1 = img_iter.next().await.unwrap();
1381 56056 : let o2 = expect_iter.next();
1382 56056 : match (o1, o2) {
1383 56 : (None, None) => break,
1384 56000 : (Some((k1, l1, v1)), Some((k2, i2))) => {
1385 56000 : let Value::Image(i1) = v1 else {
1386 0 : panic!("expect Value::Image")
1387 : };
1388 56000 : assert_eq!(&k1, k2);
1389 56000 : assert_eq!(l1, expect_lsn);
1390 56000 : assert_eq!(&i1, i2);
1391 : }
1392 0 : (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
1393 : }
1394 : }
1395 56 : }
1396 :
1397 : #[tokio::test]
1398 4 : async fn image_layer_iterator() {
1399 4 : let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
1400 4 : let (tenant, ctx) = harness.load().await;
1401 4 :
1402 4 : let tline = tenant
1403 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
1404 4 : .await
1405 4 : .unwrap();
1406 4 :
1407 4000 : fn get_key(id: u32) -> Key {
1408 4000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
1409 4000 : key.field6 = id;
1410 4000 : key
1411 4000 : }
1412 4 : const N: usize = 1000;
1413 4 : let test_imgs = (0..N)
1414 4000 : .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
1415 4 : .collect_vec();
1416 4 : let resident_layer =
1417 4 : produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
1418 4 : .await
1419 4 : .unwrap();
1420 4 : let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
1421 12 : for max_read_size in [1, 1024] {
1422 64 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
1423 56 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
1424 56 : // Test if the batch size is correctly determined
1425 56 : let mut iter = img_layer.iter(&ctx);
1426 56 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1427 56 : let mut num_items = 0;
1428 224 : for _ in 0..3 {
1429 168 : iter.next_batch().await.unwrap();
1430 168 : num_items += iter.key_values_batch.len();
1431 168 : if max_read_size == 1 {
1432 4 : // every key should be a batch b/c the value is larger than max_read_size
1433 84 : assert_eq!(iter.key_values_batch.len(), 1);
1434 4 : } else {
1435 84 : assert!(iter.key_values_batch.len() <= batch_size);
1436 4 : }
1437 168 : if num_items >= N {
1438 4 : break;
1439 168 : }
1440 168 : iter.key_values_batch.clear();
1441 4 : }
1442 4 : // Test if the result is correct
1443 56 : let mut iter = img_layer.iter(&ctx);
1444 56 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
1445 56 : assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
1446 4 : }
1447 4 : }
1448 4 : }
1449 : }
|