LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - batch_split_writer.rs (source / functions) Coverage Total Hit
Test: 903780b8ddc62f532be8f220102da7b91c63a235.info Lines: 96.1 % 597 574
Test Date: 2024-10-25 10:10:57 Functions: 92.9 % 70 65

            Line data    Source code
       1              : use std::{future::Future, ops::Range, sync::Arc};
       2              : 
       3              : use bytes::Bytes;
       4              : use pageserver_api::key::{Key, KEY_SIZE};
       5              : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
       6              : 
       7              : use crate::tenant::storage_layer::Layer;
       8              : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
       9              : 
      10              : use super::layer::S3_UPLOAD_LIMIT;
      11              : use super::{
      12              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      13              : };
      14              : 
      15              : pub(crate) enum BatchWriterResult {
      16              :     Produced(ResidentLayer),
      17              :     Discarded(PersistentLayerKey),
      18              : }
      19              : 
      20              : #[cfg(test)]
      21              : impl BatchWriterResult {
      22           24 :     fn into_resident_layer(self) -> ResidentLayer {
      23           24 :         match self {
      24           24 :             BatchWriterResult::Produced(layer) => layer,
      25            0 :             BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      26              :         }
      27           24 :     }
      28              : 
      29           16 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      30           16 :         match self {
      31            0 :             BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
      32           16 :             BatchWriterResult::Discarded(layer) => layer,
      33           16 :         }
      34           16 :     }
      35              : }
      36              : 
      37              : enum LayerWriterWrapper {
      38              :     Image(ImageLayerWriter),
      39              :     Delta(DeltaLayerWriter),
      40              : }
      41              : 
      42              : /// An layer writer that takes unfinished layers and finish them atomically.
      43              : #[must_use]
      44              : pub struct BatchLayerWriter {
      45              :     generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
      46              :     conf: &'static PageServerConf,
      47              : }
      48              : 
      49              : impl BatchLayerWriter {
      50           68 :     pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
      51           68 :         Ok(Self {
      52           68 :             generated_layer_writers: Vec::new(),
      53           68 :             conf,
      54           68 :         })
      55           68 :     }
      56              : 
      57           42 :     pub fn add_unfinished_image_writer(
      58           42 :         &mut self,
      59           42 :         writer: ImageLayerWriter,
      60           42 :         key_range: Range<Key>,
      61           42 :         lsn: Lsn,
      62           42 :     ) {
      63           42 :         self.generated_layer_writers.push((
      64           42 :             LayerWriterWrapper::Image(writer),
      65           42 :             PersistentLayerKey {
      66           42 :                 key_range,
      67           42 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
      68           42 :                 is_delta: false,
      69           42 :             },
      70           42 :         ));
      71           42 :     }
      72              : 
      73           42 :     pub fn add_unfinished_delta_writer(
      74           42 :         &mut self,
      75           42 :         writer: DeltaLayerWriter,
      76           42 :         key_range: Range<Key>,
      77           42 :         lsn_range: Range<Lsn>,
      78           42 :     ) {
      79           42 :         self.generated_layer_writers.push((
      80           42 :             LayerWriterWrapper::Delta(writer),
      81           42 :             PersistentLayerKey {
      82           42 :                 key_range,
      83           42 :                 lsn_range,
      84           42 :                 is_delta: true,
      85           42 :             },
      86           42 :         ));
      87           42 :     }
      88              : 
      89           60 :     pub(crate) async fn finish_with_discard_fn<D, F>(
      90           60 :         self,
      91           60 :         tline: &Arc<Timeline>,
      92           60 :         ctx: &RequestContext,
      93           60 :         discard_fn: D,
      94           60 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
      95           60 :     where
      96           60 :         D: Fn(&PersistentLayerKey) -> F,
      97           60 :         F: Future<Output = bool>,
      98           60 :     {
      99           60 :         let Self {
     100           60 :             generated_layer_writers,
     101           60 :             ..
     102           60 :         } = self;
     103           60 :         let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
     104            0 :             for produced_layer in generated_layers {
     105            0 :                 if let BatchWriterResult::Produced(resident_layer) = produced_layer {
     106            0 :                     let layer: Layer = resident_layer.into();
     107            0 :                     layer.delete_on_drop();
     108            0 :                 }
     109              :             }
     110            0 :         };
     111              :         // BEGIN: catch every error and do the recovery in the below section
     112           60 :         let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
     113          144 :         for (inner, layer_key) in generated_layer_writers {
     114           84 :             if discard_fn(&layer_key).await {
     115           32 :                 generated_layers.push(BatchWriterResult::Discarded(layer_key));
     116           32 :             } else {
     117           52 :                 let res = match inner {
     118           26 :                     LayerWriterWrapper::Delta(writer) => {
     119           68 :                         writer.finish(layer_key.key_range.end, ctx).await
     120              :                     }
     121           26 :                     LayerWriterWrapper::Image(writer) => {
     122           26 :                         writer
     123           26 :                             .finish_with_end_key(layer_key.key_range.end, ctx)
     124           52 :                             .await
     125              :                     }
     126              :                 };
     127           52 :                 let layer = match res {
     128           52 :                     Ok((desc, path)) => {
     129           52 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     130           52 :                             Ok(layer) => layer,
     131            0 :                             Err(e) => {
     132            0 :                                 tokio::fs::remove_file(&path).await.ok();
     133            0 :                                 clean_up_layers(generated_layers);
     134            0 :                                 return Err(e);
     135              :                             }
     136              :                         }
     137              :                     }
     138            0 :                     Err(e) => {
     139            0 :                         // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     140            0 :                         // so we don't need to remove the layer we just failed to create by ourselves.
     141            0 :                         clean_up_layers(generated_layers);
     142            0 :                         return Err(e);
     143              :                     }
     144              :                 };
     145           52 :                 generated_layers.push(BatchWriterResult::Produced(layer));
     146              :             }
     147              :         }
     148              :         // END: catch every error and do the recovery in the above section
     149           60 :         Ok(generated_layers)
     150           60 :     }
     151              : }
     152              : 
     153              : /// An image writer that takes images and produces multiple image layers.
     154              : #[must_use]
     155              : pub struct SplitImageLayerWriter {
     156              :     inner: ImageLayerWriter,
     157              :     target_layer_size: u64,
     158              :     lsn: Lsn,
     159              :     conf: &'static PageServerConf,
     160              :     timeline_id: TimelineId,
     161              :     tenant_shard_id: TenantShardId,
     162              :     batches: BatchLayerWriter,
     163              :     start_key: Key,
     164              : }
     165              : 
     166              : impl SplitImageLayerWriter {
     167           32 :     pub async fn new(
     168           32 :         conf: &'static PageServerConf,
     169           32 :         timeline_id: TimelineId,
     170           32 :         tenant_shard_id: TenantShardId,
     171           32 :         start_key: Key,
     172           32 :         lsn: Lsn,
     173           32 :         target_layer_size: u64,
     174           32 :         ctx: &RequestContext,
     175           32 :     ) -> anyhow::Result<Self> {
     176           32 :         Ok(Self {
     177           32 :             target_layer_size,
     178           32 :             inner: ImageLayerWriter::new(
     179           32 :                 conf,
     180           32 :                 timeline_id,
     181           32 :                 tenant_shard_id,
     182           32 :                 &(start_key..Key::MAX),
     183           32 :                 lsn,
     184           32 :                 ctx,
     185           32 :             )
     186           16 :             .await?,
     187           32 :             conf,
     188           32 :             timeline_id,
     189           32 :             tenant_shard_id,
     190           32 :             batches: BatchLayerWriter::new(conf).await?,
     191           32 :             lsn,
     192           32 :             start_key,
     193              :         })
     194           32 :     }
     195              : 
     196         8414 :     pub async fn put_image(
     197         8414 :         &mut self,
     198         8414 :         key: Key,
     199         8414 :         img: Bytes,
     200         8414 :         ctx: &RequestContext,
     201         8414 :     ) -> anyhow::Result<()> {
     202         8414 :         // The current estimation is an upper bound of the space that the key/image could take
     203         8414 :         // because we did not consider compression in this estimation. The resulting image layer
     204         8414 :         // could be smaller than the target size.
     205         8414 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
     206         8414 :         if self.inner.num_keys() >= 1
     207         8382 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     208              :         {
     209           14 :             let next_image_writer = ImageLayerWriter::new(
     210           14 :                 self.conf,
     211           14 :                 self.timeline_id,
     212           14 :                 self.tenant_shard_id,
     213           14 :                 &(key..Key::MAX),
     214           14 :                 self.lsn,
     215           14 :                 ctx,
     216           14 :             )
     217            7 :             .await?;
     218           14 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     219           14 :             self.batches.add_unfinished_image_writer(
     220           14 :                 prev_image_writer,
     221           14 :                 self.start_key..key,
     222           14 :                 self.lsn,
     223           14 :             );
     224           14 :             self.start_key = key;
     225         8400 :         }
     226         8541 :         self.inner.put_image(key, img, ctx).await
     227         8414 :     }
     228              : 
     229           28 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     230           28 :         self,
     231           28 :         tline: &Arc<Timeline>,
     232           28 :         ctx: &RequestContext,
     233           28 :         end_key: Key,
     234           28 :         discard_fn: D,
     235           28 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     236           28 :     where
     237           28 :         D: Fn(&PersistentLayerKey) -> F,
     238           28 :         F: Future<Output = bool>,
     239           28 :     {
     240           28 :         let Self {
     241           28 :             mut batches, inner, ..
     242           28 :         } = self;
     243           28 :         if inner.num_keys() != 0 {
     244           28 :             batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
     245           28 :         }
     246           52 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     247           28 :     }
     248              : 
     249              :     #[cfg(test)]
     250            4 :     pub(crate) async fn finish(
     251            4 :         self,
     252            4 :         tline: &Arc<Timeline>,
     253            4 :         ctx: &RequestContext,
     254            4 :         end_key: Key,
     255            4 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     256            6 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     257           12 :             .await
     258            4 :     }
     259              : }
     260              : 
     261              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     262              : ///
     263              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     264              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     265              : /// will split them into multiple files based on size.
     266              : #[must_use]
     267              : pub struct SplitDeltaLayerWriter {
     268              :     inner: Option<(Key, DeltaLayerWriter)>,
     269              :     target_layer_size: u64,
     270              :     conf: &'static PageServerConf,
     271              :     timeline_id: TimelineId,
     272              :     tenant_shard_id: TenantShardId,
     273              :     lsn_range: Range<Lsn>,
     274              :     last_key_written: Key,
     275              :     batches: BatchLayerWriter,
     276              : }
     277              : 
     278              : impl SplitDeltaLayerWriter {
     279           36 :     pub async fn new(
     280           36 :         conf: &'static PageServerConf,
     281           36 :         timeline_id: TimelineId,
     282           36 :         tenant_shard_id: TenantShardId,
     283           36 :         lsn_range: Range<Lsn>,
     284           36 :         target_layer_size: u64,
     285           36 :     ) -> anyhow::Result<Self> {
     286           36 :         Ok(Self {
     287           36 :             target_layer_size,
     288           36 :             inner: None,
     289           36 :             conf,
     290           36 :             timeline_id,
     291           36 :             tenant_shard_id,
     292           36 :             lsn_range,
     293           36 :             last_key_written: Key::MIN,
     294           36 :             batches: BatchLayerWriter::new(conf).await?,
     295              :         })
     296           36 :     }
     297              : 
     298        12134 :     pub async fn put_value(
     299        12134 :         &mut self,
     300        12134 :         key: Key,
     301        12134 :         lsn: Lsn,
     302        12134 :         val: Value,
     303        12134 :         ctx: &RequestContext,
     304        12134 :     ) -> anyhow::Result<()> {
     305        12134 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     306        12134 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     307        12134 :         //
     308        12134 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     309        12134 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     310        12134 : 
     311        12134 :         if self.inner.is_none() {
     312           32 :             self.inner = Some((
     313           32 :                 key,
     314           32 :                 DeltaLayerWriter::new(
     315           32 :                     self.conf,
     316           32 :                     self.timeline_id,
     317           32 :                     self.tenant_shard_id,
     318           32 :                     key,
     319           32 :                     self.lsn_range.clone(),
     320           32 :                     ctx,
     321           32 :                 )
     322           16 :                 .await?,
     323              :             ));
     324        12102 :         }
     325        12134 :         let (_, inner) = self.inner.as_mut().unwrap();
     326        12134 : 
     327        12134 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     328        12134 :         if inner.num_keys() >= 1
     329        12102 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     330              :         {
     331         2996 :             if key != self.last_key_written {
     332           14 :                 let next_delta_writer = DeltaLayerWriter::new(
     333           14 :                     self.conf,
     334           14 :                     self.timeline_id,
     335           14 :                     self.tenant_shard_id,
     336           14 :                     key,
     337           14 :                     self.lsn_range.clone(),
     338           14 :                     ctx,
     339           14 :                 )
     340            7 :                 .await?;
     341           14 :                 let (start_key, prev_delta_writer) =
     342           14 :                     std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
     343           14 :                 self.batches.add_unfinished_delta_writer(
     344           14 :                     prev_delta_writer,
     345           14 :                     start_key..key,
     346           14 :                     self.lsn_range.clone(),
     347           14 :                 );
     348         2982 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     349              :                 // We have to produce a very large file b/c a key is updated too often.
     350            0 :                 anyhow::bail!(
     351            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     352            0 :                     key,
     353            0 :                     inner.estimated_size()
     354            0 :                 );
     355         2982 :             }
     356         9138 :         }
     357        12134 :         self.last_key_written = key;
     358        12134 :         let (_, inner) = self.inner.as_mut().unwrap();
     359        12134 :         inner.put_value(key, lsn, val, ctx).await
     360        12134 :     }
     361              : 
     362           32 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     363           32 :         self,
     364           32 :         tline: &Arc<Timeline>,
     365           32 :         ctx: &RequestContext,
     366           32 :         discard_fn: D,
     367           32 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     368           32 :     where
     369           32 :         D: Fn(&PersistentLayerKey) -> F,
     370           32 :         F: Future<Output = bool>,
     371           32 :     {
     372           32 :         let Self {
     373           32 :             mut batches, inner, ..
     374           32 :         } = self;
     375           32 :         if let Some((start_key, writer)) = inner {
     376           28 :             if writer.num_keys() != 0 {
     377           28 :                 let end_key = self.last_key_written.next();
     378           28 :                 batches.add_unfinished_delta_writer(
     379           28 :                     writer,
     380           28 :                     start_key..end_key,
     381           28 :                     self.lsn_range.clone(),
     382           28 :                 );
     383           28 :             }
     384            4 :         }
     385           68 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     386           32 :     }
     387              : 
     388              :     #[cfg(test)]
     389            6 :     pub(crate) async fn finish(
     390            6 :         self,
     391            6 :         tline: &Arc<Timeline>,
     392            6 :         ctx: &RequestContext,
     393            6 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     394            8 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     395           23 :             .await
     396            6 :     }
     397              : }
     398              : 
     399              : #[cfg(test)]
     400              : mod tests {
     401              :     use itertools::Itertools;
     402              :     use rand::{RngCore, SeedableRng};
     403              : 
     404              :     use crate::{
     405              :         tenant::{
     406              :             harness::{TenantHarness, TIMELINE_ID},
     407              :             storage_layer::AsLayerDesc,
     408              :         },
     409              :         DEFAULT_PG_VERSION,
     410              :     };
     411              : 
     412              :     use super::*;
     413              : 
     414        20052 :     fn get_key(id: u32) -> Key {
     415        20052 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     416        20052 :         key.field6 = id;
     417        20052 :         key
     418        20052 :     }
     419              : 
     420            8 :     fn get_img(id: u32) -> Bytes {
     421            8 :         format!("{id:064}").into()
     422            8 :     }
     423              : 
     424        20004 :     fn get_large_img() -> Bytes {
     425        20004 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     426        20004 :         let mut data = vec![0; 8192];
     427        20004 :         rng.fill_bytes(&mut data);
     428        20004 :         data.into()
     429        20004 :     }
     430              : 
     431              :     #[tokio::test]
     432            2 :     async fn write_one_image() {
     433            2 :         let harness = TenantHarness::create("split_writer_write_one_image")
     434            2 :             .await
     435            2 :             .unwrap();
     436           10 :         let (tenant, ctx) = harness.load().await;
     437            2 : 
     438            2 :         let tline = tenant
     439            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     440            4 :             .await
     441            2 :             .unwrap();
     442            2 : 
     443            2 :         let mut image_writer = SplitImageLayerWriter::new(
     444            2 :             tenant.conf,
     445            2 :             tline.timeline_id,
     446            2 :             tenant.tenant_shard_id,
     447            2 :             get_key(0),
     448            2 :             Lsn(0x18),
     449            2 :             4 * 1024 * 1024,
     450            2 :             &ctx,
     451            2 :         )
     452            2 :         .await
     453            2 :         .unwrap();
     454            2 : 
     455            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     456            2 :             tenant.conf,
     457            2 :             tline.timeline_id,
     458            2 :             tenant.tenant_shard_id,
     459            2 :             Lsn(0x18)..Lsn(0x20),
     460            2 :             4 * 1024 * 1024,
     461            2 :         )
     462            2 :         .await
     463            2 :         .unwrap();
     464            2 : 
     465            2 :         image_writer
     466            2 :             .put_image(get_key(0), get_img(0), &ctx)
     467            2 :             .await
     468            2 :             .unwrap();
     469            2 :         let layers = image_writer
     470            2 :             .finish(&tline, &ctx, get_key(10))
     471            4 :             .await
     472            2 :             .unwrap();
     473            2 :         assert_eq!(layers.len(), 1);
     474            2 : 
     475            2 :         delta_writer
     476            2 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     477            2 :             .await
     478            2 :             .unwrap();
     479            5 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     480            2 :         assert_eq!(layers.len(), 1);
     481            2 :         assert_eq!(
     482            2 :             layers
     483            2 :                 .into_iter()
     484            2 :                 .next()
     485            2 :                 .unwrap()
     486            2 :                 .into_resident_layer()
     487            2 :                 .layer_desc()
     488            2 :                 .key(),
     489            2 :             PersistentLayerKey {
     490            2 :                 key_range: get_key(0)..get_key(1),
     491            2 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     492            2 :                 is_delta: true
     493            2 :             }
     494            2 :         );
     495            2 :     }
     496              : 
     497              :     #[tokio::test]
     498            2 :     async fn write_split() {
     499            2 :         // Test the split writer with retaining all the layers we have produced (discard=false)
     500         4372 :         write_split_helper("split_writer_write_split", false).await;
     501            2 :     }
     502              : 
     503              :     #[tokio::test]
     504            2 :     async fn write_split_discard() {
     505            2 :         // Test the split writer with discarding all the layers we have produced (discard=true)
     506         4336 :         write_split_helper("split_writer_write_split_discard", true).await;
     507            2 :     }
     508              : 
     509              :     /// Test the image+delta writer by writing a large number of images and deltas. If discard is
     510              :     /// set to true, all layers will be discarded.
     511            4 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     512            4 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     513           20 :         let (tenant, ctx) = harness.load().await;
     514              : 
     515            4 :         let tline = tenant
     516            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     517            8 :             .await
     518            4 :             .unwrap();
     519              : 
     520            4 :         let mut image_writer = SplitImageLayerWriter::new(
     521            4 :             tenant.conf,
     522            4 :             tline.timeline_id,
     523            4 :             tenant.tenant_shard_id,
     524            4 :             get_key(0),
     525            4 :             Lsn(0x18),
     526            4 :             4 * 1024 * 1024,
     527            4 :             &ctx,
     528            4 :         )
     529            2 :         .await
     530            4 :         .unwrap();
     531            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     532            4 :             tenant.conf,
     533            4 :             tline.timeline_id,
     534            4 :             tenant.tenant_shard_id,
     535            4 :             Lsn(0x18)..Lsn(0x20),
     536            4 :             4 * 1024 * 1024,
     537            4 :         )
     538            0 :         .await
     539            4 :         .unwrap();
     540              :         const N: usize = 2000;
     541         8004 :         for i in 0..N {
     542         8000 :             let i = i as u32;
     543         8000 :             image_writer
     544         8000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     545         8130 :                 .await
     546         8000 :                 .unwrap();
     547         8000 :             delta_writer
     548         8000 :                 .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
     549          512 :                 .await
     550         8000 :                 .unwrap();
     551              :         }
     552            4 :         let image_layers = image_writer
     553           16 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     554           16 :             .await
     555            4 :             .unwrap();
     556            4 :         let delta_layers = delta_writer
     557           16 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     558           20 :             .await
     559            4 :             .unwrap();
     560            4 :         let image_layers = image_layers
     561            4 :             .into_iter()
     562           16 :             .map(|x| {
     563           16 :                 if discard {
     564            8 :                     x.into_discarded_layer()
     565              :                 } else {
     566            8 :                     x.into_resident_layer().layer_desc().key()
     567              :                 }
     568           16 :             })
     569            4 :             .collect_vec();
     570            4 :         let delta_layers = delta_layers
     571            4 :             .into_iter()
     572           16 :             .map(|x| {
     573           16 :                 if discard {
     574            8 :                     x.into_discarded_layer()
     575              :                 } else {
     576            8 :                     x.into_resident_layer().layer_desc().key()
     577              :                 }
     578           16 :             })
     579            4 :             .collect_vec();
     580            4 :         assert_eq!(image_layers.len(), N / 512 + 1);
     581            4 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     582            4 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     583            4 :         assert_eq!(
     584            4 :             delta_layers.last().unwrap().key_range.end,
     585            4 :             get_key(N as u32)
     586            4 :         );
     587           16 :         for idx in 0..image_layers.len() {
     588           16 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     589           16 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     590           16 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     591           16 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
     592           16 :             if idx > 0 {
     593           12 :                 assert_eq!(
     594           12 :                     image_layers[idx - 1].key_range.end,
     595           12 :                     image_layers[idx].key_range.start
     596           12 :                 );
     597           12 :                 assert_eq!(
     598           12 :                     delta_layers[idx - 1].key_range.end,
     599           12 :                     delta_layers[idx].key_range.start
     600           12 :                 );
     601            4 :             }
     602              :         }
     603            4 :     }
     604              : 
     605              :     #[tokio::test]
                            // Feeds both split writers one value built with `get_img(0)` followed by one
                            // built with `get_large_img()`, with each writer constructed using a
                            // `4 * 1024` size argument, and expects each writer to finish with exactly
                            // two layers.
                            // NOTE(review): `get_img` / `get_large_img` are defined outside this excerpt;
                            // presumably the large image alone exceeds the 4 KiB argument, which is what
                            // forces the second layer -- confirm against the helpers.
     606            2 :     async fn write_large_img() {
     607            2 :         let harness = TenantHarness::create("split_writer_write_large_img")
     608            2 :             .await
     609            2 :             .unwrap();
     610           10 :         let (tenant, ctx) = harness.load().await;
     611            2 : 
     612            2 :         let tline = tenant
     613            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     614            4 :             .await
     615            2 :             .unwrap();
     616            2 : 
                                // Image writer starting at `get_key(0)` for LSN 0x18.
                                // NOTE(review): the `4 * 1024` argument is presumably the target layer
                                // size in bytes -- confirm against `SplitImageLayerWriter::new`.
     617            2 :         let mut image_writer = SplitImageLayerWriter::new(
     618            2 :             tenant.conf,
     619            2 :             tline.timeline_id,
     620            2 :             tenant.tenant_shard_id,
     621            2 :             get_key(0),
     622            2 :             Lsn(0x18),
     623            2 :             4 * 1024,
     624            2 :             &ctx,
     625            2 :         )
     626            2 :         .await
     627            2 :         .unwrap();
     628            2 : 
                                // Delta writer over the LSN range 0x18..0x20, built with the same
                                // `4 * 1024` argument as the image writer.
     629            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     630            2 :             tenant.conf,
     631            2 :             tline.timeline_id,
     632            2 :             tenant.tenant_shard_id,
     633            2 :             Lsn(0x18)..Lsn(0x20),
     634            2 :             4 * 1024,
     635            2 :         )
     636            2 :         .await
     637            2 :         .unwrap();
     638            2 : 
                                // Image path: a `get_img(0)` value at key 0, then a `get_large_img()`
                                // value at key 1. Only the number of returned layers is checked here.
                                // NOTE(review): `get_key(10)` passed to `finish` is presumably the end
                                // key of the last image layer -- confirm.
     639            2 :         image_writer
     640            2 :             .put_image(get_key(0), get_img(0), &ctx)
     641            2 :             .await
     642            2 :             .unwrap();
     643            2 :         image_writer
     644            2 :             .put_image(get_key(1), get_large_img(), &ctx)
     645            3 :             .await
     646            2 :             .unwrap();
     647            2 :         let layers = image_writer
     648            2 :             .finish(&tline, &ctx, get_key(10))
     649            8 :             .await
     650            2 :             .unwrap();
     651            2 :         assert_eq!(layers.len(), 2);
     652            2 : 
                                // Delta path: the same two keys, written at LSNs 0x18 and 0x1A, both of
                                // which lie inside the writer's 0x18..0x20 range.
     653            2 :         delta_writer
     654            2 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     655            2 :             .await
     656            2 :             .unwrap();
     657            2 :         delta_writer
     658            2 :             .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
     659            2 :             .await
     660            2 :             .unwrap();
     661           10 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     662            2 :         assert_eq!(layers.len(), 2);
                                // Both results must be produced layers: `into_resident_layer` panics on
                                // a discarded one. The first layer is expected to cover key 0 only
                                // (`get_key(0)..get_key(1)`), the second key 1 only; both keep the
                                // writer's full LSN range and are flagged as delta layers.
     663            2 :         let mut layers_iter = layers.into_iter();
     664            2 :         assert_eq!(
     665            2 :             layers_iter
     666            2 :                 .next()
     667            2 :                 .unwrap()
     668            2 :                 .into_resident_layer()
     669            2 :                 .layer_desc()
     670            2 :                 .key(),
     671            2 :             PersistentLayerKey {
     672            2 :                 key_range: get_key(0)..get_key(1),
     673            2 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     674            2 :                 is_delta: true
     675            2 :             }
     676            2 :         );
     677            2 :         assert_eq!(
     678            2 :             layers_iter
     679            2 :                 .next()
     680            2 :                 .unwrap()
     681            2 :                 .into_resident_layer()
     682            2 :                 .layer_desc()
     683            2 :                 .key(),
     684            2 :             PersistentLayerKey {
     685            2 :                 key_range: get_key(1)..get_key(2),
     686            2 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     687            2 :                 is_delta: true
     688            2 :             }
     689            2 :         );
     690            2 :     }
     691              : 
     692              :     #[tokio::test]
                            // Writes N versions of a single key (`get_key(0)`) at increasing LSNs and
                            // expects `finish` to return exactly one delta layer that covers
                            // `get_key(0)..get_key(1)` and the writer's whole LSN range.
                            // NOTE(review): presumably the N `get_large_img()` values together exceed
                            // the `4 * 1024 * 1024` argument, so this checks that versions of one key
                            // are not split across layers -- confirm against `get_large_img` and
                            // `SplitDeltaLayerWriter`.
     693            2 :     async fn write_split_single_key() {
     694            2 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     695            2 :             .await
     696            2 :             .unwrap();
     697           10 :         let (tenant, ctx) = harness.load().await;
     698            2 : 
     699            2 :         let tline = tenant
     700            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     701            4 :             .await
     702            2 :             .unwrap();
     703            2 : 
                                // The writer's LSN range is 0x10..(N * 16 + 0x10); the largest LSN
                                // written below is (N - 1) * 16 + 0x10, so every write falls inside it.
     704            2 :         const N: usize = 2000;
     705            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     706            2 :             tenant.conf,
     707            2 :             tline.timeline_id,
     708            2 :             tenant.tenant_shard_id,
     709            2 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     710            2 :             4 * 1024 * 1024,
     711            2 :         )
     712            2 :         .await
     713            2 :         .unwrap();
     714            2 : 
                                // Every iteration targets the same key; only the LSN changes, advancing
                                // by 16 per iteration starting from 0x10.
     715         4002 :         for i in 0..N {
     716         4000 :             let i = i as u32;
     717         4000 :             delta_writer
     718         4000 :                 .put_value(
     719         4000 :                     get_key(0),
     720         4000 :                     Lsn(i as u64 * 16 + 0x10),
     721         4000 :                     Value::Image(get_large_img()),
     722         4000 :                     &ctx,
     723         4000 :                 )
     724          254 :                 .await
     725         4000 :                 .unwrap();
     726            2 :         }
     727            8 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     728            2 :         assert_eq!(delta_layers.len(), 1);
                                // `into_resident_layer` panics if the layer was discarded, so this also
                                // asserts that the single layer was actually produced.
     729            2 :         let delta_layer = delta_layers
     730            2 :             .into_iter()
     731            2 :             .next()
     732            2 :             .unwrap()
     733            2 :             .into_resident_layer();
     734            2 :         assert_eq!(
     735            2 :             delta_layer.layer_desc().key(),
     736            2 :             PersistentLayerKey {
     737            2 :                 key_range: get_key(0)..get_key(1),
     738            2 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     739            2 :                 is_delta: true
     740            2 :             }
     741            2 :         );
     742            2 :     }
     743              : }
        

Generated by: LCOV version 2.1-beta