LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - batch_split_writer.rs (source / functions) Coverage Total Hit
Test: f2bfe5dc5ab550768e936d6bc7b94d9b2e2d4cc9.info Lines: 96.3 % 597 575
Test Date: 2025-01-27 20:39:28 Functions: 92.9 % 70 65

            Line data    Source code
       1              : use std::{future::Future, ops::Range, sync::Arc};
       2              : 
       3              : use bytes::Bytes;
       4              : use pageserver_api::key::{Key, KEY_SIZE};
       5              : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
       6              : 
       7              : use crate::tenant::storage_layer::Layer;
       8              : use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
       9              : use pageserver_api::value::Value;
      10              : 
      11              : use super::layer::S3_UPLOAD_LIMIT;
      12              : use super::{
      13              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      14              : };
      15              : 
      16              : pub(crate) enum BatchWriterResult {
      17              :     Produced(ResidentLayer),
      18              :     Discarded(PersistentLayerKey),
      19              : }
      20              : 
      21              : #[cfg(test)]
      22              : impl BatchWriterResult {
      23           48 :     fn into_resident_layer(self) -> ResidentLayer {
      24           48 :         match self {
      25           48 :             BatchWriterResult::Produced(layer) => layer,
      26            0 :             BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      27              :         }
      28           48 :     }
      29              : 
      30           32 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      31           32 :         match self {
      32            0 :             BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
      33           32 :             BatchWriterResult::Discarded(layer) => layer,
      34           32 :         }
      35           32 :     }
      36              : }
      37              : 
      38              : enum LayerWriterWrapper {
      39              :     Image(ImageLayerWriter),
      40              :     Delta(DeltaLayerWriter),
      41              : }
      42              : 
      43              : /// An layer writer that takes unfinished layers and finish them atomically.
      44              : #[must_use]
      45              : pub struct BatchLayerWriter {
      46              :     generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
      47              :     conf: &'static PageServerConf,
      48              : }
      49              : 
      50              : impl BatchLayerWriter {
      51          224 :     pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
      52          224 :         Ok(Self {
      53          224 :             generated_layer_writers: Vec::new(),
      54          224 :             conf,
      55          224 :         })
      56          224 :     }
      57              : 
      58          120 :     pub fn add_unfinished_image_writer(
      59          120 :         &mut self,
      60          120 :         writer: ImageLayerWriter,
      61          120 :         key_range: Range<Key>,
      62          120 :         lsn: Lsn,
      63          120 :     ) {
      64          120 :         self.generated_layer_writers.push((
      65          120 :             LayerWriterWrapper::Image(writer),
      66          120 :             PersistentLayerKey {
      67          120 :                 key_range,
      68          120 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
      69          120 :                 is_delta: false,
      70          120 :             },
      71          120 :         ));
      72          120 :     }
      73              : 
      74          120 :     pub fn add_unfinished_delta_writer(
      75          120 :         &mut self,
      76          120 :         writer: DeltaLayerWriter,
      77          120 :         key_range: Range<Key>,
      78          120 :         lsn_range: Range<Lsn>,
      79          120 :     ) {
      80          120 :         self.generated_layer_writers.push((
      81          120 :             LayerWriterWrapper::Delta(writer),
      82          120 :             PersistentLayerKey {
      83          120 :                 key_range,
      84          120 :                 lsn_range,
      85          120 :                 is_delta: true,
      86          120 :             },
      87          120 :         ));
      88          120 :     }
      89              : 
      90          208 :     pub(crate) async fn finish_with_discard_fn<D, F>(
      91          208 :         self,
      92          208 :         tline: &Arc<Timeline>,
      93          208 :         ctx: &RequestContext,
      94          208 :         discard_fn: D,
      95          208 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
      96          208 :     where
      97          208 :         D: Fn(&PersistentLayerKey) -> F,
      98          208 :         F: Future<Output = bool>,
      99          208 :     {
     100          208 :         let Self {
     101          208 :             generated_layer_writers,
     102          208 :             ..
     103          208 :         } = self;
     104          208 :         let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
     105            0 :             for produced_layer in generated_layers {
     106            0 :                 if let BatchWriterResult::Produced(resident_layer) = produced_layer {
     107            0 :                     let layer: Layer = resident_layer.into();
     108            0 :                     layer.delete_on_drop();
     109            0 :                 }
     110              :             }
     111            0 :         };
     112              :         // BEGIN: catch every error and do the recovery in the below section
     113          208 :         let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
     114          448 :         for (inner, layer_key) in generated_layer_writers {
     115          240 :             if discard_fn(&layer_key).await {
     116           76 :                 generated_layers.push(BatchWriterResult::Discarded(layer_key));
     117           76 :             } else {
     118          164 :                 let res = match inner {
     119           76 :                     LayerWriterWrapper::Delta(writer) => {
     120           76 :                         writer.finish(layer_key.key_range.end, ctx).await
     121              :                     }
     122           88 :                     LayerWriterWrapper::Image(writer) => {
     123           88 :                         writer
     124           88 :                             .finish_with_end_key(layer_key.key_range.end, ctx)
     125           88 :                             .await
     126              :                     }
     127              :                 };
     128          164 :                 let layer = match res {
     129          164 :                     Ok((desc, path)) => {
     130          164 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     131          164 :                             Ok(layer) => layer,
     132            0 :                             Err(e) => {
     133            0 :                                 tokio::fs::remove_file(&path).await.ok();
     134            0 :                                 clean_up_layers(generated_layers);
     135            0 :                                 return Err(e);
     136              :                             }
     137              :                         }
     138              :                     }
     139            0 :                     Err(e) => {
     140            0 :                         // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     141            0 :                         // so we don't need to remove the layer we just failed to create by ourselves.
     142            0 :                         clean_up_layers(generated_layers);
     143            0 :                         return Err(e);
     144              :                     }
     145              :                 };
     146          164 :                 generated_layers.push(BatchWriterResult::Produced(layer));
     147              :             }
     148              :         }
     149              :         // END: catch every error and do the recovery in the above section
     150          208 :         Ok(generated_layers)
     151          208 :     }
     152              : }
     153              : 
     154              : /// An image writer that takes images and produces multiple image layers.
     155              : #[must_use]
     156              : pub struct SplitImageLayerWriter {
     157              :     inner: ImageLayerWriter,
     158              :     target_layer_size: u64,
     159              :     lsn: Lsn,
     160              :     conf: &'static PageServerConf,
     161              :     timeline_id: TimelineId,
     162              :     tenant_shard_id: TenantShardId,
     163              :     batches: BatchLayerWriter,
     164              :     start_key: Key,
     165              : }
     166              : 
     167              : impl SplitImageLayerWriter {
     168          100 :     pub async fn new(
     169          100 :         conf: &'static PageServerConf,
     170          100 :         timeline_id: TimelineId,
     171          100 :         tenant_shard_id: TenantShardId,
     172          100 :         start_key: Key,
     173          100 :         lsn: Lsn,
     174          100 :         target_layer_size: u64,
     175          100 :         ctx: &RequestContext,
     176          100 :     ) -> anyhow::Result<Self> {
     177          100 :         Ok(Self {
     178          100 :             target_layer_size,
     179          100 :             inner: ImageLayerWriter::new(
     180          100 :                 conf,
     181          100 :                 timeline_id,
     182          100 :                 tenant_shard_id,
     183          100 :                 &(start_key..Key::MAX),
     184          100 :                 lsn,
     185          100 :                 ctx,
     186          100 :             )
     187          100 :             .await?,
     188          100 :             conf,
     189          100 :             timeline_id,
     190          100 :             tenant_shard_id,
     191          100 :             batches: BatchLayerWriter::new(conf).await?,
     192          100 :             lsn,
     193          100 :             start_key,
     194              :         })
     195          100 :     }
     196              : 
     197        17180 :     pub async fn put_image(
     198        17180 :         &mut self,
     199        17180 :         key: Key,
     200        17180 :         img: Bytes,
     201        17180 :         ctx: &RequestContext,
     202        17180 :     ) -> anyhow::Result<()> {
     203        17180 :         // The current estimation is an upper bound of the space that the key/image could take
     204        17180 :         // because we did not consider compression in this estimation. The resulting image layer
     205        17180 :         // could be smaller than the target size.
     206        17180 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
     207        17180 :         if self.inner.num_keys() >= 1
     208        17080 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     209              :         {
     210           28 :             let next_image_writer = ImageLayerWriter::new(
     211           28 :                 self.conf,
     212           28 :                 self.timeline_id,
     213           28 :                 self.tenant_shard_id,
     214           28 :                 &(key..Key::MAX),
     215           28 :                 self.lsn,
     216           28 :                 ctx,
     217           28 :             )
     218           28 :             .await?;
     219           28 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     220           28 :             self.batches.add_unfinished_image_writer(
     221           28 :                 prev_image_writer,
     222           28 :                 self.start_key..key,
     223           28 :                 self.lsn,
     224           28 :             );
     225           28 :             self.start_key = key;
     226        17152 :         }
     227        17180 :         self.inner.put_image(key, img, ctx).await
     228        17180 :     }
     229              : 
     230           92 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     231           92 :         self,
     232           92 :         tline: &Arc<Timeline>,
     233           92 :         ctx: &RequestContext,
     234           92 :         end_key: Key,
     235           92 :         discard_fn: D,
     236           92 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     237           92 :     where
     238           92 :         D: Fn(&PersistentLayerKey) -> F,
     239           92 :         F: Future<Output = bool>,
     240           92 :     {
     241           92 :         let Self {
     242           92 :             mut batches, inner, ..
     243           92 :         } = self;
     244           92 :         if inner.num_keys() != 0 {
     245           92 :             batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
     246           92 :         }
     247           92 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     248           92 :     }
     249              : 
     250              :     #[cfg(test)]
     251            8 :     pub(crate) async fn finish(
     252            8 :         self,
     253            8 :         tline: &Arc<Timeline>,
     254            8 :         ctx: &RequestContext,
     255            8 :         end_key: Key,
     256            8 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     257           12 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     258            8 :             .await
     259            8 :     }
     260              : }
     261              : 
     262              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     263              : ///
     264              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     265              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     266              : /// will split them into multiple files based on size.
     267              : #[must_use]
     268              : pub struct SplitDeltaLayerWriter {
     269              :     inner: Option<(Key, DeltaLayerWriter)>,
     270              :     target_layer_size: u64,
     271              :     conf: &'static PageServerConf,
     272              :     timeline_id: TimelineId,
     273              :     tenant_shard_id: TenantShardId,
     274              :     lsn_range: Range<Lsn>,
     275              :     last_key_written: Key,
     276              :     batches: BatchLayerWriter,
     277              : }
     278              : 
     279              : impl SplitDeltaLayerWriter {
     280          124 :     pub async fn new(
     281          124 :         conf: &'static PageServerConf,
     282          124 :         timeline_id: TimelineId,
     283          124 :         tenant_shard_id: TenantShardId,
     284          124 :         lsn_range: Range<Lsn>,
     285          124 :         target_layer_size: u64,
     286          124 :     ) -> anyhow::Result<Self> {
     287          124 :         Ok(Self {
     288          124 :             target_layer_size,
     289          124 :             inner: None,
     290          124 :             conf,
     291          124 :             timeline_id,
     292          124 :             tenant_shard_id,
     293          124 :             lsn_range,
     294          124 :             last_key_written: Key::MIN,
     295          124 :             batches: BatchLayerWriter::new(conf).await?,
     296              :         })
     297          124 :     }
     298              : 
     299        24416 :     pub async fn put_value(
     300        24416 :         &mut self,
     301        24416 :         key: Key,
     302        24416 :         lsn: Lsn,
     303        24416 :         val: Value,
     304        24416 :         ctx: &RequestContext,
     305        24416 :     ) -> anyhow::Result<()> {
     306        24416 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     307        24416 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     308        24416 :         //
     309        24416 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     310        24416 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     311        24416 : 
     312        24416 :         if self.inner.is_none() {
     313          100 :             self.inner = Some((
     314          100 :                 key,
     315          100 :                 DeltaLayerWriter::new(
     316          100 :                     self.conf,
     317          100 :                     self.timeline_id,
     318          100 :                     self.tenant_shard_id,
     319          100 :                     key,
     320          100 :                     self.lsn_range.clone(),
     321          100 :                     ctx,
     322          100 :                 )
     323          100 :                 .await?,
     324              :             ));
     325        24316 :         }
     326        24416 :         let (_, inner) = self.inner.as_mut().unwrap();
     327        24416 : 
     328        24416 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     329        24416 :         if inner.num_keys() >= 1
     330        24316 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     331              :         {
     332         5992 :             if key != self.last_key_written {
     333           28 :                 let next_delta_writer = DeltaLayerWriter::new(
     334           28 :                     self.conf,
     335           28 :                     self.timeline_id,
     336           28 :                     self.tenant_shard_id,
     337           28 :                     key,
     338           28 :                     self.lsn_range.clone(),
     339           28 :                     ctx,
     340           28 :                 )
     341           28 :                 .await?;
     342           28 :                 let (start_key, prev_delta_writer) =
     343           28 :                     std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
     344           28 :                 self.batches.add_unfinished_delta_writer(
     345           28 :                     prev_delta_writer,
     346           28 :                     start_key..key,
     347           28 :                     self.lsn_range.clone(),
     348           28 :                 );
     349         5964 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     350              :                 // We have to produce a very large file b/c a key is updated too often.
     351            0 :                 anyhow::bail!(
     352            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     353            0 :                     key,
     354            0 :                     inner.estimated_size()
     355            0 :                 );
     356         5964 :             }
     357        18424 :         }
     358        24416 :         self.last_key_written = key;
     359        24416 :         let (_, inner) = self.inner.as_mut().unwrap();
     360        24416 :         inner.put_value(key, lsn, val, ctx).await
     361        24416 :     }
     362              : 
     363          116 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     364          116 :         self,
     365          116 :         tline: &Arc<Timeline>,
     366          116 :         ctx: &RequestContext,
     367          116 :         discard_fn: D,
     368          116 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     369          116 :     where
     370          116 :         D: Fn(&PersistentLayerKey) -> F,
     371          116 :         F: Future<Output = bool>,
     372          116 :     {
     373          116 :         let Self {
     374          116 :             mut batches, inner, ..
     375          116 :         } = self;
     376          116 :         if let Some((start_key, writer)) = inner {
     377           92 :             if writer.num_keys() != 0 {
     378           92 :                 let end_key = self.last_key_written.next();
     379           92 :                 batches.add_unfinished_delta_writer(
     380           92 :                     writer,
     381           92 :                     start_key..end_key,
     382           92 :                     self.lsn_range.clone(),
     383           92 :                 );
     384           92 :             }
     385           24 :         }
     386          116 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     387          116 :     }
     388              : 
     389              :     #[cfg(test)]
     390           12 :     pub(crate) async fn finish(
     391           12 :         self,
     392           12 :         tline: &Arc<Timeline>,
     393           12 :         ctx: &RequestContext,
     394           12 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     395           16 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     396           12 :             .await
     397           12 :     }
     398              : }
     399              : 
     400              : #[cfg(test)]
     401              : mod tests {
     402              :     use itertools::Itertools;
     403              :     use rand::{RngCore, SeedableRng};
     404              : 
     405              :     use crate::{
     406              :         tenant::{
     407              :             harness::{TenantHarness, TIMELINE_ID},
     408              :             storage_layer::AsLayerDesc,
     409              :         },
     410              :         DEFAULT_PG_VERSION,
     411              :     };
     412              : 
     413              :     use super::*;
     414              : 
     415        40104 :     fn get_key(id: u32) -> Key {
     416        40104 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     417        40104 :         key.field6 = id;
     418        40104 :         key
     419        40104 :     }
     420              : 
     421           16 :     fn get_img(id: u32) -> Bytes {
     422           16 :         format!("{id:064}").into()
     423           16 :     }
     424              : 
     425        40008 :     fn get_large_img() -> Bytes {
     426        40008 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     427        40008 :         let mut data = vec![0; 8192];
     428        40008 :         rng.fill_bytes(&mut data);
     429        40008 :         data.into()
     430        40008 :     }
     431              : 
     432              :     #[tokio::test]
     433            4 :     async fn write_one_image() {
     434            4 :         let harness = TenantHarness::create("split_writer_write_one_image")
     435            4 :             .await
     436            4 :             .unwrap();
     437            4 :         let (tenant, ctx) = harness.load().await;
     438            4 : 
     439            4 :         let tline = tenant
     440            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     441            4 :             .await
     442            4 :             .unwrap();
     443            4 : 
     444            4 :         let mut image_writer = SplitImageLayerWriter::new(
     445            4 :             tenant.conf,
     446            4 :             tline.timeline_id,
     447            4 :             tenant.tenant_shard_id,
     448            4 :             get_key(0),
     449            4 :             Lsn(0x18),
     450            4 :             4 * 1024 * 1024,
     451            4 :             &ctx,
     452            4 :         )
     453            4 :         .await
     454            4 :         .unwrap();
     455            4 : 
     456            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     457            4 :             tenant.conf,
     458            4 :             tline.timeline_id,
     459            4 :             tenant.tenant_shard_id,
     460            4 :             Lsn(0x18)..Lsn(0x20),
     461            4 :             4 * 1024 * 1024,
     462            4 :         )
     463            4 :         .await
     464            4 :         .unwrap();
     465            4 : 
     466            4 :         image_writer
     467            4 :             .put_image(get_key(0), get_img(0), &ctx)
     468            4 :             .await
     469            4 :             .unwrap();
     470            4 :         let layers = image_writer
     471            4 :             .finish(&tline, &ctx, get_key(10))
     472            4 :             .await
     473            4 :             .unwrap();
     474            4 :         assert_eq!(layers.len(), 1);
     475            4 : 
     476            4 :         delta_writer
     477            4 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     478            4 :             .await
     479            4 :             .unwrap();
     480            4 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     481            4 :         assert_eq!(layers.len(), 1);
     482            4 :         assert_eq!(
     483            4 :             layers
     484            4 :                 .into_iter()
     485            4 :                 .next()
     486            4 :                 .unwrap()
     487            4 :                 .into_resident_layer()
     488            4 :                 .layer_desc()
     489            4 :                 .key(),
     490            4 :             PersistentLayerKey {
     491            4 :                 key_range: get_key(0)..get_key(1),
     492            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     493            4 :                 is_delta: true
     494            4 :             }
     495            4 :         );
     496            4 :     }
     497              : 
     498              :     #[tokio::test]
     499            4 :     async fn write_split() {
     500            4 :         // Test the split writer with retaining all the layers we have produced (discard=false)
     501            4 :         write_split_helper("split_writer_write_split", false).await;
     502            4 :     }
     503              : 
     504              :     #[tokio::test]
     505            4 :     async fn write_split_discard() {
     506            4 :         // Test the split writer with discarding all the layers we have produced (discard=true)
     507            4 :         write_split_helper("split_writer_write_split_discard", true).await;
     508            4 :     }
     509              : 
     510              :     /// Test the image+delta writer by writing a large number of images and deltas. If discard is
     511              :     /// set to true, all layers will be discarded.
     512            8 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     513            8 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     514            8 :         let (tenant, ctx) = harness.load().await;
     515              : 
     516            8 :         let tline = tenant
     517            8 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     518            8 :             .await
     519            8 :             .unwrap();
     520              : 
     521            8 :         let mut image_writer = SplitImageLayerWriter::new(
     522            8 :             tenant.conf,
     523            8 :             tline.timeline_id,
     524            8 :             tenant.tenant_shard_id,
     525            8 :             get_key(0),
     526            8 :             Lsn(0x18),
     527            8 :             4 * 1024 * 1024,
     528            8 :             &ctx,
     529            8 :         )
     530            8 :         .await
     531            8 :         .unwrap();
     532            8 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     533            8 :             tenant.conf,
     534            8 :             tline.timeline_id,
     535            8 :             tenant.tenant_shard_id,
     536            8 :             Lsn(0x18)..Lsn(0x20),
     537            8 :             4 * 1024 * 1024,
     538            8 :         )
     539            8 :         .await
     540            8 :         .unwrap();
     541              :         const N: usize = 2000;
     542        16008 :         for i in 0..N {
     543        16000 :             let i = i as u32;
     544        16000 :             image_writer
     545        16000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     546        16000 :                 .await
     547        16000 :                 .unwrap();
     548        16000 :             delta_writer
     549        16000 :                 .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
     550        16000 :                 .await
     551        16000 :                 .unwrap();
     552              :         }
     553            8 :         let image_layers = image_writer
     554           32 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     555            8 :             .await
     556            8 :             .unwrap();
     557            8 :         let delta_layers = delta_writer
     558           32 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     559            8 :             .await
     560            8 :             .unwrap();
     561            8 :         let image_layers = image_layers
     562            8 :             .into_iter()
     563           32 :             .map(|x| {
     564           32 :                 if discard {
     565           16 :                     x.into_discarded_layer()
     566              :                 } else {
     567           16 :                     x.into_resident_layer().layer_desc().key()
     568              :                 }
     569           32 :             })
     570            8 :             .collect_vec();
     571            8 :         let delta_layers = delta_layers
     572            8 :             .into_iter()
     573           32 :             .map(|x| {
     574           32 :                 if discard {
     575           16 :                     x.into_discarded_layer()
     576              :                 } else {
     577           16 :                     x.into_resident_layer().layer_desc().key()
     578              :                 }
     579           32 :             })
     580            8 :             .collect_vec();
     581            8 :         assert_eq!(image_layers.len(), N / 512 + 1);
     582            8 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     583            8 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     584            8 :         assert_eq!(
     585            8 :             delta_layers.last().unwrap().key_range.end,
     586            8 :             get_key(N as u32)
     587            8 :         );
     588           32 :         for idx in 0..image_layers.len() {
     589           32 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     590           32 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     591           32 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     592           32 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
     593           32 :             if idx > 0 {
     594           24 :                 assert_eq!(
     595           24 :                     image_layers[idx - 1].key_range.end,
     596           24 :                     image_layers[idx].key_range.start
     597           24 :                 );
     598           24 :                 assert_eq!(
     599           24 :                     delta_layers[idx - 1].key_range.end,
     600           24 :                     delta_layers[idx].key_range.start
     601           24 :                 );
     602            8 :             }
     603              :         }
     604            8 :     }
     605              : 
     606              :     #[tokio::test]
     607            4 :     async fn write_large_img() {
     608            4 :         let harness = TenantHarness::create("split_writer_write_large_img")
     609            4 :             .await
     610            4 :             .unwrap();
     611            4 :         let (tenant, ctx) = harness.load().await;
     612            4 : 
     613            4 :         let tline = tenant
     614            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     615            4 :             .await
     616            4 :             .unwrap();
     617            4 : 
     618            4 :         let mut image_writer = SplitImageLayerWriter::new(
     619            4 :             tenant.conf,
     620            4 :             tline.timeline_id,
     621            4 :             tenant.tenant_shard_id,
     622            4 :             get_key(0),
     623            4 :             Lsn(0x18),
     624            4 :             4 * 1024,
     625            4 :             &ctx,
     626            4 :         )
     627            4 :         .await
     628            4 :         .unwrap();
     629            4 : 
     630            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     631            4 :             tenant.conf,
     632            4 :             tline.timeline_id,
     633            4 :             tenant.tenant_shard_id,
     634            4 :             Lsn(0x18)..Lsn(0x20),
     635            4 :             4 * 1024,
     636            4 :         )
     637            4 :         .await
     638            4 :         .unwrap();
     639            4 : 
     640            4 :         image_writer
     641            4 :             .put_image(get_key(0), get_img(0), &ctx)
     642            4 :             .await
     643            4 :             .unwrap();
     644            4 :         image_writer
     645            4 :             .put_image(get_key(1), get_large_img(), &ctx)
     646            4 :             .await
     647            4 :             .unwrap();
     648            4 :         let layers = image_writer
     649            4 :             .finish(&tline, &ctx, get_key(10))
     650            4 :             .await
     651            4 :             .unwrap();
     652            4 :         assert_eq!(layers.len(), 2);
     653            4 : 
     654            4 :         delta_writer
     655            4 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     656            4 :             .await
     657            4 :             .unwrap();
     658            4 :         delta_writer
     659            4 :             .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
     660            4 :             .await
     661            4 :             .unwrap();
     662            4 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     663            4 :         assert_eq!(layers.len(), 2);
     664            4 :         let mut layers_iter = layers.into_iter();
     665            4 :         assert_eq!(
     666            4 :             layers_iter
     667            4 :                 .next()
     668            4 :                 .unwrap()
     669            4 :                 .into_resident_layer()
     670            4 :                 .layer_desc()
     671            4 :                 .key(),
     672            4 :             PersistentLayerKey {
     673            4 :                 key_range: get_key(0)..get_key(1),
     674            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     675            4 :                 is_delta: true
     676            4 :             }
     677            4 :         );
     678            4 :         assert_eq!(
     679            4 :             layers_iter
     680            4 :                 .next()
     681            4 :                 .unwrap()
     682            4 :                 .into_resident_layer()
     683            4 :                 .layer_desc()
     684            4 :                 .key(),
     685            4 :             PersistentLayerKey {
     686            4 :                 key_range: get_key(1)..get_key(2),
     687            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     688            4 :                 is_delta: true
     689            4 :             }
     690            4 :         );
     691            4 :     }
     692              : 
     693              :     #[tokio::test]
     694            4 :     async fn write_split_single_key() {
     695            4 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     696            4 :             .await
     697            4 :             .unwrap();
     698            4 :         let (tenant, ctx) = harness.load().await;
     699            4 : 
     700            4 :         let tline = tenant
     701            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     702            4 :             .await
     703            4 :             .unwrap();
     704            4 : 
     705            4 :         const N: usize = 2000;
     706            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     707            4 :             tenant.conf,
     708            4 :             tline.timeline_id,
     709            4 :             tenant.tenant_shard_id,
     710            4 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     711            4 :             4 * 1024 * 1024,
     712            4 :         )
     713            4 :         .await
     714            4 :         .unwrap();
     715            4 : 
     716         8004 :         for i in 0..N {
     717         8000 :             let i = i as u32;
     718         8000 :             delta_writer
     719         8000 :                 .put_value(
     720         8000 :                     get_key(0),
     721         8000 :                     Lsn(i as u64 * 16 + 0x10),
     722         8000 :                     Value::Image(get_large_img()),
     723         8000 :                     &ctx,
     724         8000 :                 )
     725         8000 :                 .await
     726         8000 :                 .unwrap();
     727            4 :         }
     728            4 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     729            4 :         assert_eq!(delta_layers.len(), 1);
     730            4 :         let delta_layer = delta_layers
     731            4 :             .into_iter()
     732            4 :             .next()
     733            4 :             .unwrap()
     734            4 :             .into_resident_layer();
     735            4 :         assert_eq!(
     736            4 :             delta_layer.layer_desc().key(),
     737            4 :             PersistentLayerKey {
     738            4 :                 key_range: get_key(0)..get_key(1),
     739            4 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     740            4 :                 is_delta: true
     741            4 :             }
     742            4 :         );
     743            4 :     }
     744              : }
        

Generated by: LCOV version 2.1-beta