LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - batch_split_writer.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 96.1 % 648 623
Test Date: 2025-05-26 10:37:33 Functions: 91.0 % 78 71

            Line data    Source code
       1              : use std::future::Future;
       2              : use std::ops::Range;
       3              : use std::sync::Arc;
       4              : 
       5              : use bytes::Bytes;
       6              : use pageserver_api::key::{KEY_SIZE, Key};
       7              : use pageserver_api::value::Value;
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::TimelineId;
      10              : use utils::lsn::Lsn;
      11              : use utils::shard::TenantShardId;
      12              : 
      13              : use super::errors::PutError;
      14              : use super::layer::S3_UPLOAD_LIMIT;
      15              : use super::{
      16              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      17              : };
      18              : use crate::config::PageServerConf;
      19              : use crate::context::RequestContext;
      20              : use crate::tenant::Timeline;
      21              : use crate::tenant::storage_layer::Layer;
      22              : 
      23              : pub(crate) enum BatchWriterResult {
      24              :     Produced(ResidentLayer),
      25              :     Discarded(PersistentLayerKey),
      26              : }
      27              : 
      28              : #[cfg(test)]
      29              : impl BatchWriterResult {
      30           12 :     fn into_resident_layer(self) -> ResidentLayer {
      31           12 :         match self {
      32           12 :             BatchWriterResult::Produced(layer) => layer,
      33            0 :             BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      34              :         }
      35           12 :     }
      36              : 
      37            8 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      38            8 :         match self {
      39            0 :             BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
      40            8 :             BatchWriterResult::Discarded(layer) => layer,
      41            8 :         }
      42            8 :     }
      43              : }
      44              : 
      45              : enum LayerWriterWrapper {
      46              :     Image(ImageLayerWriter),
      47              :     Delta(DeltaLayerWriter),
      48              : }
      49              : 
      50              : /// An layer writer that takes unfinished layers and finish them atomically.
      51              : #[must_use]
      52              : pub struct BatchLayerWriter {
      53              :     generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
      54              :     conf: &'static PageServerConf,
      55              : }
      56              : 
      57              : impl BatchLayerWriter {
      58          349 :     pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
      59          349 :         Ok(Self {
      60          349 :             generated_layer_writers: Vec::new(),
      61          349 :             conf,
      62          349 :         })
      63          349 :     }
      64              : 
      65          158 :     pub fn add_unfinished_image_writer(
      66          158 :         &mut self,
      67          158 :         writer: ImageLayerWriter,
      68          158 :         key_range: Range<Key>,
      69          158 :         lsn: Lsn,
      70          158 :     ) {
      71          158 :         self.generated_layer_writers.push((
      72          158 :             LayerWriterWrapper::Image(writer),
      73          158 :             PersistentLayerKey {
      74          158 :                 key_range,
      75          158 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
      76          158 :                 is_delta: false,
      77          158 :             },
      78          158 :         ));
      79          158 :     }
      80              : 
      81           30 :     pub fn add_unfinished_delta_writer(
      82           30 :         &mut self,
      83           30 :         writer: DeltaLayerWriter,
      84           30 :         key_range: Range<Key>,
      85           30 :         lsn_range: Range<Lsn>,
      86           30 :     ) {
      87           30 :         self.generated_layer_writers.push((
      88           30 :             LayerWriterWrapper::Delta(writer),
      89           30 :             PersistentLayerKey {
      90           30 :                 key_range,
      91           30 :                 lsn_range,
      92           30 :                 is_delta: true,
      93           30 :             },
      94           30 :         ));
      95           30 :     }
      96              : 
      97          291 :     pub(crate) async fn finish(
      98          291 :         self,
      99          291 :         tline: &Arc<Timeline>,
     100          291 :         ctx: &RequestContext,
     101          291 :     ) -> anyhow::Result<Vec<ResidentLayer>> {
     102          291 :         let res = self
     103          291 :             .finish_with_discard_fn(tline, ctx, |_| async { false })
     104          291 :             .await?;
     105          291 :         let mut output = Vec::new();
     106          419 :         for r in res {
     107          128 :             if let BatchWriterResult::Produced(layer) = r {
     108          128 :                 output.push(layer);
     109          128 :             }
     110              :         }
     111          291 :         Ok(output)
     112          291 :     }
     113              : 
     114          343 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     115          343 :         self,
     116          343 :         tline: &Arc<Timeline>,
     117          343 :         ctx: &RequestContext,
     118          343 :         discard_fn: D,
     119          343 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     120          343 :     where
     121          343 :         D: Fn(&PersistentLayerKey) -> F,
     122          343 :         F: Future<Output = bool>,
     123          343 :     {
     124          343 :         let Self {
     125          343 :             generated_layer_writers,
     126          343 :             ..
     127          343 :         } = self;
     128          343 :         let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
     129            0 :             for produced_layer in generated_layers {
     130            0 :                 if let BatchWriterResult::Produced(resident_layer) = produced_layer {
     131            0 :                     let layer: Layer = resident_layer.into();
     132            0 :                     layer.delete_on_drop();
     133            0 :                 }
     134              :             }
     135            0 :         };
     136              :         // BEGIN: catch every error and do the recovery in the below section
     137          343 :         let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
     138          531 :         for (inner, layer_key) in generated_layer_writers {
     139          188 :             if discard_fn(&layer_key).await {
     140           19 :                 generated_layers.push(BatchWriterResult::Discarded(layer_key));
     141           19 :             } else {
     142          169 :                 let res = match inner {
     143           19 :                     LayerWriterWrapper::Delta(writer) => {
     144           19 :                         writer.finish(layer_key.key_range.end, ctx).await
     145              :                     }
     146          150 :                     LayerWriterWrapper::Image(writer) => {
     147          150 :                         writer
     148          150 :                             .finish_with_end_key(layer_key.key_range.end, ctx)
     149          150 :                             .await
     150              :                     }
     151              :                 };
     152          169 :                 let layer = match res {
     153          169 :                     Ok((desc, path)) => {
     154          169 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     155          169 :                             Ok(layer) => layer,
     156            0 :                             Err(e) => {
     157            0 :                                 tokio::fs::remove_file(&path).await.ok();
     158            0 :                                 clean_up_layers(generated_layers);
     159            0 :                                 return Err(e);
     160              :                             }
     161              :                         }
     162              :                     }
     163            0 :                     Err(e) => {
     164            0 :                         // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     165            0 :                         // so we don't need to remove the layer we just failed to create by ourselves.
     166            0 :                         clean_up_layers(generated_layers);
     167            0 :                         return Err(e);
     168              :                     }
     169              :                 };
     170          169 :                 generated_layers.push(BatchWriterResult::Produced(layer));
     171              :             }
     172              :         }
     173              :         // END: catch every error and do the recovery in the above section
     174          343 :         Ok(generated_layers)
     175          343 :     }
     176              : 
     177            0 :     pub fn pending_layer_num(&self) -> usize {
     178            0 :         self.generated_layer_writers.len()
     179            0 :     }
     180              : }
     181              : 
     182              : /// An image writer that takes images and produces multiple image layers.
     183              : #[must_use]
     184              : pub struct SplitImageLayerWriter<'a> {
     185              :     inner: ImageLayerWriter,
     186              :     target_layer_size: u64,
     187              :     lsn: Lsn,
     188              :     conf: &'static PageServerConf,
     189              :     timeline_id: TimelineId,
     190              :     tenant_shard_id: TenantShardId,
     191              :     batches: BatchLayerWriter,
     192              :     start_key: Key,
     193              :     gate: &'a utils::sync::gate::Gate,
     194              :     cancel: CancellationToken,
     195              : }
     196              : 
     197              : impl<'a> SplitImageLayerWriter<'a> {
     198              :     #[allow(clippy::too_many_arguments)]
     199           26 :     pub async fn new(
     200           26 :         conf: &'static PageServerConf,
     201           26 :         timeline_id: TimelineId,
     202           26 :         tenant_shard_id: TenantShardId,
     203           26 :         start_key: Key,
     204           26 :         lsn: Lsn,
     205           26 :         target_layer_size: u64,
     206           26 :         gate: &'a utils::sync::gate::Gate,
     207           26 :         cancel: CancellationToken,
     208           26 :         ctx: &RequestContext,
     209           26 :     ) -> anyhow::Result<Self> {
     210           26 :         Ok(Self {
     211           26 :             target_layer_size,
     212           26 :             inner: ImageLayerWriter::new(
     213           26 :                 conf,
     214           26 :                 timeline_id,
     215           26 :                 tenant_shard_id,
     216           26 :                 &(start_key..Key::MAX),
     217           26 :                 lsn,
     218           26 :                 gate,
     219           26 :                 cancel.clone(),
     220           26 :                 ctx,
     221           26 :             )
     222           26 :             .await?,
     223           26 :             conf,
     224           26 :             timeline_id,
     225           26 :             tenant_shard_id,
     226           26 :             batches: BatchLayerWriter::new(conf).await?,
     227           26 :             lsn,
     228           26 :             start_key,
     229           26 :             gate,
     230           26 :             cancel,
     231              :         })
     232           26 :     }
     233              : 
     234         4303 :     pub async fn put_image(
     235         4303 :         &mut self,
     236         4303 :         key: Key,
     237         4303 :         img: Bytes,
     238         4303 :         ctx: &RequestContext,
     239         4303 :     ) -> Result<(), PutError> {
     240         4303 :         // The current estimation is an upper bound of the space that the key/image could take
     241         4303 :         // because we did not consider compression in this estimation. The resulting image layer
     242         4303 :         // could be smaller than the target size.
     243         4303 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
     244         4303 :         if self.inner.num_keys() >= 1
     245         4277 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     246              :         {
     247            7 :             let next_image_writer = ImageLayerWriter::new(
     248            7 :                 self.conf,
     249            7 :                 self.timeline_id,
     250            7 :                 self.tenant_shard_id,
     251            7 :                 &(key..Key::MAX),
     252            7 :                 self.lsn,
     253            7 :                 self.gate,
     254            7 :                 self.cancel.clone(),
     255            7 :                 ctx,
     256            7 :             )
     257            7 :             .await
     258            7 :             .map_err(PutError::Other)?;
     259            7 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     260            7 :             self.batches.add_unfinished_image_writer(
     261            7 :                 prev_image_writer,
     262            7 :                 self.start_key..key,
     263            7 :                 self.lsn,
     264            7 :             );
     265            7 :             self.start_key = key;
     266         4296 :         }
     267         4303 :         self.inner.put_image(key, img, ctx).await
     268         4303 :     }
     269              : 
     270           23 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     271           23 :         self,
     272           23 :         tline: &Arc<Timeline>,
     273           23 :         ctx: &RequestContext,
     274           23 :         end_key: Key,
     275           23 :         discard_fn: D,
     276           23 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     277           23 :     where
     278           23 :         D: Fn(&PersistentLayerKey) -> F,
     279           23 :         F: Future<Output = bool>,
     280           23 :     {
     281           23 :         let Self {
     282           23 :             mut batches, inner, ..
     283           23 :         } = self;
     284           23 :         if inner.num_keys() != 0 {
     285           23 :             batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
     286           23 :         }
     287           23 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     288           23 :     }
     289              : 
     290              :     #[cfg(test)]
     291            2 :     pub(crate) async fn finish(
     292            2 :         self,
     293            2 :         tline: &Arc<Timeline>,
     294            2 :         ctx: &RequestContext,
     295            2 :         end_key: Key,
     296            2 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     297            3 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     298            2 :             .await
     299            2 :     }
     300              : }
     301              : 
     302              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     303              : ///
     304              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     305              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     306              : /// will split them into multiple files based on size.
     307              : #[must_use]
     308              : pub struct SplitDeltaLayerWriter<'a> {
     309              :     inner: Option<(Key, DeltaLayerWriter)>,
     310              :     target_layer_size: u64,
     311              :     conf: &'static PageServerConf,
     312              :     timeline_id: TimelineId,
     313              :     tenant_shard_id: TenantShardId,
     314              :     lsn_range: Range<Lsn>,
     315              :     last_key_written: Key,
     316              :     batches: BatchLayerWriter,
     317              :     gate: &'a utils::sync::gate::Gate,
     318              :     cancel: CancellationToken,
     319              : }
     320              : 
     321              : impl<'a> SplitDeltaLayerWriter<'a> {
     322           32 :     pub async fn new(
     323           32 :         conf: &'static PageServerConf,
     324           32 :         timeline_id: TimelineId,
     325           32 :         tenant_shard_id: TenantShardId,
     326           32 :         lsn_range: Range<Lsn>,
     327           32 :         target_layer_size: u64,
     328           32 :         gate: &'a utils::sync::gate::Gate,
     329           32 :         cancel: CancellationToken,
     330           32 :     ) -> anyhow::Result<Self> {
     331           32 :         Ok(Self {
     332           32 :             target_layer_size,
     333           32 :             inner: None,
     334           32 :             conf,
     335           32 :             timeline_id,
     336           32 :             tenant_shard_id,
     337           32 :             lsn_range,
     338           32 :             last_key_written: Key::MIN,
     339           32 :             batches: BatchLayerWriter::new(conf).await?,
     340           32 :             gate,
     341           32 :             cancel,
     342              :         })
     343           32 :     }
     344              : 
     345         6104 :     pub async fn put_value(
     346         6104 :         &mut self,
     347         6104 :         key: Key,
     348         6104 :         lsn: Lsn,
     349         6104 :         val: Value,
     350         6104 :         ctx: &RequestContext,
     351         6104 :     ) -> Result<(), PutError> {
     352         6104 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     353         6104 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     354         6104 :         //
     355         6104 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     356         6104 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     357         6104 : 
     358         6104 :         if self.inner.is_none() {
     359           25 :             self.inner = Some((
     360           25 :                 key,
     361           25 :                 DeltaLayerWriter::new(
     362           25 :                     self.conf,
     363           25 :                     self.timeline_id,
     364           25 :                     self.tenant_shard_id,
     365           25 :                     key,
     366           25 :                     self.lsn_range.clone(),
     367           25 :                     self.gate,
     368           25 :                     self.cancel.clone(),
     369           25 :                     ctx,
     370           25 :                 )
     371           25 :                 .await
     372           25 :                 .map_err(PutError::Other)?,
     373              :             ));
     374         6079 :         }
     375         6104 :         let (_, inner) = self.inner.as_mut().unwrap();
     376         6104 : 
     377         6104 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     378         6104 :         if inner.num_keys() >= 1
     379         6079 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     380              :         {
     381         1498 :             if key != self.last_key_written {
     382            7 :                 let next_delta_writer = DeltaLayerWriter::new(
     383            7 :                     self.conf,
     384            7 :                     self.timeline_id,
     385            7 :                     self.tenant_shard_id,
     386            7 :                     key,
     387            7 :                     self.lsn_range.clone(),
     388            7 :                     self.gate,
     389            7 :                     self.cancel.clone(),
     390            7 :                     ctx,
     391            7 :                 )
     392            7 :                 .await
     393            7 :                 .map_err(PutError::Other)?;
     394            7 :                 let (start_key, prev_delta_writer) =
     395            7 :                     self.inner.replace((key, next_delta_writer)).unwrap();
     396            7 :                 self.batches.add_unfinished_delta_writer(
     397            7 :                     prev_delta_writer,
     398            7 :                     start_key..key,
     399            7 :                     self.lsn_range.clone(),
     400            7 :                 );
     401         1491 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     402              :                 // We have to produce a very large file b/c a key is updated too often.
     403            0 :                 return Err(PutError::Other(anyhow::anyhow!(
     404            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     405            0 :                     key,
     406            0 :                     inner.estimated_size()
     407            0 :                 )));
     408         1491 :             }
     409         4606 :         }
     410         6104 :         self.last_key_written = key;
     411         6104 :         let (_, inner) = self.inner.as_mut().unwrap();
     412         6104 :         inner.put_value(key, lsn, val, ctx).await
     413         6104 :     }
     414              : 
     415           29 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     416           29 :         self,
     417           29 :         tline: &Arc<Timeline>,
     418           29 :         ctx: &RequestContext,
     419           29 :         discard_fn: D,
     420           29 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     421           29 :     where
     422           29 :         D: Fn(&PersistentLayerKey) -> F,
     423           29 :         F: Future<Output = bool>,
     424           29 :     {
     425           29 :         let Self {
     426           29 :             mut batches, inner, ..
     427           29 :         } = self;
     428           29 :         if let Some((start_key, writer)) = inner {
     429           23 :             if writer.num_keys() != 0 {
     430           23 :                 let end_key = self.last_key_written.next();
     431           23 :                 batches.add_unfinished_delta_writer(
     432           23 :                     writer,
     433           23 :                     start_key..end_key,
     434           23 :                     self.lsn_range.clone(),
     435           23 :                 );
     436           23 :             }
     437            6 :         }
     438           29 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     439           29 :     }
     440              : 
     441              :     #[cfg(test)]
     442            3 :     pub(crate) async fn finish(
     443            3 :         self,
     444            3 :         tline: &Arc<Timeline>,
     445            3 :         ctx: &RequestContext,
     446            3 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     447            4 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     448            3 :             .await
     449            3 :     }
     450              : }
     451              : 
     452              : #[cfg(test)]
     453              : mod tests {
     454              :     use itertools::Itertools;
     455              :     use rand::{RngCore, SeedableRng};
     456              : 
     457              :     use super::*;
     458              :     use crate::DEFAULT_PG_VERSION;
     459              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     460              :     use crate::tenant::storage_layer::AsLayerDesc;
     461              : 
     462        10026 :     fn get_key(id: u32) -> Key {
     463        10026 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     464        10026 :         key.field6 = id;
     465        10026 :         key
     466        10026 :     }
     467              : 
     468            4 :     fn get_img(id: u32) -> Bytes {
     469            4 :         format!("{id:064}").into()
     470            4 :     }
     471              : 
     472        10002 :     fn get_large_img() -> Bytes {
     473        10002 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     474        10002 :         let mut data = vec![0; 8192];
     475        10002 :         rng.fill_bytes(&mut data);
     476        10002 :         data.into()
     477        10002 :     }
     478              : 
     479              :     #[tokio::test]
     480            1 :     async fn write_one_image() {
     481            1 :         let harness = TenantHarness::create("split_writer_write_one_image")
     482            1 :             .await
     483            1 :             .unwrap();
     484            1 :         let (tenant, ctx) = harness.load().await;
     485            1 : 
     486            1 :         let tline = tenant
     487            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     488            1 :             .await
     489            1 :             .unwrap();
     490            1 : 
     491            1 :         let mut image_writer = SplitImageLayerWriter::new(
     492            1 :             tenant.conf,
     493            1 :             tline.timeline_id,
     494            1 :             tenant.tenant_shard_id,
     495            1 :             get_key(0),
     496            1 :             Lsn(0x18),
     497            1 :             4 * 1024 * 1024,
     498            1 :             &tline.gate,
     499            1 :             tline.cancel.clone(),
     500            1 :             &ctx,
     501            1 :         )
     502            1 :         .await
     503            1 :         .unwrap();
     504            1 : 
     505            1 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     506            1 :             tenant.conf,
     507            1 :             tline.timeline_id,
     508            1 :             tenant.tenant_shard_id,
     509            1 :             Lsn(0x18)..Lsn(0x20),
     510            1 :             4 * 1024 * 1024,
     511            1 :             &tline.gate,
     512            1 :             tline.cancel.clone(),
     513            1 :         )
     514            1 :         .await
     515            1 :         .unwrap();
     516            1 : 
     517            1 :         image_writer
     518            1 :             .put_image(get_key(0), get_img(0), &ctx)
     519            1 :             .await
     520            1 :             .unwrap();
     521            1 :         let layers = image_writer
     522            1 :             .finish(&tline, &ctx, get_key(10))
     523            1 :             .await
     524            1 :             .unwrap();
     525            1 :         assert_eq!(layers.len(), 1);
     526            1 : 
     527            1 :         delta_writer
     528            1 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     529            1 :             .await
     530            1 :             .unwrap();
     531            1 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     532            1 :         assert_eq!(layers.len(), 1);
     533            1 :         assert_eq!(
     534            1 :             layers
     535            1 :                 .into_iter()
     536            1 :                 .next()
     537            1 :                 .unwrap()
     538            1 :                 .into_resident_layer()
     539            1 :                 .layer_desc()
     540            1 :                 .key(),
     541            1 :             PersistentLayerKey {
     542            1 :                 key_range: get_key(0)..get_key(1),
     543            1 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     544            1 :                 is_delta: true
     545            1 :             }
     546            1 :         );
     547            1 :     }
     548              : 
     549              :     #[tokio::test]
     550            1 :     async fn write_split() {
     551            1 :         // Test the split writer with retaining all the layers we have produced (discard=false)
     552            1 :         write_split_helper("split_writer_write_split", false).await;
     553            1 :     }
     554              : 
     555              :     #[tokio::test]
     556            1 :     async fn write_split_discard() {
     557            1 :         // Test the split writer with discarding all the layers we have produced (discard=true)
     558            1 :         write_split_helper("split_writer_write_split_discard", true).await;
     559            1 :     }
     560              : 
     561              :     /// Test the image+delta writer by writing a large number of images and deltas. If discard is
     562              :     /// set to true, all layers will be discarded.
     563            2 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     564            2 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     565            2 :         let (tenant, ctx) = harness.load().await;
     566              : 
     567            2 :         let tline = tenant
     568            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     569            2 :             .await
     570            2 :             .unwrap();
     571              : 
     572            2 :         let mut image_writer = SplitImageLayerWriter::new(
     573            2 :             tenant.conf,
     574            2 :             tline.timeline_id,
     575            2 :             tenant.tenant_shard_id,
     576            2 :             get_key(0),
     577            2 :             Lsn(0x18),
     578            2 :             4 * 1024 * 1024,
     579            2 :             &tline.gate,
     580            2 :             tline.cancel.clone(),
     581            2 :             &ctx,
     582            2 :         )
     583            2 :         .await
     584            2 :         .unwrap();
     585            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     586            2 :             tenant.conf,
     587            2 :             tline.timeline_id,
     588            2 :             tenant.tenant_shard_id,
     589            2 :             Lsn(0x18)..Lsn(0x20),
     590            2 :             4 * 1024 * 1024,
     591            2 :             &tline.gate,
     592            2 :             tline.cancel.clone(),
     593            2 :         )
     594            2 :         .await
     595            2 :         .unwrap();
     596              :         const N: usize = 2000;
     597         4002 :         for i in 0..N {
     598         4000 :             let i = i as u32;
     599         4000 :             image_writer
     600         4000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     601         4000 :                 .await
     602         4000 :                 .unwrap();
     603         4000 :             delta_writer
     604         4000 :                 .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
     605         4000 :                 .await
     606         4000 :                 .unwrap();
     607              :         }
     608            2 :         let image_layers = image_writer
     609            8 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     610            2 :             .await
     611            2 :             .unwrap();
     612            2 :         let delta_layers = delta_writer
     613            8 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     614            2 :             .await
     615            2 :             .unwrap();
     616            2 :         let image_layers = image_layers
     617            2 :             .into_iter()
     618            8 :             .map(|x| {
     619            8 :                 if discard {
     620            4 :                     x.into_discarded_layer()
     621              :                 } else {
     622            4 :                     x.into_resident_layer().layer_desc().key()
     623              :                 }
     624            8 :             })
     625            2 :             .collect_vec();
     626            2 :         let delta_layers = delta_layers
     627            2 :             .into_iter()
     628            8 :             .map(|x| {
     629            8 :                 if discard {
     630            4 :                     x.into_discarded_layer()
     631              :                 } else {
     632            4 :                     x.into_resident_layer().layer_desc().key()
     633              :                 }
     634            8 :             })
     635            2 :             .collect_vec();
     636            2 :         assert_eq!(image_layers.len(), N / 512 + 1);
     637            2 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     638            2 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     639            2 :         assert_eq!(
     640            2 :             delta_layers.last().unwrap().key_range.end,
     641            2 :             get_key(N as u32)
     642            2 :         );
     643            8 :         for idx in 0..image_layers.len() {
     644            8 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     645            8 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     646            8 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     647            8 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
     648            8 :             if idx > 0 {
     649            6 :                 assert_eq!(
     650            6 :                     image_layers[idx - 1].key_range.end,
     651            6 :                     image_layers[idx].key_range.start
     652            6 :                 );
     653            6 :                 assert_eq!(
     654            6 :                     delta_layers[idx - 1].key_range.end,
     655            6 :                     delta_layers[idx].key_range.start
     656            6 :                 );
     657            2 :             }
     658              :         }
     659            2 :     }
     660              : 
     661              :     #[tokio::test]
     662            1 :     async fn write_large_img() {
     663            1 :         let harness = TenantHarness::create("split_writer_write_large_img")
     664            1 :             .await
     665            1 :             .unwrap();
     666            1 :         let (tenant, ctx) = harness.load().await;
     667            1 : 
     668            1 :         let tline = tenant
     669            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     670            1 :             .await
     671            1 :             .unwrap();
     672            1 : 
     673            1 :         let mut image_writer = SplitImageLayerWriter::new(
     674            1 :             tenant.conf,
     675            1 :             tline.timeline_id,
     676            1 :             tenant.tenant_shard_id,
     677            1 :             get_key(0),
     678            1 :             Lsn(0x18),
     679            1 :             4 * 1024,
     680            1 :             &tline.gate,
     681            1 :             tline.cancel.clone(),
     682            1 :             &ctx,
     683            1 :         )
     684            1 :         .await
     685            1 :         .unwrap();
     686            1 : 
     687            1 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     688            1 :             tenant.conf,
     689            1 :             tline.timeline_id,
     690            1 :             tenant.tenant_shard_id,
     691            1 :             Lsn(0x18)..Lsn(0x20),
     692            1 :             4 * 1024,
     693            1 :             &tline.gate,
     694            1 :             tline.cancel.clone(),
     695            1 :         )
     696            1 :         .await
     697            1 :         .unwrap();
     698            1 : 
     699            1 :         image_writer
     700            1 :             .put_image(get_key(0), get_img(0), &ctx)
     701            1 :             .await
     702            1 :             .unwrap();
     703            1 :         image_writer
     704            1 :             .put_image(get_key(1), get_large_img(), &ctx)
     705            1 :             .await
     706            1 :             .unwrap();
     707            1 :         let layers = image_writer
     708            1 :             .finish(&tline, &ctx, get_key(10))
     709            1 :             .await
     710            1 :             .unwrap();
     711            1 :         assert_eq!(layers.len(), 2);
     712            1 : 
     713            1 :         delta_writer
     714            1 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     715            1 :             .await
     716            1 :             .unwrap();
     717            1 :         delta_writer
     718            1 :             .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
     719            1 :             .await
     720            1 :             .unwrap();
     721            1 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     722            1 :         assert_eq!(layers.len(), 2);
     723            1 :         let mut layers_iter = layers.into_iter();
     724            1 :         assert_eq!(
     725            1 :             layers_iter
     726            1 :                 .next()
     727            1 :                 .unwrap()
     728            1 :                 .into_resident_layer()
     729            1 :                 .layer_desc()
     730            1 :                 .key(),
     731            1 :             PersistentLayerKey {
     732            1 :                 key_range: get_key(0)..get_key(1),
     733            1 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     734            1 :                 is_delta: true
     735            1 :             }
     736            1 :         );
     737            1 :         assert_eq!(
     738            1 :             layers_iter
     739            1 :                 .next()
     740            1 :                 .unwrap()
     741            1 :                 .into_resident_layer()
     742            1 :                 .layer_desc()
     743            1 :                 .key(),
     744            1 :             PersistentLayerKey {
     745            1 :                 key_range: get_key(1)..get_key(2),
     746            1 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     747            1 :                 is_delta: true
     748            1 :             }
     749            1 :         );
     750            1 :     }
     751              : 
     752              :     #[tokio::test]
     753            1 :     async fn write_split_single_key() {
     754            1 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     755            1 :             .await
     756            1 :             .unwrap();
     757            1 :         let (tenant, ctx) = harness.load().await;
     758            1 : 
     759            1 :         let tline = tenant
     760            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     761            1 :             .await
     762            1 :             .unwrap();
     763            1 : 
     764            1 :         const N: usize = 2000;
     765            1 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     766            1 :             tenant.conf,
     767            1 :             tline.timeline_id,
     768            1 :             tenant.tenant_shard_id,
     769            1 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     770            1 :             4 * 1024 * 1024,
     771            1 :             &tline.gate,
     772            1 :             tline.cancel.clone(),
     773            1 :         )
     774            1 :         .await
     775            1 :         .unwrap();
     776            1 : 
     777         2001 :         for i in 0..N {
     778         2000 :             let i = i as u32;
     779         2000 :             delta_writer
     780         2000 :                 .put_value(
     781         2000 :                     get_key(0),
     782         2000 :                     Lsn(i as u64 * 16 + 0x10),
     783         2000 :                     Value::Image(get_large_img()),
     784         2000 :                     &ctx,
     785         2000 :                 )
     786         2000 :                 .await
     787         2000 :                 .unwrap();
     788            1 :         }
     789            1 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     790            1 :         assert_eq!(delta_layers.len(), 1);
     791            1 :         let delta_layer = delta_layers
     792            1 :             .into_iter()
     793            1 :             .next()
     794            1 :             .unwrap()
     795            1 :             .into_resident_layer();
     796            1 :         assert_eq!(
     797            1 :             delta_layer.layer_desc().key(),
     798            1 :             PersistentLayerKey {
     799            1 :                 key_range: get_key(0)..get_key(1),
     800            1 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     801            1 :                 is_delta: true
     802            1 :             }
     803            1 :         );
     804            1 :     }
     805              : }
        

Generated by: LCOV version 2.1-beta