LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - batch_split_writer.rs (source / functions) Coverage Total Hit
Test: 6c6fe25ecc82be7eef3e957667d85acf2b969737.info Lines: 96.1 % 645 620
Test Date: 2025-04-16 17:36:39 Functions: 91.0 % 78 71

            Line data    Source code
       1              : use std::future::Future;
       2              : use std::ops::Range;
       3              : use std::sync::Arc;
       4              : 
       5              : use bytes::Bytes;
       6              : use pageserver_api::key::{KEY_SIZE, Key};
       7              : use pageserver_api::value::Value;
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::TimelineId;
      10              : use utils::lsn::Lsn;
      11              : use utils::shard::TenantShardId;
      12              : 
      13              : use super::layer::S3_UPLOAD_LIMIT;
      14              : use super::{
      15              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      16              : };
      17              : use crate::config::PageServerConf;
      18              : use crate::context::RequestContext;
      19              : use crate::tenant::Timeline;
      20              : use crate::tenant::storage_layer::Layer;
      21              : 
      22              : pub(crate) enum BatchWriterResult {
      23              :     Produced(ResidentLayer),
      24              :     Discarded(PersistentLayerKey),
      25              : }
      26              : 
      27              : #[cfg(test)]
      28              : impl BatchWriterResult {
      29           48 :     fn into_resident_layer(self) -> ResidentLayer {
      30           48 :         match self {
      31           48 :             BatchWriterResult::Produced(layer) => layer,
      32            0 :             BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      33              :         }
      34           48 :     }
      35              : 
      36           32 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      37           32 :         match self {
      38            0 :             BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
      39           32 :             BatchWriterResult::Discarded(layer) => layer,
      40           32 :         }
      41           32 :     }
      42              : }
      43              : 
      44              : enum LayerWriterWrapper {
      45              :     Image(ImageLayerWriter),
      46              :     Delta(DeltaLayerWriter),
      47              : }
      48              : 
      49              : /// An layer writer that takes unfinished layers and finish them atomically.
      50              : #[must_use]
      51              : pub struct BatchLayerWriter {
      52              :     generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
      53              :     conf: &'static PageServerConf,
      54              : }
      55              : 
      56              : impl BatchLayerWriter {
      57         1392 :     pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
      58         1392 :         Ok(Self {
      59         1392 :             generated_layer_writers: Vec::new(),
      60         1392 :             conf,
      61         1392 :         })
      62         1392 :     }
      63              : 
      64          628 :     pub fn add_unfinished_image_writer(
      65          628 :         &mut self,
      66          628 :         writer: ImageLayerWriter,
      67          628 :         key_range: Range<Key>,
      68          628 :         lsn: Lsn,
      69          628 :     ) {
      70          628 :         self.generated_layer_writers.push((
      71          628 :             LayerWriterWrapper::Image(writer),
      72          628 :             PersistentLayerKey {
      73          628 :                 key_range,
      74          628 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
      75          628 :                 is_delta: false,
      76          628 :             },
      77          628 :         ));
      78          628 :     }
      79              : 
      80          120 :     pub fn add_unfinished_delta_writer(
      81          120 :         &mut self,
      82          120 :         writer: DeltaLayerWriter,
      83          120 :         key_range: Range<Key>,
      84          120 :         lsn_range: Range<Lsn>,
      85          120 :     ) {
      86          120 :         self.generated_layer_writers.push((
      87          120 :             LayerWriterWrapper::Delta(writer),
      88          120 :             PersistentLayerKey {
      89          120 :                 key_range,
      90          120 :                 lsn_range,
      91          120 :                 is_delta: true,
      92          120 :             },
      93          120 :         ));
      94          120 :     }
      95              : 
      96         1160 :     pub(crate) async fn finish(
      97         1160 :         self,
      98         1160 :         tline: &Arc<Timeline>,
      99         1160 :         ctx: &RequestContext,
     100         1160 :     ) -> anyhow::Result<Vec<ResidentLayer>> {
     101         1160 :         let res = self
     102         1160 :             .finish_with_discard_fn(tline, ctx, |_| async { false })
     103         1160 :             .await?;
     104         1160 :         let mut output = Vec::new();
     105         1668 :         for r in res {
     106          508 :             if let BatchWriterResult::Produced(layer) = r {
     107          508 :                 output.push(layer);
     108          508 :             }
     109              :         }
     110         1160 :         Ok(output)
     111         1160 :     }
     112              : 
     113         1368 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     114         1368 :         self,
     115         1368 :         tline: &Arc<Timeline>,
     116         1368 :         ctx: &RequestContext,
     117         1368 :         discard_fn: D,
     118         1368 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     119         1368 :     where
     120         1368 :         D: Fn(&PersistentLayerKey) -> F,
     121         1368 :         F: Future<Output = bool>,
     122         1368 :     {
     123         1368 :         let Self {
     124         1368 :             generated_layer_writers,
     125         1368 :             ..
     126         1368 :         } = self;
     127         1368 :         let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
     128            0 :             for produced_layer in generated_layers {
     129            0 :                 if let BatchWriterResult::Produced(resident_layer) = produced_layer {
     130            0 :                     let layer: Layer = resident_layer.into();
     131            0 :                     layer.delete_on_drop();
     132            0 :                 }
     133              :             }
     134            0 :         };
     135              :         // BEGIN: catch every error and do the recovery in the below section
     136         1368 :         let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
     137         2116 :         for (inner, layer_key) in generated_layer_writers {
     138          748 :             if discard_fn(&layer_key).await {
     139           76 :                 generated_layers.push(BatchWriterResult::Discarded(layer_key));
     140           76 :             } else {
     141          672 :                 let res = match inner {
     142           76 :                     LayerWriterWrapper::Delta(writer) => {
     143           76 :                         writer.finish(layer_key.key_range.end, ctx).await
     144              :                     }
     145          596 :                     LayerWriterWrapper::Image(writer) => {
     146          596 :                         writer
     147          596 :                             .finish_with_end_key(layer_key.key_range.end, ctx)
     148          596 :                             .await
     149              :                     }
     150              :                 };
     151          672 :                 let layer = match res {
     152          672 :                     Ok((desc, path)) => {
     153          672 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     154          672 :                             Ok(layer) => layer,
     155            0 :                             Err(e) => {
     156            0 :                                 tokio::fs::remove_file(&path).await.ok();
     157            0 :                                 clean_up_layers(generated_layers);
     158            0 :                                 return Err(e);
     159              :                             }
     160              :                         }
     161              :                     }
     162            0 :                     Err(e) => {
     163            0 :                         // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     164            0 :                         // so we don't need to remove the layer we just failed to create by ourselves.
     165            0 :                         clean_up_layers(generated_layers);
     166            0 :                         return Err(e);
     167              :                     }
     168              :                 };
     169          672 :                 generated_layers.push(BatchWriterResult::Produced(layer));
     170              :             }
     171              :         }
     172              :         // END: catch every error and do the recovery in the above section
     173         1368 :         Ok(generated_layers)
     174         1368 :     }
     175              : 
     176            0 :     pub fn pending_layer_num(&self) -> usize {
     177            0 :         self.generated_layer_writers.len()
     178            0 :     }
     179              : }
     180              : 
     181              : /// An image writer that takes images and produces multiple image layers.
     182              : #[must_use]
     183              : pub struct SplitImageLayerWriter<'a> {
     184              :     inner: ImageLayerWriter,
     185              :     target_layer_size: u64,
     186              :     lsn: Lsn,
     187              :     conf: &'static PageServerConf,
     188              :     timeline_id: TimelineId,
     189              :     tenant_shard_id: TenantShardId,
     190              :     batches: BatchLayerWriter,
     191              :     start_key: Key,
     192              :     gate: &'a utils::sync::gate::Gate,
     193              :     cancel: CancellationToken,
     194              : }
     195              : 
     196              : impl<'a> SplitImageLayerWriter<'a> {
     197              :     #[allow(clippy::too_many_arguments)]
     198          104 :     pub async fn new(
     199          104 :         conf: &'static PageServerConf,
     200          104 :         timeline_id: TimelineId,
     201          104 :         tenant_shard_id: TenantShardId,
     202          104 :         start_key: Key,
     203          104 :         lsn: Lsn,
     204          104 :         target_layer_size: u64,
     205          104 :         gate: &'a utils::sync::gate::Gate,
     206          104 :         cancel: CancellationToken,
     207          104 :         ctx: &RequestContext,
     208          104 :     ) -> anyhow::Result<Self> {
     209          104 :         Ok(Self {
     210          104 :             target_layer_size,
     211          104 :             inner: ImageLayerWriter::new(
     212          104 :                 conf,
     213          104 :                 timeline_id,
     214          104 :                 tenant_shard_id,
     215          104 :                 &(start_key..Key::MAX),
     216          104 :                 lsn,
     217          104 :                 gate,
     218          104 :                 cancel.clone(),
     219          104 :                 ctx,
     220          104 :             )
     221          104 :             .await?,
     222          104 :             conf,
     223          104 :             timeline_id,
     224          104 :             tenant_shard_id,
     225          104 :             batches: BatchLayerWriter::new(conf).await?,
     226          104 :             lsn,
     227          104 :             start_key,
     228          104 :             gate,
     229          104 :             cancel,
     230              :         })
     231          104 :     }
     232              : 
     233        17212 :     pub async fn put_image(
     234        17212 :         &mut self,
     235        17212 :         key: Key,
     236        17212 :         img: Bytes,
     237        17212 :         ctx: &RequestContext,
     238        17212 :     ) -> anyhow::Result<()> {
     239        17212 :         // The current estimation is an upper bound of the space that the key/image could take
     240        17212 :         // because we did not consider compression in this estimation. The resulting image layer
     241        17212 :         // could be smaller than the target size.
     242        17212 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
     243        17212 :         if self.inner.num_keys() >= 1
     244        17108 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     245              :         {
     246           28 :             let next_image_writer = ImageLayerWriter::new(
     247           28 :                 self.conf,
     248           28 :                 self.timeline_id,
     249           28 :                 self.tenant_shard_id,
     250           28 :                 &(key..Key::MAX),
     251           28 :                 self.lsn,
     252           28 :                 self.gate,
     253           28 :                 self.cancel.clone(),
     254           28 :                 ctx,
     255           28 :             )
     256           28 :             .await?;
     257           28 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     258           28 :             self.batches.add_unfinished_image_writer(
     259           28 :                 prev_image_writer,
     260           28 :                 self.start_key..key,
     261           28 :                 self.lsn,
     262           28 :             );
     263           28 :             self.start_key = key;
     264        17184 :         }
     265        17212 :         self.inner.put_image(key, img, ctx).await
     266        17212 :     }
     267              : 
     268           92 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     269           92 :         self,
     270           92 :         tline: &Arc<Timeline>,
     271           92 :         ctx: &RequestContext,
     272           92 :         end_key: Key,
     273           92 :         discard_fn: D,
     274           92 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     275           92 :     where
     276           92 :         D: Fn(&PersistentLayerKey) -> F,
     277           92 :         F: Future<Output = bool>,
     278           92 :     {
     279           92 :         let Self {
     280           92 :             mut batches, inner, ..
     281           92 :         } = self;
     282           92 :         if inner.num_keys() != 0 {
     283           92 :             batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
     284           92 :         }
     285           92 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     286           92 :     }
     287              : 
     288              :     #[cfg(test)]
     289            8 :     pub(crate) async fn finish(
     290            8 :         self,
     291            8 :         tline: &Arc<Timeline>,
     292            8 :         ctx: &RequestContext,
     293            8 :         end_key: Key,
     294            8 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     295           12 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     296            8 :             .await
     297            8 :     }
     298              : }
     299              : 
     300              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     301              : ///
     302              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     303              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     304              : /// will split them into multiple files based on size.
     305              : #[must_use]
     306              : pub struct SplitDeltaLayerWriter<'a> {
     307              :     inner: Option<(Key, DeltaLayerWriter)>,
     308              :     target_layer_size: u64,
     309              :     conf: &'static PageServerConf,
     310              :     timeline_id: TimelineId,
     311              :     tenant_shard_id: TenantShardId,
     312              :     lsn_range: Range<Lsn>,
     313              :     last_key_written: Key,
     314              :     batches: BatchLayerWriter,
     315              :     gate: &'a utils::sync::gate::Gate,
     316              :     cancel: CancellationToken,
     317              : }
     318              : 
     319              : impl<'a> SplitDeltaLayerWriter<'a> {
     320          128 :     pub async fn new(
     321          128 :         conf: &'static PageServerConf,
     322          128 :         timeline_id: TimelineId,
     323          128 :         tenant_shard_id: TenantShardId,
     324          128 :         lsn_range: Range<Lsn>,
     325          128 :         target_layer_size: u64,
     326          128 :         gate: &'a utils::sync::gate::Gate,
     327          128 :         cancel: CancellationToken,
     328          128 :     ) -> anyhow::Result<Self> {
     329          128 :         Ok(Self {
     330          128 :             target_layer_size,
     331          128 :             inner: None,
     332          128 :             conf,
     333          128 :             timeline_id,
     334          128 :             tenant_shard_id,
     335          128 :             lsn_range,
     336          128 :             last_key_written: Key::MIN,
     337          128 :             batches: BatchLayerWriter::new(conf).await?,
     338          128 :             gate,
     339          128 :             cancel,
     340              :         })
     341          128 :     }
     342              : 
     343        24416 :     pub async fn put_value(
     344        24416 :         &mut self,
     345        24416 :         key: Key,
     346        24416 :         lsn: Lsn,
     347        24416 :         val: Value,
     348        24416 :         ctx: &RequestContext,
     349        24416 :     ) -> anyhow::Result<()> {
     350        24416 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     351        24416 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     352        24416 :         //
     353        24416 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     354        24416 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     355        24416 : 
     356        24416 :         if self.inner.is_none() {
     357          100 :             self.inner = Some((
     358          100 :                 key,
     359          100 :                 DeltaLayerWriter::new(
     360          100 :                     self.conf,
     361          100 :                     self.timeline_id,
     362          100 :                     self.tenant_shard_id,
     363          100 :                     key,
     364          100 :                     self.lsn_range.clone(),
     365          100 :                     self.gate,
     366          100 :                     self.cancel.clone(),
     367          100 :                     ctx,
     368          100 :                 )
     369          100 :                 .await?,
     370              :             ));
     371        24316 :         }
     372        24416 :         let (_, inner) = self.inner.as_mut().unwrap();
     373        24416 : 
     374        24416 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     375        24416 :         if inner.num_keys() >= 1
     376        24316 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     377              :         {
     378         5992 :             if key != self.last_key_written {
     379           28 :                 let next_delta_writer = DeltaLayerWriter::new(
     380           28 :                     self.conf,
     381           28 :                     self.timeline_id,
     382           28 :                     self.tenant_shard_id,
     383           28 :                     key,
     384           28 :                     self.lsn_range.clone(),
     385           28 :                     self.gate,
     386           28 :                     self.cancel.clone(),
     387           28 :                     ctx,
     388           28 :                 )
     389           28 :                 .await?;
     390           28 :                 let (start_key, prev_delta_writer) =
     391           28 :                     self.inner.replace((key, next_delta_writer)).unwrap();
     392           28 :                 self.batches.add_unfinished_delta_writer(
     393           28 :                     prev_delta_writer,
     394           28 :                     start_key..key,
     395           28 :                     self.lsn_range.clone(),
     396           28 :                 );
     397         5964 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     398              :                 // We have to produce a very large file b/c a key is updated too often.
     399            0 :                 anyhow::bail!(
     400            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     401            0 :                     key,
     402            0 :                     inner.estimated_size()
     403            0 :                 );
     404         5964 :             }
     405        18424 :         }
     406        24416 :         self.last_key_written = key;
     407        24416 :         let (_, inner) = self.inner.as_mut().unwrap();
     408        24416 :         inner.put_value(key, lsn, val, ctx).await
     409        24416 :     }
     410              : 
     411          116 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     412          116 :         self,
     413          116 :         tline: &Arc<Timeline>,
     414          116 :         ctx: &RequestContext,
     415          116 :         discard_fn: D,
     416          116 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     417          116 :     where
     418          116 :         D: Fn(&PersistentLayerKey) -> F,
     419          116 :         F: Future<Output = bool>,
     420          116 :     {
     421          116 :         let Self {
     422          116 :             mut batches, inner, ..
     423          116 :         } = self;
     424          116 :         if let Some((start_key, writer)) = inner {
     425           92 :             if writer.num_keys() != 0 {
     426           92 :                 let end_key = self.last_key_written.next();
     427           92 :                 batches.add_unfinished_delta_writer(
     428           92 :                     writer,
     429           92 :                     start_key..end_key,
     430           92 :                     self.lsn_range.clone(),
     431           92 :                 );
     432           92 :             }
     433           24 :         }
     434          116 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     435          116 :     }
     436              : 
     437              :     #[cfg(test)]
     438           12 :     pub(crate) async fn finish(
     439           12 :         self,
     440           12 :         tline: &Arc<Timeline>,
     441           12 :         ctx: &RequestContext,
     442           12 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     443           16 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     444           12 :             .await
     445           12 :     }
     446              : }
     447              : 
     448              : #[cfg(test)]
     449              : mod tests {
     450              :     use itertools::Itertools;
     451              :     use rand::{RngCore, SeedableRng};
     452              : 
     453              :     use super::*;
     454              :     use crate::DEFAULT_PG_VERSION;
     455              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     456              :     use crate::tenant::storage_layer::AsLayerDesc;
     457              : 
     458        40104 :     fn get_key(id: u32) -> Key {
     459        40104 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     460        40104 :         key.field6 = id;
     461        40104 :         key
     462        40104 :     }
     463              : 
     464           16 :     fn get_img(id: u32) -> Bytes {
     465           16 :         format!("{id:064}").into()
     466           16 :     }
     467              : 
     468        40008 :     fn get_large_img() -> Bytes {
     469        40008 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     470        40008 :         let mut data = vec![0; 8192];
     471        40008 :         rng.fill_bytes(&mut data);
     472        40008 :         data.into()
     473        40008 :     }
     474              : 
     475              :     #[tokio::test]
     476            4 :     async fn write_one_image() {
     477            4 :         let harness = TenantHarness::create("split_writer_write_one_image")
     478            4 :             .await
     479            4 :             .unwrap();
     480            4 :         let (tenant, ctx) = harness.load().await;
     481            4 : 
     482            4 :         let tline = tenant
     483            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     484            4 :             .await
     485            4 :             .unwrap();
     486            4 : 
     487            4 :         let mut image_writer = SplitImageLayerWriter::new(
     488            4 :             tenant.conf,
     489            4 :             tline.timeline_id,
     490            4 :             tenant.tenant_shard_id,
     491            4 :             get_key(0),
     492            4 :             Lsn(0x18),
     493            4 :             4 * 1024 * 1024,
     494            4 :             &tline.gate,
     495            4 :             tline.cancel.clone(),
     496            4 :             &ctx,
     497            4 :         )
     498            4 :         .await
     499            4 :         .unwrap();
     500            4 : 
     501            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     502            4 :             tenant.conf,
     503            4 :             tline.timeline_id,
     504            4 :             tenant.tenant_shard_id,
     505            4 :             Lsn(0x18)..Lsn(0x20),
     506            4 :             4 * 1024 * 1024,
     507            4 :             &tline.gate,
     508            4 :             tline.cancel.clone(),
     509            4 :         )
     510            4 :         .await
     511            4 :         .unwrap();
     512            4 : 
     513            4 :         image_writer
     514            4 :             .put_image(get_key(0), get_img(0), &ctx)
     515            4 :             .await
     516            4 :             .unwrap();
     517            4 :         let layers = image_writer
     518            4 :             .finish(&tline, &ctx, get_key(10))
     519            4 :             .await
     520            4 :             .unwrap();
     521            4 :         assert_eq!(layers.len(), 1);
     522            4 : 
     523            4 :         delta_writer
     524            4 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     525            4 :             .await
     526            4 :             .unwrap();
     527            4 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     528            4 :         assert_eq!(layers.len(), 1);
     529            4 :         assert_eq!(
     530            4 :             layers
     531            4 :                 .into_iter()
     532            4 :                 .next()
     533            4 :                 .unwrap()
     534            4 :                 .into_resident_layer()
     535            4 :                 .layer_desc()
     536            4 :                 .key(),
     537            4 :             PersistentLayerKey {
     538            4 :                 key_range: get_key(0)..get_key(1),
     539            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     540            4 :                 is_delta: true
     541            4 :             }
     542            4 :         );
     543            4 :     }
     544              : 
     545              :     #[tokio::test]
     546            4 :     async fn write_split() {
     547            4 :         // Test the split writer with retaining all the layers we have produced (discard=false)
     548            4 :         write_split_helper("split_writer_write_split", false).await;
     549            4 :     }
     550              : 
     551              :     #[tokio::test]
     552            4 :     async fn write_split_discard() {
     553            4 :         // Test the split writer with discarding all the layers we have produced (discard=true)
     554            4 :         write_split_helper("split_writer_write_split_discard", true).await;
     555            4 :     }
     556              : 
     557              :     /// Test the image+delta writer by writing a large number of images and deltas. If discard is
     558              :     /// set to true, all layers will be discarded.
     559            8 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     560            8 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     561            8 :         let (tenant, ctx) = harness.load().await;
     562              : 
     563            8 :         let tline = tenant
     564            8 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     565            8 :             .await
     566            8 :             .unwrap();
     567              : 
     568            8 :         let mut image_writer = SplitImageLayerWriter::new(
     569            8 :             tenant.conf,
     570            8 :             tline.timeline_id,
     571            8 :             tenant.tenant_shard_id,
     572            8 :             get_key(0),
     573            8 :             Lsn(0x18),
     574            8 :             4 * 1024 * 1024,
     575            8 :             &tline.gate,
     576            8 :             tline.cancel.clone(),
     577            8 :             &ctx,
     578            8 :         )
     579            8 :         .await
     580            8 :         .unwrap();
     581            8 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     582            8 :             tenant.conf,
     583            8 :             tline.timeline_id,
     584            8 :             tenant.tenant_shard_id,
     585            8 :             Lsn(0x18)..Lsn(0x20),
     586            8 :             4 * 1024 * 1024,
     587            8 :             &tline.gate,
     588            8 :             tline.cancel.clone(),
     589            8 :         )
     590            8 :         .await
     591            8 :         .unwrap();
     592              :         const N: usize = 2000;
     593        16008 :         for i in 0..N {
     594        16000 :             let i = i as u32;
     595        16000 :             image_writer
     596        16000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     597        16000 :                 .await
     598        16000 :                 .unwrap();
     599        16000 :             delta_writer
     600        16000 :                 .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
     601        16000 :                 .await
     602        16000 :                 .unwrap();
     603              :         }
     604            8 :         let image_layers = image_writer
     605           32 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     606            8 :             .await
     607            8 :             .unwrap();
     608            8 :         let delta_layers = delta_writer
     609           32 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     610            8 :             .await
     611            8 :             .unwrap();
     612            8 :         let image_layers = image_layers
     613            8 :             .into_iter()
     614           32 :             .map(|x| {
     615           32 :                 if discard {
     616           16 :                     x.into_discarded_layer()
     617              :                 } else {
     618           16 :                     x.into_resident_layer().layer_desc().key()
     619              :                 }
     620           32 :             })
     621            8 :             .collect_vec();
     622            8 :         let delta_layers = delta_layers
     623            8 :             .into_iter()
     624           32 :             .map(|x| {
     625           32 :                 if discard {
     626           16 :                     x.into_discarded_layer()
     627              :                 } else {
     628           16 :                     x.into_resident_layer().layer_desc().key()
     629              :                 }
     630           32 :             })
     631            8 :             .collect_vec();
     632            8 :         assert_eq!(image_layers.len(), N / 512 + 1);
     633            8 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     634            8 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     635            8 :         assert_eq!(
     636            8 :             delta_layers.last().unwrap().key_range.end,
     637            8 :             get_key(N as u32)
     638            8 :         );
     639           32 :         for idx in 0..image_layers.len() {
     640           32 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     641           32 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     642           32 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     643           32 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
     644           32 :             if idx > 0 {
     645           24 :                 assert_eq!(
     646           24 :                     image_layers[idx - 1].key_range.end,
     647           24 :                     image_layers[idx].key_range.start
     648           24 :                 );
     649           24 :                 assert_eq!(
     650           24 :                     delta_layers[idx - 1].key_range.end,
     651           24 :                     delta_layers[idx].key_range.start
     652           24 :                 );
     653            8 :             }
     654              :         }
     655            8 :     }
     656              : 
     657              :     #[tokio::test]
     658            4 :     async fn write_large_img() {
     659            4 :         let harness = TenantHarness::create("split_writer_write_large_img")
     660            4 :             .await
     661            4 :             .unwrap();
     662            4 :         let (tenant, ctx) = harness.load().await;
     663            4 : 
     664            4 :         let tline = tenant
     665            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     666            4 :             .await
     667            4 :             .unwrap();
     668            4 : 
     669            4 :         let mut image_writer = SplitImageLayerWriter::new(
     670            4 :             tenant.conf,
     671            4 :             tline.timeline_id,
     672            4 :             tenant.tenant_shard_id,
     673            4 :             get_key(0),
     674            4 :             Lsn(0x18),
     675            4 :             4 * 1024,
     676            4 :             &tline.gate,
     677            4 :             tline.cancel.clone(),
     678            4 :             &ctx,
     679            4 :         )
     680            4 :         .await
     681            4 :         .unwrap();
     682            4 : 
     683            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     684            4 :             tenant.conf,
     685            4 :             tline.timeline_id,
     686            4 :             tenant.tenant_shard_id,
     687            4 :             Lsn(0x18)..Lsn(0x20),
     688            4 :             4 * 1024,
     689            4 :             &tline.gate,
     690            4 :             tline.cancel.clone(),
     691            4 :         )
     692            4 :         .await
     693            4 :         .unwrap();
     694            4 : 
     695            4 :         image_writer
     696            4 :             .put_image(get_key(0), get_img(0), &ctx)
     697            4 :             .await
     698            4 :             .unwrap();
     699            4 :         image_writer
     700            4 :             .put_image(get_key(1), get_large_img(), &ctx)
     701            4 :             .await
     702            4 :             .unwrap();
     703            4 :         let layers = image_writer
     704            4 :             .finish(&tline, &ctx, get_key(10))
     705            4 :             .await
     706            4 :             .unwrap();
     707            4 :         assert_eq!(layers.len(), 2);
     708            4 : 
     709            4 :         delta_writer
     710            4 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     711            4 :             .await
     712            4 :             .unwrap();
     713            4 :         delta_writer
     714            4 :             .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
     715            4 :             .await
     716            4 :             .unwrap();
     717            4 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     718            4 :         assert_eq!(layers.len(), 2);
     719            4 :         let mut layers_iter = layers.into_iter();
     720            4 :         assert_eq!(
     721            4 :             layers_iter
     722            4 :                 .next()
     723            4 :                 .unwrap()
     724            4 :                 .into_resident_layer()
     725            4 :                 .layer_desc()
     726            4 :                 .key(),
     727            4 :             PersistentLayerKey {
     728            4 :                 key_range: get_key(0)..get_key(1),
     729            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     730            4 :                 is_delta: true
     731            4 :             }
     732            4 :         );
     733            4 :         assert_eq!(
     734            4 :             layers_iter
     735            4 :                 .next()
     736            4 :                 .unwrap()
     737            4 :                 .into_resident_layer()
     738            4 :                 .layer_desc()
     739            4 :                 .key(),
     740            4 :             PersistentLayerKey {
     741            4 :                 key_range: get_key(1)..get_key(2),
     742            4 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     743            4 :                 is_delta: true
     744            4 :             }
     745            4 :         );
     746            4 :     }
     747              : 
     748              :     #[tokio::test]
     749            4 :     async fn write_split_single_key() {
     750            4 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     751            4 :             .await
     752            4 :             .unwrap();
     753            4 :         let (tenant, ctx) = harness.load().await;
     754            4 : 
     755            4 :         let tline = tenant
     756            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     757            4 :             .await
     758            4 :             .unwrap();
     759            4 : 
     760            4 :         const N: usize = 2000;
     761            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     762            4 :             tenant.conf,
     763            4 :             tline.timeline_id,
     764            4 :             tenant.tenant_shard_id,
     765            4 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     766            4 :             4 * 1024 * 1024,
     767            4 :             &tline.gate,
     768            4 :             tline.cancel.clone(),
     769            4 :         )
     770            4 :         .await
     771            4 :         .unwrap();
     772            4 : 
     773         8004 :         for i in 0..N {
     774         8000 :             let i = i as u32;
     775         8000 :             delta_writer
     776         8000 :                 .put_value(
     777         8000 :                     get_key(0),
     778         8000 :                     Lsn(i as u64 * 16 + 0x10),
     779         8000 :                     Value::Image(get_large_img()),
     780         8000 :                     &ctx,
     781         8000 :                 )
     782         8000 :                 .await
     783         8000 :                 .unwrap();
     784            4 :         }
     785            4 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     786            4 :         assert_eq!(delta_layers.len(), 1);
     787            4 :         let delta_layer = delta_layers
     788            4 :             .into_iter()
     789            4 :             .next()
     790            4 :             .unwrap()
     791            4 :             .into_resident_layer();
     792            4 :         assert_eq!(
     793            4 :             delta_layer.layer_desc().key(),
     794            4 :             PersistentLayerKey {
     795            4 :                 key_range: get_key(0)..get_key(1),
     796            4 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     797            4 :                 is_delta: true
     798            4 :             }
     799            4 :         );
     800            4 :     }
     801              : }
        

Generated by: LCOV version 2.1-beta