LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - batch_split_writer.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 96.4 % 615 593
Test Date: 2025-03-12 00:01:28 Functions: 92.3 % 78 72

            Line data    Source code
       1              : use std::future::Future;
       2              : use std::ops::Range;
       3              : use std::sync::Arc;
       4              : 
       5              : use bytes::Bytes;
       6              : use pageserver_api::key::{KEY_SIZE, Key};
       7              : use pageserver_api::value::Value;
       8              : use utils::id::TimelineId;
       9              : use utils::lsn::Lsn;
      10              : use utils::shard::TenantShardId;
      11              : 
      12              : use super::layer::S3_UPLOAD_LIMIT;
      13              : use super::{
      14              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      15              : };
      16              : use crate::config::PageServerConf;
      17              : use crate::context::RequestContext;
      18              : use crate::tenant::Timeline;
      19              : use crate::tenant::storage_layer::Layer;
      20              : 
      21              : pub(crate) enum BatchWriterResult {
      22              :     Produced(ResidentLayer),
      23              :     Discarded(PersistentLayerKey),
      24              : }
      25              : 
      26              : #[cfg(test)]
      27              : impl BatchWriterResult {
      28           48 :     fn into_resident_layer(self) -> ResidentLayer {
      29           48 :         match self {
      30           48 :             BatchWriterResult::Produced(layer) => layer,
      31            0 :             BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      32              :         }
      33           48 :     }
      34              : 
      35           32 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      36           32 :         match self {
      37            0 :             BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
      38           32 :             BatchWriterResult::Discarded(layer) => layer,
      39           32 :         }
      40           32 :     }
      41              : }
      42              : 
      43              : enum LayerWriterWrapper {
      44              :     Image(ImageLayerWriter),
      45              :     Delta(DeltaLayerWriter),
      46              : }
      47              : 
      48              : /// An layer writer that takes unfinished layers and finish them atomically.
      49              : #[must_use]
      50              : pub struct BatchLayerWriter {
      51              :     generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
      52              :     conf: &'static PageServerConf,
      53              : }
      54              : 
      55              : impl BatchLayerWriter {
      56         1372 :     pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
      57         1372 :         Ok(Self {
      58         1372 :             generated_layer_writers: Vec::new(),
      59         1372 :             conf,
      60         1372 :         })
      61         1372 :     }
      62              : 
      63          616 :     pub fn add_unfinished_image_writer(
      64          616 :         &mut self,
      65          616 :         writer: ImageLayerWriter,
      66          616 :         key_range: Range<Key>,
      67          616 :         lsn: Lsn,
      68          616 :     ) {
      69          616 :         self.generated_layer_writers.push((
      70          616 :             LayerWriterWrapper::Image(writer),
      71          616 :             PersistentLayerKey {
      72          616 :                 key_range,
      73          616 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
      74          616 :                 is_delta: false,
      75          616 :             },
      76          616 :         ));
      77          616 :     }
      78              : 
      79          120 :     pub fn add_unfinished_delta_writer(
      80          120 :         &mut self,
      81          120 :         writer: DeltaLayerWriter,
      82          120 :         key_range: Range<Key>,
      83          120 :         lsn_range: Range<Lsn>,
      84          120 :     ) {
      85          120 :         self.generated_layer_writers.push((
      86          120 :             LayerWriterWrapper::Delta(writer),
      87          120 :             PersistentLayerKey {
      88          120 :                 key_range,
      89          120 :                 lsn_range,
      90          120 :                 is_delta: true,
      91          120 :             },
      92          120 :         ));
      93          120 :     }
      94              : 
      95         1148 :     pub(crate) async fn finish(
      96         1148 :         self,
      97         1148 :         tline: &Arc<Timeline>,
      98         1148 :         ctx: &RequestContext,
      99         1148 :     ) -> anyhow::Result<Vec<ResidentLayer>> {
     100         1148 :         let res = self
     101         1148 :             .finish_with_discard_fn(tline, ctx, |_| async { false })
     102         1148 :             .await?;
     103         1148 :         let mut output = Vec::new();
     104         1644 :         for r in res {
     105          496 :             if let BatchWriterResult::Produced(layer) = r {
     106          496 :                 output.push(layer);
     107          496 :             }
     108              :         }
     109         1148 :         Ok(output)
     110         1148 :     }
     111              : 
     112         1356 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     113         1356 :         self,
     114         1356 :         tline: &Arc<Timeline>,
     115         1356 :         ctx: &RequestContext,
     116         1356 :         discard_fn: D,
     117         1356 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     118         1356 :     where
     119         1356 :         D: Fn(&PersistentLayerKey) -> F,
     120         1356 :         F: Future<Output = bool>,
     121         1356 :     {
     122         1356 :         let Self {
     123         1356 :             generated_layer_writers,
     124         1356 :             ..
     125         1356 :         } = self;
     126         1356 :         let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
     127            0 :             for produced_layer in generated_layers {
     128            0 :                 if let BatchWriterResult::Produced(resident_layer) = produced_layer {
     129            0 :                     let layer: Layer = resident_layer.into();
     130            0 :                     layer.delete_on_drop();
     131            0 :                 }
     132              :             }
     133            0 :         };
     134              :         // BEGIN: catch every error and do the recovery in the below section
     135         1356 :         let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
     136         2092 :         for (inner, layer_key) in generated_layer_writers {
     137          736 :             if discard_fn(&layer_key).await {
     138           76 :                 generated_layers.push(BatchWriterResult::Discarded(layer_key));
     139           76 :             } else {
     140          660 :                 let res = match inner {
     141           76 :                     LayerWriterWrapper::Delta(writer) => {
     142           76 :                         writer.finish(layer_key.key_range.end, ctx).await
     143              :                     }
     144          584 :                     LayerWriterWrapper::Image(writer) => {
     145          584 :                         writer
     146          584 :                             .finish_with_end_key(layer_key.key_range.end, ctx)
     147          584 :                             .await
     148              :                     }
     149              :                 };
     150          660 :                 let layer = match res {
     151          660 :                     Ok((desc, path)) => {
     152          660 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     153          660 :                             Ok(layer) => layer,
     154            0 :                             Err(e) => {
     155            0 :                                 tokio::fs::remove_file(&path).await.ok();
     156            0 :                                 clean_up_layers(generated_layers);
     157            0 :                                 return Err(e);
     158              :                             }
     159              :                         }
     160              :                     }
     161            0 :                     Err(e) => {
     162            0 :                         // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     163            0 :                         // so we don't need to remove the layer we just failed to create by ourselves.
     164            0 :                         clean_up_layers(generated_layers);
     165            0 :                         return Err(e);
     166              :                     }
     167              :                 };
     168          660 :                 generated_layers.push(BatchWriterResult::Produced(layer));
     169              :             }
     170              :         }
     171              :         // END: catch every error and do the recovery in the above section
     172         1356 :         Ok(generated_layers)
     173         1356 :     }
     174              : 
     175            4 :     pub fn pending_layer_num(&self) -> usize {
     176            4 :         self.generated_layer_writers.len()
     177            4 :     }
     178              : }
     179              : 
     180              : /// An image writer that takes images and produces multiple image layers.
     181              : #[must_use]
     182              : pub struct SplitImageLayerWriter {
     183              :     inner: ImageLayerWriter,
     184              :     target_layer_size: u64,
     185              :     lsn: Lsn,
     186              :     conf: &'static PageServerConf,
     187              :     timeline_id: TimelineId,
     188              :     tenant_shard_id: TenantShardId,
     189              :     batches: BatchLayerWriter,
     190              :     start_key: Key,
     191              : }
     192              : 
     193              : impl SplitImageLayerWriter {
     194          100 :     pub async fn new(
     195          100 :         conf: &'static PageServerConf,
     196          100 :         timeline_id: TimelineId,
     197          100 :         tenant_shard_id: TenantShardId,
     198          100 :         start_key: Key,
     199          100 :         lsn: Lsn,
     200          100 :         target_layer_size: u64,
     201          100 :         ctx: &RequestContext,
     202          100 :     ) -> anyhow::Result<Self> {
     203          100 :         Ok(Self {
     204          100 :             target_layer_size,
     205          100 :             inner: ImageLayerWriter::new(
     206          100 :                 conf,
     207          100 :                 timeline_id,
     208          100 :                 tenant_shard_id,
     209          100 :                 &(start_key..Key::MAX),
     210          100 :                 lsn,
     211          100 :                 ctx,
     212          100 :             )
     213          100 :             .await?,
     214          100 :             conf,
     215          100 :             timeline_id,
     216          100 :             tenant_shard_id,
     217          100 :             batches: BatchLayerWriter::new(conf).await?,
     218          100 :             lsn,
     219          100 :             start_key,
     220              :         })
     221          100 :     }
     222              : 
     223        17180 :     pub async fn put_image(
     224        17180 :         &mut self,
     225        17180 :         key: Key,
     226        17180 :         img: Bytes,
     227        17180 :         ctx: &RequestContext,
     228        17180 :     ) -> anyhow::Result<()> {
     229        17180 :         // The current estimation is an upper bound of the space that the key/image could take
     230        17180 :         // because we did not consider compression in this estimation. The resulting image layer
     231        17180 :         // could be smaller than the target size.
     232        17180 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
     233        17180 :         if self.inner.num_keys() >= 1
     234        17080 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     235              :         {
     236           28 :             let next_image_writer = ImageLayerWriter::new(
     237           28 :                 self.conf,
     238           28 :                 self.timeline_id,
     239           28 :                 self.tenant_shard_id,
     240           28 :                 &(key..Key::MAX),
     241           28 :                 self.lsn,
     242           28 :                 ctx,
     243           28 :             )
     244           28 :             .await?;
     245           28 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     246           28 :             self.batches.add_unfinished_image_writer(
     247           28 :                 prev_image_writer,
     248           28 :                 self.start_key..key,
     249           28 :                 self.lsn,
     250           28 :             );
     251           28 :             self.start_key = key;
     252        17152 :         }
     253        17180 :         self.inner.put_image(key, img, ctx).await
     254        17180 :     }
     255              : 
     256           92 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     257           92 :         self,
     258           92 :         tline: &Arc<Timeline>,
     259           92 :         ctx: &RequestContext,
     260           92 :         end_key: Key,
     261           92 :         discard_fn: D,
     262           92 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     263           92 :     where
     264           92 :         D: Fn(&PersistentLayerKey) -> F,
     265           92 :         F: Future<Output = bool>,
     266           92 :     {
     267           92 :         let Self {
     268           92 :             mut batches, inner, ..
     269           92 :         } = self;
     270           92 :         if inner.num_keys() != 0 {
     271           92 :             batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
     272           92 :         }
     273           92 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     274           92 :     }
     275              : 
     276              :     #[cfg(test)]
     277            8 :     pub(crate) async fn finish(
     278            8 :         self,
     279            8 :         tline: &Arc<Timeline>,
     280            8 :         ctx: &RequestContext,
     281            8 :         end_key: Key,
     282            8 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     283           12 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     284            8 :             .await
     285            8 :     }
     286              : }
     287              : 
     288              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     289              : ///
     290              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     291              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     292              : /// will split them into multiple files based on size.
     293              : #[must_use]
     294              : pub struct SplitDeltaLayerWriter {
     295              :     inner: Option<(Key, DeltaLayerWriter)>,
     296              :     target_layer_size: u64,
     297              :     conf: &'static PageServerConf,
     298              :     timeline_id: TimelineId,
     299              :     tenant_shard_id: TenantShardId,
     300              :     lsn_range: Range<Lsn>,
     301              :     last_key_written: Key,
     302              :     batches: BatchLayerWriter,
     303              : }
     304              : 
     305              : impl SplitDeltaLayerWriter {
     306          124 :     pub async fn new(
     307          124 :         conf: &'static PageServerConf,
     308          124 :         timeline_id: TimelineId,
     309          124 :         tenant_shard_id: TenantShardId,
     310          124 :         lsn_range: Range<Lsn>,
     311          124 :         target_layer_size: u64,
     312          124 :     ) -> anyhow::Result<Self> {
     313          124 :         Ok(Self {
     314          124 :             target_layer_size,
     315          124 :             inner: None,
     316          124 :             conf,
     317          124 :             timeline_id,
     318          124 :             tenant_shard_id,
     319          124 :             lsn_range,
     320          124 :             last_key_written: Key::MIN,
     321          124 :             batches: BatchLayerWriter::new(conf).await?,
     322              :         })
     323          124 :     }
     324              : 
     325        24416 :     pub async fn put_value(
     326        24416 :         &mut self,
     327        24416 :         key: Key,
     328        24416 :         lsn: Lsn,
     329        24416 :         val: Value,
     330        24416 :         ctx: &RequestContext,
     331        24416 :     ) -> anyhow::Result<()> {
     332        24416 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     333        24416 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     334        24416 :         //
     335        24416 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     336        24416 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     337        24416 : 
     338        24416 :         if self.inner.is_none() {
     339          100 :             self.inner = Some((
     340          100 :                 key,
     341          100 :                 DeltaLayerWriter::new(
     342          100 :                     self.conf,
     343          100 :                     self.timeline_id,
     344          100 :                     self.tenant_shard_id,
     345          100 :                     key,
     346          100 :                     self.lsn_range.clone(),
     347          100 :                     ctx,
     348          100 :                 )
     349          100 :                 .await?,
     350              :             ));
     351        24316 :         }
     352        24416 :         let (_, inner) = self.inner.as_mut().unwrap();
     353        24416 : 
     354        24416 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     355        24416 :         if inner.num_keys() >= 1
     356        24316 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     357              :         {
     358         5992 :             if key != self.last_key_written {
     359           28 :                 let next_delta_writer = DeltaLayerWriter::new(
     360           28 :                     self.conf,
     361           28 :                     self.timeline_id,
     362           28 :                     self.tenant_shard_id,
     363           28 :                     key,
     364           28 :                     self.lsn_range.clone(),
     365           28 :                     ctx,
     366           28 :                 )
     367           28 :                 .await?;
     368           28 :                 let (start_key, prev_delta_writer) =
     369           28 :                     std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
     370           28 :                 self.batches.add_unfinished_delta_writer(
     371           28 :                     prev_delta_writer,
     372           28 :                     start_key..key,
     373           28 :                     self.lsn_range.clone(),
     374           28 :                 );
     375         5964 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     376              :                 // We have to produce a very large file b/c a key is updated too often.
     377            0 :                 anyhow::bail!(
     378            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     379            0 :                     key,
     380            0 :                     inner.estimated_size()
     381            0 :                 );
     382         5964 :             }
     383        18424 :         }
     384        24416 :         self.last_key_written = key;
     385        24416 :         let (_, inner) = self.inner.as_mut().unwrap();
     386        24416 :         inner.put_value(key, lsn, val, ctx).await
     387        24416 :     }
     388              : 
     389          116 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     390          116 :         self,
     391          116 :         tline: &Arc<Timeline>,
     392          116 :         ctx: &RequestContext,
     393          116 :         discard_fn: D,
     394          116 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     395          116 :     where
     396          116 :         D: Fn(&PersistentLayerKey) -> F,
     397          116 :         F: Future<Output = bool>,
     398          116 :     {
     399          116 :         let Self {
     400          116 :             mut batches, inner, ..
     401          116 :         } = self;
     402          116 :         if let Some((start_key, writer)) = inner {
     403           92 :             if writer.num_keys() != 0 {
     404           92 :                 let end_key = self.last_key_written.next();
     405           92 :                 batches.add_unfinished_delta_writer(
     406           92 :                     writer,
     407           92 :                     start_key..end_key,
     408           92 :                     self.lsn_range.clone(),
     409           92 :                 );
     410           92 :             }
     411           24 :         }
     412          116 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     413          116 :     }
     414              : 
     415              :     #[cfg(test)]
     416           12 :     pub(crate) async fn finish(
     417           12 :         self,
     418           12 :         tline: &Arc<Timeline>,
     419           12 :         ctx: &RequestContext,
     420           12 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     421           16 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     422           12 :             .await
     423           12 :     }
     424              : }
     425              : 
     426              : #[cfg(test)]
     427              : mod tests {
     428              :     use itertools::Itertools;
     429              :     use rand::{RngCore, SeedableRng};
     430              : 
     431              :     use super::*;
     432              :     use crate::DEFAULT_PG_VERSION;
     433              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     434              :     use crate::tenant::storage_layer::AsLayerDesc;
     435              : 
     436        40104 :     fn get_key(id: u32) -> Key {
     437        40104 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     438        40104 :         key.field6 = id;
     439        40104 :         key
     440        40104 :     }
     441              : 
     442           16 :     fn get_img(id: u32) -> Bytes {
     443           16 :         format!("{id:064}").into()
     444           16 :     }
     445              : 
     446        40008 :     fn get_large_img() -> Bytes {
     447        40008 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     448        40008 :         let mut data = vec![0; 8192];
     449        40008 :         rng.fill_bytes(&mut data);
     450        40008 :         data.into()
     451        40008 :     }
     452              : 
     453              :     #[tokio::test]
     454            4 :     async fn write_one_image() {
     455            4 :         let harness = TenantHarness::create("split_writer_write_one_image")
     456            4 :             .await
     457            4 :             .unwrap();
     458            4 :         let (tenant, ctx) = harness.load().await;
     459            4 : 
     460            4 :         let tline = tenant
     461            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     462            4 :             .await
     463            4 :             .unwrap();
     464            4 : 
     465            4 :         let mut image_writer = SplitImageLayerWriter::new(
     466            4 :             tenant.conf,
     467            4 :             tline.timeline_id,
     468            4 :             tenant.tenant_shard_id,
     469            4 :             get_key(0),
     470            4 :             Lsn(0x18),
     471            4 :             4 * 1024 * 1024,
     472            4 :             &ctx,
     473            4 :         )
     474            4 :         .await
     475            4 :         .unwrap();
     476            4 : 
     477            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     478            4 :             tenant.conf,
     479            4 :             tline.timeline_id,
     480            4 :             tenant.tenant_shard_id,
     481            4 :             Lsn(0x18)..Lsn(0x20),
     482            4 :             4 * 1024 * 1024,
     483            4 :         )
     484            4 :         .await
     485            4 :         .unwrap();
     486            4 : 
     487            4 :         image_writer
     488            4 :             .put_image(get_key(0), get_img(0), &ctx)
     489            4 :             .await
     490            4 :             .unwrap();
     491            4 :         let layers = image_writer
     492            4 :             .finish(&tline, &ctx, get_key(10))
     493            4 :             .await
     494            4 :             .unwrap();
     495            4 :         assert_eq!(layers.len(), 1);
     496            4 : 
     497            4 :         delta_writer
     498            4 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     499            4 :             .await
     500            4 :             .unwrap();
     501            4 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     502            4 :         assert_eq!(layers.len(), 1);
     503            4 :         assert_eq!(
     504            4 :             layers
     505            4 :                 .into_iter()
     506            4 :                 .next()
     507            4 :                 .unwrap()
     508            4 :                 .into_resident_layer()
     509            4 :                 .layer_desc()
     510            4 :                 .key(),
     511            4 :             PersistentLayerKey {
     512            4 :                 key_range: get_key(0)..get_key(1),
     513            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     514            4 :                 is_delta: true
     515            4 :             }
     516            4 :         );
     517            4 :     }
     518              : 
     519              :     #[tokio::test]
     520            4 :     async fn write_split() {
     521            4 :         // Test the split writer with retaining all the layers we have produced (discard=false)
     522            4 :         write_split_helper("split_writer_write_split", false).await;
     523            4 :     }
     524              : 
     525              :     #[tokio::test]
     526            4 :     async fn write_split_discard() {
     527            4 :         // Test the split writer with discarding all the layers we have produced (discard=true)
     528            4 :         write_split_helper("split_writer_write_split_discard", true).await;
     529            4 :     }
     530              : 
     531              :     /// Test the image+delta writer by writing a large number of images and deltas. If discard is
     532              :     /// set to true, all layers will be discarded.
     533            8 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     534            8 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     535            8 :         let (tenant, ctx) = harness.load().await;
     536              : 
     537            8 :         let tline = tenant
     538            8 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     539            8 :             .await
     540            8 :             .unwrap();
     541              : 
     542            8 :         let mut image_writer = SplitImageLayerWriter::new(
     543            8 :             tenant.conf,
     544            8 :             tline.timeline_id,
     545            8 :             tenant.tenant_shard_id,
     546            8 :             get_key(0),
     547            8 :             Lsn(0x18),
     548            8 :             4 * 1024 * 1024,
     549            8 :             &ctx,
     550            8 :         )
     551            8 :         .await
     552            8 :         .unwrap();
     553            8 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     554            8 :             tenant.conf,
     555            8 :             tline.timeline_id,
     556            8 :             tenant.tenant_shard_id,
     557            8 :             Lsn(0x18)..Lsn(0x20),
     558            8 :             4 * 1024 * 1024,
     559            8 :         )
     560            8 :         .await
     561            8 :         .unwrap();
     562              :         const N: usize = 2000;
     563        16008 :         for i in 0..N {
     564        16000 :             let i = i as u32;
     565        16000 :             image_writer
     566        16000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     567        16000 :                 .await
     568        16000 :                 .unwrap();
     569        16000 :             delta_writer
     570        16000 :                 .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
     571        16000 :                 .await
     572        16000 :                 .unwrap();
     573              :         }
     574            8 :         let image_layers = image_writer
     575           32 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     576            8 :             .await
     577            8 :             .unwrap();
     578            8 :         let delta_layers = delta_writer
     579           32 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     580            8 :             .await
     581            8 :             .unwrap();
     582            8 :         let image_layers = image_layers
     583            8 :             .into_iter()
     584           32 :             .map(|x| {
     585           32 :                 if discard {
     586           16 :                     x.into_discarded_layer()
     587              :                 } else {
     588           16 :                     x.into_resident_layer().layer_desc().key()
     589              :                 }
     590           32 :             })
     591            8 :             .collect_vec();
     592            8 :         let delta_layers = delta_layers
     593            8 :             .into_iter()
     594           32 :             .map(|x| {
     595           32 :                 if discard {
     596           16 :                     x.into_discarded_layer()
     597              :                 } else {
     598           16 :                     x.into_resident_layer().layer_desc().key()
     599              :                 }
     600           32 :             })
     601            8 :             .collect_vec();
     602            8 :         assert_eq!(image_layers.len(), N / 512 + 1);
     603            8 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     604            8 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     605            8 :         assert_eq!(
     606            8 :             delta_layers.last().unwrap().key_range.end,
     607            8 :             get_key(N as u32)
     608            8 :         );
     609           32 :         for idx in 0..image_layers.len() {
     610           32 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     611           32 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     612           32 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     613           32 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
     614           32 :             if idx > 0 {
     615           24 :                 assert_eq!(
     616           24 :                     image_layers[idx - 1].key_range.end,
     617           24 :                     image_layers[idx].key_range.start
     618           24 :                 );
     619           24 :                 assert_eq!(
     620           24 :                     delta_layers[idx - 1].key_range.end,
     621           24 :                     delta_layers[idx].key_range.start
     622           24 :                 );
     623            8 :             }
     624              :         }
     625            8 :     }
     626              : 
     627              :     #[tokio::test]
     628            4 :     async fn write_large_img() {
     629            4 :         let harness = TenantHarness::create("split_writer_write_large_img")
     630            4 :             .await
     631            4 :             .unwrap();
     632            4 :         let (tenant, ctx) = harness.load().await;
     633            4 : 
     634            4 :         let tline = tenant
     635            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     636            4 :             .await
     637            4 :             .unwrap();
     638            4 : 
     639            4 :         let mut image_writer = SplitImageLayerWriter::new(
     640            4 :             tenant.conf,
     641            4 :             tline.timeline_id,
     642            4 :             tenant.tenant_shard_id,
     643            4 :             get_key(0),
     644            4 :             Lsn(0x18),
     645            4 :             4 * 1024,
     646            4 :             &ctx,
     647            4 :         )
     648            4 :         .await
     649            4 :         .unwrap();
     650            4 : 
     651            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     652            4 :             tenant.conf,
     653            4 :             tline.timeline_id,
     654            4 :             tenant.tenant_shard_id,
     655            4 :             Lsn(0x18)..Lsn(0x20),
     656            4 :             4 * 1024,
     657            4 :         )
     658            4 :         .await
     659            4 :         .unwrap();
     660            4 : 
     661            4 :         image_writer
     662            4 :             .put_image(get_key(0), get_img(0), &ctx)
     663            4 :             .await
     664            4 :             .unwrap();
     665            4 :         image_writer
     666            4 :             .put_image(get_key(1), get_large_img(), &ctx)
     667            4 :             .await
     668            4 :             .unwrap();
     669            4 :         let layers = image_writer
     670            4 :             .finish(&tline, &ctx, get_key(10))
     671            4 :             .await
     672            4 :             .unwrap();
     673            4 :         assert_eq!(layers.len(), 2);
     674            4 : 
     675            4 :         delta_writer
     676            4 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     677            4 :             .await
     678            4 :             .unwrap();
     679            4 :         delta_writer
     680            4 :             .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
     681            4 :             .await
     682            4 :             .unwrap();
     683            4 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     684            4 :         assert_eq!(layers.len(), 2);
     685            4 :         let mut layers_iter = layers.into_iter();
     686            4 :         assert_eq!(
     687            4 :             layers_iter
     688            4 :                 .next()
     689            4 :                 .unwrap()
     690            4 :                 .into_resident_layer()
     691            4 :                 .layer_desc()
     692            4 :                 .key(),
     693            4 :             PersistentLayerKey {
     694            4 :                 key_range: get_key(0)..get_key(1),
     695            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     696            4 :                 is_delta: true
     697            4 :             }
     698            4 :         );
     699            4 :         assert_eq!(
     700            4 :             layers_iter
     701            4 :                 .next()
     702            4 :                 .unwrap()
     703            4 :                 .into_resident_layer()
     704            4 :                 .layer_desc()
     705            4 :                 .key(),
     706            4 :             PersistentLayerKey {
     707            4 :                 key_range: get_key(1)..get_key(2),
     708            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     709            4 :                 is_delta: true
     710            4 :             }
     711            4 :         );
     712            4 :     }
     713              : 
     714              :     #[tokio::test]
     715            4 :     async fn write_split_single_key() {
     716            4 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     717            4 :             .await
     718            4 :             .unwrap();
     719            4 :         let (tenant, ctx) = harness.load().await;
     720            4 : 
     721            4 :         let tline = tenant
     722            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     723            4 :             .await
     724            4 :             .unwrap();
     725            4 : 
     726            4 :         const N: usize = 2000;
     727            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     728            4 :             tenant.conf,
     729            4 :             tline.timeline_id,
     730            4 :             tenant.tenant_shard_id,
     731            4 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     732            4 :             4 * 1024 * 1024,
     733            4 :         )
     734            4 :         .await
     735            4 :         .unwrap();
     736            4 : 
     737         8004 :         for i in 0..N {
     738         8000 :             let i = i as u32;
     739         8000 :             delta_writer
     740         8000 :                 .put_value(
     741         8000 :                     get_key(0),
     742         8000 :                     Lsn(i as u64 * 16 + 0x10),
     743         8000 :                     Value::Image(get_large_img()),
     744         8000 :                     &ctx,
     745         8000 :                 )
     746         8000 :                 .await
     747         8000 :                 .unwrap();
     748            4 :         }
     749            4 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     750            4 :         assert_eq!(delta_layers.len(), 1);
     751            4 :         let delta_layer = delta_layers
     752            4 :             .into_iter()
     753            4 :             .next()
     754            4 :             .unwrap()
     755            4 :             .into_resident_layer();
     756            4 :         assert_eq!(
     757            4 :             delta_layer.layer_desc().key(),
     758            4 :             PersistentLayerKey {
     759            4 :                 key_range: get_key(0)..get_key(1),
     760            4 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     761            4 :                 is_delta: true
     762            4 :             }
     763            4 :         );
     764            4 :     }
     765              : }
        

Generated by: LCOV version 2.1-beta