LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - batch_split_writer.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 96.0 % 595 571
Test Date: 2025-07-16 12:29:03 Functions: 90.7 % 75 68

            Line data    Source code
       1              : use std::future::Future;
       2              : use std::ops::Range;
       3              : use std::sync::Arc;
       4              : 
       5              : use bytes::Bytes;
       6              : use pageserver_api::key::{KEY_SIZE, Key};
       7              : use tokio_util::sync::CancellationToken;
       8              : use utils::id::TimelineId;
       9              : use utils::lsn::Lsn;
      10              : use utils::shard::TenantShardId;
      11              : use wal_decoder::models::value::Value;
      12              : 
      13              : use super::errors::PutError;
      14              : use super::layer::S3_UPLOAD_LIMIT;
      15              : use super::{
      16              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      17              : };
      18              : use crate::config::PageServerConf;
      19              : use crate::context::RequestContext;
      20              : use crate::tenant::Timeline;
      21              : use crate::tenant::storage_layer::Layer;
      22              : 
      23              : pub(crate) enum BatchWriterResult {
      24              :     Produced(ResidentLayer),
      25              :     Discarded(PersistentLayerKey),
      26              : }
      27              : 
      28              : #[cfg(test)]
      29              : impl BatchWriterResult {
      30           12 :     fn into_resident_layer(self) -> ResidentLayer {
      31           12 :         match self {
      32           12 :             BatchWriterResult::Produced(layer) => layer,
      33            0 :             BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      34              :         }
      35           12 :     }
      36              : 
      37            8 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      38            8 :         match self {
      39            0 :             BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
      40            8 :             BatchWriterResult::Discarded(layer) => layer,
      41              :         }
      42            8 :     }
      43              : }
      44              : 
      45              : enum LayerWriterWrapper {
      46              :     Image(ImageLayerWriter),
      47              :     Delta(DeltaLayerWriter),
      48              : }
      49              : 
      50              : /// A layer writer that takes unfinished layers and finishes them atomically.
      51              : #[must_use]
      52              : pub struct BatchLayerWriter {
      53              :     generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
      54              :     conf: &'static PageServerConf,
      55              : }
      56              : 
      57              : impl BatchLayerWriter {
      58          249 :     pub fn new(conf: &'static PageServerConf) -> Self {
      59          249 :         Self {
      60          249 :             generated_layer_writers: Vec::new(),
      61          249 :             conf,
      62          249 :         }
      63          249 :     }
      64              : 
      65          160 :     pub fn add_unfinished_image_writer(
      66          160 :         &mut self,
      67          160 :         writer: ImageLayerWriter,
      68          160 :         key_range: Range<Key>,
      69          160 :         lsn: Lsn,
      70          160 :     ) {
      71          160 :         self.generated_layer_writers.push((
      72          160 :             LayerWriterWrapper::Image(writer),
      73          160 :             PersistentLayerKey {
      74          160 :                 key_range,
      75          160 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
      76          160 :                 is_delta: false,
      77          160 :             },
      78          160 :         ));
      79          160 :     }
      80              : 
      81           30 :     pub fn add_unfinished_delta_writer(
      82           30 :         &mut self,
      83           30 :         writer: DeltaLayerWriter,
      84           30 :         key_range: Range<Key>,
      85           30 :         lsn_range: Range<Lsn>,
      86           30 :     ) {
      87           30 :         self.generated_layer_writers.push((
      88           30 :             LayerWriterWrapper::Delta(writer),
      89           30 :             PersistentLayerKey {
      90           30 :                 key_range,
      91           30 :                 lsn_range,
      92           30 :                 is_delta: true,
      93           30 :             },
      94           30 :         ));
      95           30 :     }
      96              : 
      97          191 :     pub(crate) async fn finish(
      98          191 :         self,
      99          191 :         tline: &Arc<Timeline>,
     100          191 :         ctx: &RequestContext,
     101          191 :     ) -> anyhow::Result<Vec<ResidentLayer>> {
     102          191 :         let res = self
     103          260 :             .finish_with_discard_fn(tline, ctx, |_| async { false })
     104          191 :             .await?;
     105          191 :         let mut output = Vec::new();
     106          321 :         for r in res {
     107          130 :             if let BatchWriterResult::Produced(layer) = r {
     108          130 :                 output.push(layer);
     109          130 :             }
     110              :         }
     111          191 :         Ok(output)
     112          191 :     }
     113              : 
     114          243 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     115          243 :         self,
     116          243 :         tline: &Arc<Timeline>,
     117          243 :         ctx: &RequestContext,
     118          243 :         discard_fn: D,
     119          243 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     120          243 :     where
     121          243 :         D: Fn(&PersistentLayerKey) -> F,
     122          243 :         F: Future<Output = bool>,
     123          243 :     {
     124              :         let Self {
     125          243 :             generated_layer_writers,
     126              :             ..
     127          243 :         } = self;
     128          243 :         let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
     129            0 :             for produced_layer in generated_layers {
     130            0 :                 if let BatchWriterResult::Produced(resident_layer) = produced_layer {
     131            0 :                     let layer: Layer = resident_layer.into();
     132            0 :                     layer.delete_on_drop();
     133            0 :                 }
     134              :             }
     135            0 :         };
     136              :         // BEGIN: catch every error and do the recovery in the below section
     137          243 :         let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
     138          433 :         for (inner, layer_key) in generated_layer_writers {
     139          190 :             if discard_fn(&layer_key).await {
     140           19 :                 generated_layers.push(BatchWriterResult::Discarded(layer_key));
     141           19 :             } else {
     142          171 :                 let res = match inner {
     143           19 :                     LayerWriterWrapper::Delta(writer) => {
     144           19 :                         writer.finish(layer_key.key_range.end, ctx).await
     145              :                     }
     146          152 :                     LayerWriterWrapper::Image(writer) => {
     147          152 :                         writer
     148          152 :                             .finish_with_end_key(layer_key.key_range.end, ctx)
     149          152 :                             .await
     150              :                     }
     151              :                 };
     152          171 :                 let layer = match res {
     153          171 :                     Ok((desc, path)) => {
     154          171 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     155          171 :                             Ok(layer) => layer,
     156            0 :                             Err(e) => {
     157            0 :                                 tokio::fs::remove_file(&path).await.ok();
     158            0 :                                 clean_up_layers(generated_layers);
     159            0 :                                 return Err(e);
     160              :                             }
     161              :                         }
     162              :                     }
     163            0 :                     Err(e) => {
     164              :                         // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     165              :                         // so we don't need to remove the layer we just failed to create ourselves.
     166            0 :                         clean_up_layers(generated_layers);
     167            0 :                         return Err(e);
     168              :                     }
     169              :                 };
     170          171 :                 generated_layers.push(BatchWriterResult::Produced(layer));
     171              :             }
     172              :         }
     173              :         // END: catch every error and do the recovery in the above section
     174          243 :         Ok(generated_layers)
     175          243 :     }
     176              : 
     177            0 :     pub fn pending_layer_num(&self) -> usize {
     178            0 :         self.generated_layer_writers.len()
     179            0 :     }
     180              : }
     181              : 
     182              : /// An image writer that takes images and produces multiple image layers.
     183              : #[must_use]
     184              : pub struct SplitImageLayerWriter<'a> {
     185              :     inner: Option<ImageLayerWriter>,
     186              :     target_layer_size: u64,
     187              :     lsn: Lsn,
     188              :     conf: &'static PageServerConf,
     189              :     timeline_id: TimelineId,
     190              :     tenant_shard_id: TenantShardId,
     191              :     batches: BatchLayerWriter,
     192              :     start_key: Key,
     193              :     gate: &'a utils::sync::gate::Gate,
     194              :     cancel: CancellationToken,
     195              : }
     196              : 
     197              : impl<'a> SplitImageLayerWriter<'a> {
     198              :     #[allow(clippy::too_many_arguments)]
     199           26 :     pub fn new(
     200           26 :         conf: &'static PageServerConf,
     201           26 :         timeline_id: TimelineId,
     202           26 :         tenant_shard_id: TenantShardId,
     203           26 :         start_key: Key,
     204           26 :         lsn: Lsn,
     205           26 :         target_layer_size: u64,
     206           26 :         gate: &'a utils::sync::gate::Gate,
     207           26 :         cancel: CancellationToken,
     208           26 :     ) -> Self {
     209           26 :         Self {
     210           26 :             target_layer_size,
     211           26 :             inner: None,
     212           26 :             conf,
     213           26 :             timeline_id,
     214           26 :             tenant_shard_id,
     215           26 :             batches: BatchLayerWriter::new(conf),
     216           26 :             lsn,
     217           26 :             start_key,
     218           26 :             gate,
     219           26 :             cancel,
     220           26 :         }
     221           26 :     }
     222              : 
     223         4303 :     pub async fn put_image(
     224         4303 :         &mut self,
     225         4303 :         key: Key,
     226         4303 :         img: Bytes,
     227         4303 :         ctx: &RequestContext,
     228         4303 :     ) -> Result<(), PutError> {
     229         4303 :         if self.inner.is_none() {
     230           26 :             self.inner = Some(
     231           26 :                 ImageLayerWriter::new(
     232           26 :                     self.conf,
     233           26 :                     self.timeline_id,
     234           26 :                     self.tenant_shard_id,
     235           26 :                     &(self.start_key..Key::MAX),
     236           26 :                     self.lsn,
     237           26 :                     self.gate,
     238           26 :                     self.cancel.clone(),
     239           26 :                     ctx,
     240           26 :                 )
     241           26 :                 .await
     242           26 :                 .map_err(PutError::Other)?,
     243              :             );
     244         4277 :         }
     245              : 
     246         4303 :         let inner = self.inner.as_mut().unwrap();
     247              : 
     248              :         // The current estimation is an upper bound of the space that the key/image could take
     249              :         // because we did not consider compression in this estimation. The resulting image layer
     250              :         // could be smaller than the target size.
     251         4303 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
     252         4303 :         if inner.num_keys() >= 1
     253         4277 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     254              :         {
     255            7 :             let next_image_writer = ImageLayerWriter::new(
     256            7 :                 self.conf,
     257            7 :                 self.timeline_id,
     258            7 :                 self.tenant_shard_id,
     259            7 :                 &(key..Key::MAX),
     260            7 :                 self.lsn,
     261            7 :                 self.gate,
     262            7 :                 self.cancel.clone(),
     263            7 :                 ctx,
     264            7 :             )
     265            7 :             .await
     266            7 :             .map_err(PutError::Other)?;
     267            7 :             let prev_image_writer = std::mem::replace(inner, next_image_writer);
     268            7 :             self.batches.add_unfinished_image_writer(
     269            7 :                 prev_image_writer,
     270            7 :                 self.start_key..key,
     271            7 :                 self.lsn,
     272              :             );
     273            7 :             self.start_key = key;
     274         4296 :         }
     275         4303 :         inner.put_image(key, img, ctx).await
     276         4303 :     }
     277              : 
     278           23 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     279           23 :         self,
     280           23 :         tline: &Arc<Timeline>,
     281           23 :         ctx: &RequestContext,
     282           23 :         end_key: Key,
     283           23 :         discard_fn: D,
     284           23 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     285           23 :     where
     286           23 :         D: Fn(&PersistentLayerKey) -> F,
     287           23 :         F: Future<Output = bool>,
     288           23 :     {
     289           23 :         let Self {
     290           23 :             mut batches, inner, ..
     291           23 :         } = self;
     292           23 :         if let Some(inner) = inner {
     293           23 :             if inner.num_keys() != 0 {
     294           23 :                 batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
     295           23 :             }
     296            0 :         }
     297           23 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     298           23 :     }
     299              : 
     300              :     #[cfg(test)]
     301            2 :     pub(crate) async fn finish(
     302            2 :         self,
     303            2 :         tline: &Arc<Timeline>,
     304            2 :         ctx: &RequestContext,
     305            2 :         end_key: Key,
     306            2 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     307            6 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     308            2 :             .await
     309            2 :     }
     310              : }
     311              : 
     312              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     313              : ///
     314              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     315              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     316              : /// will split them into multiple files based on size.
     317              : #[must_use]
     318              : pub struct SplitDeltaLayerWriter<'a> {
     319              :     inner: Option<(Key, DeltaLayerWriter)>,
     320              :     target_layer_size: u64,
     321              :     conf: &'static PageServerConf,
     322              :     timeline_id: TimelineId,
     323              :     tenant_shard_id: TenantShardId,
     324              :     lsn_range: Range<Lsn>,
     325              :     last_key_written: Key,
     326              :     batches: BatchLayerWriter,
     327              :     gate: &'a utils::sync::gate::Gate,
     328              :     cancel: CancellationToken,
     329              : }
     330              : 
     331              : impl<'a> SplitDeltaLayerWriter<'a> {
     332           32 :     pub fn new(
     333           32 :         conf: &'static PageServerConf,
     334           32 :         timeline_id: TimelineId,
     335           32 :         tenant_shard_id: TenantShardId,
     336           32 :         lsn_range: Range<Lsn>,
     337           32 :         target_layer_size: u64,
     338           32 :         gate: &'a utils::sync::gate::Gate,
     339           32 :         cancel: CancellationToken,
     340           32 :     ) -> Self {
     341           32 :         Self {
     342           32 :             target_layer_size,
     343           32 :             inner: None,
     344           32 :             conf,
     345           32 :             timeline_id,
     346           32 :             tenant_shard_id,
     347           32 :             lsn_range,
     348           32 :             last_key_written: Key::MIN,
     349           32 :             batches: BatchLayerWriter::new(conf),
     350           32 :             gate,
     351           32 :             cancel,
     352           32 :         }
     353           32 :     }
     354              : 
     355         6104 :     pub async fn put_value(
     356         6104 :         &mut self,
     357         6104 :         key: Key,
     358         6104 :         lsn: Lsn,
     359         6104 :         val: Value,
     360         6104 :         ctx: &RequestContext,
     361         6104 :     ) -> Result<(), PutError> {
     362              :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     363              :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     364              :         //
     365              :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     366              :         // strategy. https://github.com/neondatabase/neon/issues/8837
     367              : 
     368         6104 :         if self.inner.is_none() {
     369           25 :             self.inner = Some((
     370           25 :                 key,
     371           25 :                 DeltaLayerWriter::new(
     372           25 :                     self.conf,
     373           25 :                     self.timeline_id,
     374           25 :                     self.tenant_shard_id,
     375           25 :                     key,
     376           25 :                     self.lsn_range.clone(),
     377           25 :                     self.gate,
     378           25 :                     self.cancel.clone(),
     379           25 :                     ctx,
     380           25 :                 )
     381           25 :                 .await
     382           25 :                 .map_err(PutError::Other)?,
     383              :             ));
     384         6079 :         }
     385         6104 :         let (_, inner) = self.inner.as_mut().unwrap();
     386              : 
     387         6104 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     388         6104 :         if inner.num_keys() >= 1
     389         6079 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     390              :         {
     391         1498 :             if key != self.last_key_written {
     392            7 :                 let next_delta_writer = DeltaLayerWriter::new(
     393            7 :                     self.conf,
     394            7 :                     self.timeline_id,
     395            7 :                     self.tenant_shard_id,
     396            7 :                     key,
     397            7 :                     self.lsn_range.clone(),
     398            7 :                     self.gate,
     399            7 :                     self.cancel.clone(),
     400            7 :                     ctx,
     401            7 :                 )
     402            7 :                 .await
     403            7 :                 .map_err(PutError::Other)?;
     404            7 :                 let (start_key, prev_delta_writer) =
     405            7 :                     self.inner.replace((key, next_delta_writer)).unwrap();
     406            7 :                 self.batches.add_unfinished_delta_writer(
     407            7 :                     prev_delta_writer,
     408            7 :                     start_key..key,
     409            7 :                     self.lsn_range.clone(),
     410              :                 );
     411         1491 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     412              :                 // A single key is updated so often that its layer file would reach S3_UPLOAD_LIMIT; return an error instead of producing it.
     413            0 :                 return Err(PutError::Other(anyhow::anyhow!(
     414            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     415            0 :                     key,
     416            0 :                     inner.estimated_size()
     417            0 :                 )));
     418         1491 :             }
     419         4606 :         }
     420         6104 :         self.last_key_written = key;
     421         6104 :         let (_, inner) = self.inner.as_mut().unwrap();
     422         6104 :         inner.put_value(key, lsn, val, ctx).await
     423         6104 :     }
     424              : 
     425           29 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     426           29 :         self,
     427           29 :         tline: &Arc<Timeline>,
     428           29 :         ctx: &RequestContext,
     429           29 :         discard_fn: D,
     430           29 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     431           29 :     where
     432           29 :         D: Fn(&PersistentLayerKey) -> F,
     433           29 :         F: Future<Output = bool>,
     434           29 :     {
     435           29 :         let Self {
     436           29 :             mut batches, inner, ..
     437           29 :         } = self;
     438           29 :         if let Some((start_key, writer)) = inner {
     439           23 :             if writer.num_keys() != 0 {
     440           23 :                 let end_key = self.last_key_written.next();
     441           23 :                 batches.add_unfinished_delta_writer(
     442           23 :                     writer,
     443           23 :                     start_key..end_key,
     444           23 :                     self.lsn_range.clone(),
     445           23 :                 );
     446           23 :             }
     447            6 :         }
     448           29 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     449           29 :     }
     450              : 
     451              :     #[cfg(test)]
     452            3 :     pub(crate) async fn finish(
     453            3 :         self,
     454            3 :         tline: &Arc<Timeline>,
     455            3 :         ctx: &RequestContext,
     456            3 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     457            8 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     458            3 :             .await
     459            3 :     }
     460              : }
     461              : 
     462              : #[cfg(test)]
     463              : mod tests {
     464              :     use itertools::Itertools;
     465              :     use rand::{RngCore, SeedableRng};
     466              : 
     467              :     use super::*;
     468              :     use crate::DEFAULT_PG_VERSION;
     469              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     470              :     use crate::tenant::storage_layer::AsLayerDesc;
     471              : 
     472        10026 :     fn get_key(id: u32) -> Key {
     473        10026 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     474        10026 :         key.field6 = id;
     475        10026 :         key
     476        10026 :     }
     477              : 
     478            4 :     fn get_img(id: u32) -> Bytes {
     479            4 :         format!("{id:064}").into()
     480            4 :     }
     481              : 
     482        10002 :     fn get_large_img() -> Bytes {
     483        10002 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     484        10002 :         let mut data = vec![0; 8192];
     485        10002 :         rng.fill_bytes(&mut data);
     486        10002 :         data.into()
     487        10002 :     }
     488              : 
     489              :     #[tokio::test]
     490            1 :     async fn write_one_image() {
     491            1 :         let harness = TenantHarness::create("split_writer_write_one_image")
     492            1 :             .await
     493            1 :             .unwrap();
     494            1 :         let (tenant, ctx) = harness.load().await;
     495              : 
     496            1 :         let tline = tenant
     497            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     498            1 :             .await
     499            1 :             .unwrap();
     500              : 
     501            1 :         let mut image_writer = SplitImageLayerWriter::new(
     502            1 :             tenant.conf,
     503            1 :             tline.timeline_id,
     504            1 :             tenant.tenant_shard_id,
     505            1 :             get_key(0),
     506            1 :             Lsn(0x18),
     507            1 :             4 * 1024 * 1024,
     508            1 :             &tline.gate,
     509            1 :             tline.cancel.clone(),
     510              :         );
     511              : 
     512            1 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     513            1 :             tenant.conf,
     514            1 :             tline.timeline_id,
     515            1 :             tenant.tenant_shard_id,
     516            1 :             Lsn(0x18)..Lsn(0x20),
     517            1 :             4 * 1024 * 1024,
     518            1 :             &tline.gate,
     519            1 :             tline.cancel.clone(),
     520              :         );
     521              : 
     522            1 :         image_writer
     523            1 :             .put_image(get_key(0), get_img(0), &ctx)
     524            1 :             .await
     525            1 :             .unwrap();
     526            1 :         let layers = image_writer
     527            1 :             .finish(&tline, &ctx, get_key(10))
     528            1 :             .await
     529            1 :             .unwrap();
     530            1 :         assert_eq!(layers.len(), 1);
     531              : 
     532            1 :         delta_writer
     533            1 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     534            1 :             .await
     535            1 :             .unwrap();
     536            1 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     537            1 :         assert_eq!(layers.len(), 1);
     538            1 :         assert_eq!(
     539            1 :             layers
     540            1 :                 .into_iter()
     541            1 :                 .next()
     542            1 :                 .unwrap()
     543            1 :                 .into_resident_layer()
     544            1 :                 .layer_desc()
     545            1 :                 .key(),
     546            1 :             PersistentLayerKey {
     547            1 :                 key_range: get_key(0)..get_key(1),
     548            1 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     549            1 :                 is_delta: true
     550            1 :             }
     551            1 :         );
     552            1 :     }
     553              : 
     554              :     #[tokio::test]
     555            1 :     async fn write_split() {
     556              :         // Test the split writer with retaining all the layers we have produced (discard=false)
     557            1 :         write_split_helper("split_writer_write_split", false).await;
     558            1 :     }
     559              : 
     560              :     #[tokio::test]
     561            1 :     async fn write_split_discard() {
     562              :         // Test the split writer with discarding all the layers we have produced (discard=true)
     563            1 :         write_split_helper("split_writer_write_split_discard", true).await;
     564            1 :     }
     565              : 
     566              :     /// Shared body of the `write_split*` tests: writes `N` large images through a split image
     567              :     /// writer and a split delta writer, then checks how the output was split into layers.
                            ///
                            /// `harness_name` is passed to `TenantHarness::create`. `discard` is the value the discard
                            /// callback returns for every layer, so either all layers are discarded (`true`) or all are
                            /// kept (`false`); the layer-key assertions are identical in both cases.
     568            2 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     569            2 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     570            2 :         let (tenant, ctx) = harness.load().await;
     571              : 
     572            2 :         let tline = tenant
     573            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     574            2 :             .await
     575            2 :             .unwrap();
     576              : 
                                // Both writers receive `4 * 1024 * 1024` as their size argument.
                                // NOTE(review): presumably this is the target layer size in bytes -- confirm against
                                // the writers' constructors.
     577            2 :         let mut image_writer = SplitImageLayerWriter::new(
     578            2 :             tenant.conf,
     579            2 :             tline.timeline_id,
     580            2 :             tenant.tenant_shard_id,
     581            2 :             get_key(0),
     582            2 :             Lsn(0x18),
     583            2 :             4 * 1024 * 1024,
     584            2 :             &tline.gate,
     585            2 :             tline.cancel.clone(),
     586              :         );
     587            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     588            2 :             tenant.conf,
     589            2 :             tline.timeline_id,
     590            2 :             tenant.tenant_shard_id,
     591            2 :             Lsn(0x18)..Lsn(0x20),
     592            2 :             4 * 1024 * 1024,
     593            2 :             &tline.gate,
     594            2 :             tline.cancel.clone(),
     595              :         );
     596              :         const N: usize = 2000;
                                // For each of the N keys, write one large image to the image writer and the same
                                // payload as an image value to the delta writer.
                                // NOTE(review): the delta values are written at Lsn(0x20), which equals the end bound
                                // of the writer's range Lsn(0x18)..Lsn(0x20) -- confirm this is intended.
     597         4002 :         for i in 0..N {
     598         4000 :             let i = i as u32;
     599         4000 :             image_writer
     600         4000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     601         4000 :                 .await
     602         4000 :                 .unwrap();
     603         4000 :             delta_writer
     604         4000 :                 .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
     605         4000 :                 .await
     606         4000 :                 .unwrap();
     607              :         }
                                // The discard callbacks ignore their argument and return `discard` unconditionally.
                                // `get_key(N as u32)` is passed when finishing the image writer; the delta layers are
                                // asserted below to end at that same key.
     608            2 :         let image_layers = image_writer
     609           16 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     610            2 :             .await
     611            2 :             .unwrap();
     612            2 :         let delta_layers = delta_writer
     613           16 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     614            2 :             .await
     615            2 :             .unwrap();
                                // Reduce every result to its layer key so the assertions below do not depend on
                                // whether the layers were kept or discarded.
     616            2 :         let image_layers = image_layers
     617            2 :             .into_iter()
     618            8 :             .map(|x| {
     619            8 :                 if discard {
     620            4 :                     x.into_discarded_layer()
     621              :                 } else {
     622            4 :                     x.into_resident_layer().layer_desc().key()
     623              :                 }
     624            8 :             })
     625            2 :             .collect_vec();
     626            2 :         let delta_layers = delta_layers
     627            2 :             .into_iter()
     628            8 :             .map(|x| {
     629            8 :                 if discard {
     630            4 :                     x.into_discarded_layer()
     631              :                 } else {
     632            4 :                     x.into_resident_layer().layer_desc().key()
     633              :                 }
     634            8 :             })
     635            2 :             .collect_vec();
                                // With N = 2000 this expects N / 512 + 1 = 4 layers from each writer.
                                // NOTE(review): 512 is presumably the number of `get_large_img()` values that fit into
                                // the 4 MiB size argument -- confirm against the helper's payload size.
     636            2 :         assert_eq!(image_layers.len(), N / 512 + 1);
     637            2 :         assert_eq!(delta_layers.len(), N / 512 + 1);
                                // Taken together, the delta layers must span exactly get_key(0)..get_key(N).
     638            2 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     639            2 :         assert_eq!(
     640            2 :             delta_layers.last().unwrap().key_range.end,
     641            2 :             get_key(N as u32)
     642              :         );
                                // Indexing `delta_layers` with the image-layer index is fine here: both vectors were
                                // just asserted to have the same length.
     643            8 :         for idx in 0..image_layers.len() {
                                    // No layer may have Key::MIN as its start or Key::MAX as its end.
     644            8 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     645            8 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     646            8 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     647            8 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
                                    // Consecutive layers must be contiguous: each one starts where the previous ended.
     648            8 :             if idx > 0 {
     649            6 :                 assert_eq!(
     650            6 :                     image_layers[idx - 1].key_range.end,
     651            6 :                     image_layers[idx].key_range.start
     652              :                 );
     653            6 :                 assert_eq!(
     654            6 :                     delta_layers[idx - 1].key_range.end,
     655            6 :                     delta_layers[idx].key_range.start
     656              :                 );
     657            2 :             }
     658              :         }
     659            2 :     }
     660              : 
     661              :     #[tokio::test]
     662            1 :     async fn write_large_img() {
                                // Writes one `get_img(0)` value followed by one `get_large_img()` value through an
                                // image writer and a delta writer that both use a `4 * 1024` size argument, and
                                // expects each writer to produce two layers.
                                // NOTE(review): presumably the large image alone exceeds that size and forces the
                                // split -- confirm against `get_large_img()`.
     663            1 :         let harness = TenantHarness::create("split_writer_write_large_img")
     664            1 :             .await
     665            1 :             .unwrap();
     666            1 :         let (tenant, ctx) = harness.load().await;
     667              : 
     668            1 :         let tline = tenant
     669            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     670            1 :             .await
     671            1 :             .unwrap();
     672              : 
     673            1 :         let mut image_writer = SplitImageLayerWriter::new(
     674            1 :             tenant.conf,
     675            1 :             tline.timeline_id,
     676            1 :             tenant.tenant_shard_id,
     677            1 :             get_key(0),
     678            1 :             Lsn(0x18),
     679            1 :             4 * 1024,
     680            1 :             &tline.gate,
     681            1 :             tline.cancel.clone(),
     682              :         );
     683              : 
     684            1 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     685            1 :             tenant.conf,
     686            1 :             tline.timeline_id,
     687            1 :             tenant.tenant_shard_id,
     688            1 :             Lsn(0x18)..Lsn(0x20),
     689            1 :             4 * 1024,
     690            1 :             &tline.gate,
     691            1 :             tline.cancel.clone(),
     692              :         );
     693              : 
                                // Image writer: a small image at key 0, then a large image at key 1.
     694            1 :         image_writer
     695            1 :             .put_image(get_key(0), get_img(0), &ctx)
     696            1 :             .await
     697            1 :             .unwrap();
     698            1 :         image_writer
     699            1 :             .put_image(get_key(1), get_large_img(), &ctx)
     700            1 :             .await
     701            1 :             .unwrap();
                                // NOTE(review): get_key(10) is presumably the end key of the last image layer --
                                // confirm against `SplitImageLayerWriter::finish`.
     702            1 :         let layers = image_writer
     703            1 :             .finish(&tline, &ctx, get_key(10))
     704            1 :             .await
     705            1 :             .unwrap();
     706            1 :         assert_eq!(layers.len(), 2);
     707              : 
                                // Delta writer: the same two payloads, written at Lsn(0x18) and Lsn(0x1A).
     708            1 :         delta_writer
     709            1 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     710            1 :             .await
     711            1 :             .unwrap();
     712            1 :         delta_writer
     713            1 :             .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
     714            1 :             .await
     715            1 :             .unwrap();
     716            1 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     717            1 :         assert_eq!(layers.len(), 2);
     718            1 :         let mut layers_iter = layers.into_iter();
                                // Each key is expected in its own delta layer, and both layers are expected to carry
                                // the writer's full LSN range 0x18..0x20 rather than only the LSNs that were written.
     719            1 :         assert_eq!(
     720            1 :             layers_iter
     721            1 :                 .next()
     722            1 :                 .unwrap()
     723            1 :                 .into_resident_layer()
     724            1 :                 .layer_desc()
     725            1 :                 .key(),
     726            1 :             PersistentLayerKey {
     727            1 :                 key_range: get_key(0)..get_key(1),
     728            1 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     729            1 :                 is_delta: true
     730            1 :             }
     731              :         );
     732            1 :         assert_eq!(
     733            1 :             layers_iter
     734            1 :                 .next()
     735            1 :                 .unwrap()
     736            1 :                 .into_resident_layer()
     737            1 :                 .layer_desc()
     738            1 :                 .key(),
     739            1 :             PersistentLayerKey {
     740            1 :                 key_range: get_key(1)..get_key(2),
     741            1 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     742            1 :                 is_delta: true
     743            1 :             }
     744            1 :         );
     745            1 :     }
     746              : 
     747              :     #[tokio::test]
     748            1 :     async fn write_split_single_key() {
                                // Writes N large values for one single key (get_key(0)) at increasing LSNs and expects
                                // the delta writer to produce exactly one layer.
                                // NOTE(review): presumably the total payload exceeds the 4 MiB size argument and the
                                // writer does not split within a single key -- confirm against SplitDeltaLayerWriter.
     749            1 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     750            1 :             .await
     751            1 :             .unwrap();
     752            1 :         let (tenant, ctx) = harness.load().await;
     753              : 
     754            1 :         let tline = tenant
     755            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     756            1 :             .await
     757            1 :             .unwrap();
     758              : 
     759              :         const N: usize = 2000;
                                // The LSN range covers every LSN written in the loop below: the values go to
                                // 0x10 + 16 * i for i in 0..N, all strictly below the end bound N * 16 + 0x10.
     760            1 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     761            1 :             tenant.conf,
     762            1 :             tline.timeline_id,
     763            1 :             tenant.tenant_shard_id,
     764            1 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     765            1 :             4 * 1024 * 1024,
     766            1 :             &tline.gate,
     767            1 :             tline.cancel.clone(),
     768              :         );
     769              : 
     770         2001 :         for i in 0..N {
     771         2000 :             let i = i as u32;
     772         2000 :             delta_writer
     773         2000 :                 .put_value(
     774         2000 :                     get_key(0),
     775         2000 :                     Lsn(i as u64 * 16 + 0x10),
     776         2000 :                     Value::Image(get_large_img()),
     777         2000 :                     &ctx,
     778         2000 :                 )
     779         2000 :                 .await
     780         2000 :                 .unwrap();
     781              :         }
     782            1 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     783            1 :         assert_eq!(delta_layers.len(), 1);
     784            1 :         let delta_layer = delta_layers
     785            1 :             .into_iter()
     786            1 :             .next()
     787            1 :             .unwrap()
     788            1 :             .into_resident_layer();
                                // The single layer must span only key 0 and the writer's whole LSN range.
     789            1 :         assert_eq!(
     790            1 :             delta_layer.layer_desc().key(),
     791            1 :             PersistentLayerKey {
     792            1 :                 key_range: get_key(0)..get_key(1),
     793            1 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     794            1 :                 is_delta: true
     795            1 :             }
     796            1 :         );
     797            1 :     }
     798              : }
        

Generated by: LCOV version 2.1-beta