LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - batch_split_writer.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 96.1 % 645 620
Test Date: 2025-04-24 20:31:15 Functions: 91.0 % 78 71

            Line data    Source code
       1              : use std::future::Future;
       2              : use std::ops::Range;
       3              : use std::sync::Arc;
       4              : 
       5              : use bytes::Bytes;
       6              : use pageserver_api::key::{KEY_SIZE, Key};
       7              : use pageserver_api::value::Value;
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::TimelineId;
      10              : use utils::lsn::Lsn;
      11              : use utils::shard::TenantShardId;
      12              : 
      13              : use super::layer::S3_UPLOAD_LIMIT;
      14              : use super::{
      15              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      16              : };
      17              : use crate::config::PageServerConf;
      18              : use crate::context::RequestContext;
      19              : use crate::tenant::Timeline;
      20              : use crate::tenant::storage_layer::Layer;
      21              : 
      22              : pub(crate) enum BatchWriterResult {
      23              :     Produced(ResidentLayer),
      24              :     Discarded(PersistentLayerKey),
      25              : }
      26              : 
      27              : #[cfg(test)]
      28              : impl BatchWriterResult {
      29          144 :     fn into_resident_layer(self) -> ResidentLayer {
      30          144 :         match self {
      31          144 :             BatchWriterResult::Produced(layer) => layer,
      32            0 :             BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      33              :         }
      34          144 :     }
      35              : 
      36           96 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      37           96 :         match self {
      38            0 :             BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
      39           96 :             BatchWriterResult::Discarded(layer) => layer,
      40           96 :         }
      41           96 :     }
      42              : }
      43              : 
      44              : enum LayerWriterWrapper {
      45              :     Image(ImageLayerWriter),
      46              :     Delta(DeltaLayerWriter),
      47              : }
      48              : 
      49              : /// An layer writer that takes unfinished layers and finish them atomically.
      50              : #[must_use]
      51              : pub struct BatchLayerWriter {
      52              :     generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
      53              :     conf: &'static PageServerConf,
      54              : }
      55              : 
      56              : impl BatchLayerWriter {
      57         4176 :     pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
      58         4176 :         Ok(Self {
      59         4176 :             generated_layer_writers: Vec::new(),
      60         4176 :             conf,
      61         4176 :         })
      62         4176 :     }
      63              : 
      64         1884 :     pub fn add_unfinished_image_writer(
      65         1884 :         &mut self,
      66         1884 :         writer: ImageLayerWriter,
      67         1884 :         key_range: Range<Key>,
      68         1884 :         lsn: Lsn,
      69         1884 :     ) {
      70         1884 :         self.generated_layer_writers.push((
      71         1884 :             LayerWriterWrapper::Image(writer),
      72         1884 :             PersistentLayerKey {
      73         1884 :                 key_range,
      74         1884 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
      75         1884 :                 is_delta: false,
      76         1884 :             },
      77         1884 :         ));
      78         1884 :     }
      79              : 
      80          360 :     pub fn add_unfinished_delta_writer(
      81          360 :         &mut self,
      82          360 :         writer: DeltaLayerWriter,
      83          360 :         key_range: Range<Key>,
      84          360 :         lsn_range: Range<Lsn>,
      85          360 :     ) {
      86          360 :         self.generated_layer_writers.push((
      87          360 :             LayerWriterWrapper::Delta(writer),
      88          360 :             PersistentLayerKey {
      89          360 :                 key_range,
      90          360 :                 lsn_range,
      91          360 :                 is_delta: true,
      92          360 :             },
      93          360 :         ));
      94          360 :     }
      95              : 
      96         3480 :     pub(crate) async fn finish(
      97         3480 :         self,
      98         3480 :         tline: &Arc<Timeline>,
      99         3480 :         ctx: &RequestContext,
     100         3480 :     ) -> anyhow::Result<Vec<ResidentLayer>> {
     101         3480 :         let res = self
     102         3480 :             .finish_with_discard_fn(tline, ctx, |_| async { false })
     103         3480 :             .await?;
     104         3480 :         let mut output = Vec::new();
     105         5004 :         for r in res {
     106         1524 :             if let BatchWriterResult::Produced(layer) = r {
     107         1524 :                 output.push(layer);
     108         1524 :             }
     109              :         }
     110         3480 :         Ok(output)
     111         3480 :     }
     112              : 
     113         4104 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     114         4104 :         self,
     115         4104 :         tline: &Arc<Timeline>,
     116         4104 :         ctx: &RequestContext,
     117         4104 :         discard_fn: D,
     118         4104 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     119         4104 :     where
     120         4104 :         D: Fn(&PersistentLayerKey) -> F,
     121         4104 :         F: Future<Output = bool>,
     122         4104 :     {
     123         4104 :         let Self {
     124         4104 :             generated_layer_writers,
     125         4104 :             ..
     126         4104 :         } = self;
     127         4104 :         let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
     128            0 :             for produced_layer in generated_layers {
     129            0 :                 if let BatchWriterResult::Produced(resident_layer) = produced_layer {
     130            0 :                     let layer: Layer = resident_layer.into();
     131            0 :                     layer.delete_on_drop();
     132            0 :                 }
     133              :             }
     134            0 :         };
     135              :         // BEGIN: catch every error and do the recovery in the below section
     136         4104 :         let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
     137         6348 :         for (inner, layer_key) in generated_layer_writers {
     138         2244 :             if discard_fn(&layer_key).await {
     139          228 :                 generated_layers.push(BatchWriterResult::Discarded(layer_key));
     140          228 :             } else {
     141         2016 :                 let res = match inner {
     142          228 :                     LayerWriterWrapper::Delta(writer) => {
     143          228 :                         writer.finish(layer_key.key_range.end, ctx).await
     144              :                     }
     145         1788 :                     LayerWriterWrapper::Image(writer) => {
     146         1788 :                         writer
     147         1788 :                             .finish_with_end_key(layer_key.key_range.end, ctx)
     148         1788 :                             .await
     149              :                     }
     150              :                 };
     151         2016 :                 let layer = match res {
     152         2016 :                     Ok((desc, path)) => {
     153         2016 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     154         2016 :                             Ok(layer) => layer,
     155            0 :                             Err(e) => {
     156            0 :                                 tokio::fs::remove_file(&path).await.ok();
     157            0 :                                 clean_up_layers(generated_layers);
     158            0 :                                 return Err(e);
     159              :                             }
     160              :                         }
     161              :                     }
     162            0 :                     Err(e) => {
     163            0 :                         // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     164            0 :                         // so we don't need to remove the layer we just failed to create by ourselves.
     165            0 :                         clean_up_layers(generated_layers);
     166            0 :                         return Err(e);
     167              :                     }
     168              :                 };
     169         2016 :                 generated_layers.push(BatchWriterResult::Produced(layer));
     170              :             }
     171              :         }
     172              :         // END: catch every error and do the recovery in the above section
     173         4104 :         Ok(generated_layers)
     174         4104 :     }
     175              : 
     176            0 :     pub fn pending_layer_num(&self) -> usize {
     177            0 :         self.generated_layer_writers.len()
     178            0 :     }
     179              : }
     180              : 
     181              : /// An image writer that takes images and produces multiple image layers.
     182              : #[must_use]
     183              : pub struct SplitImageLayerWriter<'a> {
     184              :     inner: ImageLayerWriter,
     185              :     target_layer_size: u64,
     186              :     lsn: Lsn,
     187              :     conf: &'static PageServerConf,
     188              :     timeline_id: TimelineId,
     189              :     tenant_shard_id: TenantShardId,
     190              :     batches: BatchLayerWriter,
     191              :     start_key: Key,
     192              :     gate: &'a utils::sync::gate::Gate,
     193              :     cancel: CancellationToken,
     194              : }
     195              : 
     196              : impl<'a> SplitImageLayerWriter<'a> {
     197              :     #[allow(clippy::too_many_arguments)]
     198          312 :     pub async fn new(
     199          312 :         conf: &'static PageServerConf,
     200          312 :         timeline_id: TimelineId,
     201          312 :         tenant_shard_id: TenantShardId,
     202          312 :         start_key: Key,
     203          312 :         lsn: Lsn,
     204          312 :         target_layer_size: u64,
     205          312 :         gate: &'a utils::sync::gate::Gate,
     206          312 :         cancel: CancellationToken,
     207          312 :         ctx: &RequestContext,
     208          312 :     ) -> anyhow::Result<Self> {
     209          312 :         Ok(Self {
     210          312 :             target_layer_size,
     211          312 :             inner: ImageLayerWriter::new(
     212          312 :                 conf,
     213          312 :                 timeline_id,
     214          312 :                 tenant_shard_id,
     215          312 :                 &(start_key..Key::MAX),
     216          312 :                 lsn,
     217          312 :                 gate,
     218          312 :                 cancel.clone(),
     219          312 :                 ctx,
     220          312 :             )
     221          312 :             .await?,
     222          312 :             conf,
     223          312 :             timeline_id,
     224          312 :             tenant_shard_id,
     225          312 :             batches: BatchLayerWriter::new(conf).await?,
     226          312 :             lsn,
     227          312 :             start_key,
     228          312 :             gate,
     229          312 :             cancel,
     230              :         })
     231          312 :     }
     232              : 
     233        51636 :     pub async fn put_image(
     234        51636 :         &mut self,
     235        51636 :         key: Key,
     236        51636 :         img: Bytes,
     237        51636 :         ctx: &RequestContext,
     238        51636 :     ) -> anyhow::Result<()> {
     239        51636 :         // The current estimation is an upper bound of the space that the key/image could take
     240        51636 :         // because we did not consider compression in this estimation. The resulting image layer
     241        51636 :         // could be smaller than the target size.
     242        51636 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
     243        51636 :         if self.inner.num_keys() >= 1
     244        51324 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     245              :         {
     246           84 :             let next_image_writer = ImageLayerWriter::new(
     247           84 :                 self.conf,
     248           84 :                 self.timeline_id,
     249           84 :                 self.tenant_shard_id,
     250           84 :                 &(key..Key::MAX),
     251           84 :                 self.lsn,
     252           84 :                 self.gate,
     253           84 :                 self.cancel.clone(),
     254           84 :                 ctx,
     255           84 :             )
     256           84 :             .await?;
     257           84 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     258           84 :             self.batches.add_unfinished_image_writer(
     259           84 :                 prev_image_writer,
     260           84 :                 self.start_key..key,
     261           84 :                 self.lsn,
     262           84 :             );
     263           84 :             self.start_key = key;
     264        51552 :         }
     265        51636 :         self.inner.put_image(key, img, ctx).await
     266        51636 :     }
     267              : 
     268          276 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     269          276 :         self,
     270          276 :         tline: &Arc<Timeline>,
     271          276 :         ctx: &RequestContext,
     272          276 :         end_key: Key,
     273          276 :         discard_fn: D,
     274          276 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     275          276 :     where
     276          276 :         D: Fn(&PersistentLayerKey) -> F,
     277          276 :         F: Future<Output = bool>,
     278          276 :     {
     279          276 :         let Self {
     280          276 :             mut batches, inner, ..
     281          276 :         } = self;
     282          276 :         if inner.num_keys() != 0 {
     283          276 :             batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
     284          276 :         }
     285          276 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     286          276 :     }
     287              : 
     288              :     #[cfg(test)]
     289           24 :     pub(crate) async fn finish(
     290           24 :         self,
     291           24 :         tline: &Arc<Timeline>,
     292           24 :         ctx: &RequestContext,
     293           24 :         end_key: Key,
     294           24 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     295           36 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     296           24 :             .await
     297           24 :     }
     298              : }
     299              : 
     300              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     301              : ///
     302              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     303              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     304              : /// will split them into multiple files based on size.
     305              : #[must_use]
     306              : pub struct SplitDeltaLayerWriter<'a> {
     307              :     inner: Option<(Key, DeltaLayerWriter)>,
     308              :     target_layer_size: u64,
     309              :     conf: &'static PageServerConf,
     310              :     timeline_id: TimelineId,
     311              :     tenant_shard_id: TenantShardId,
     312              :     lsn_range: Range<Lsn>,
     313              :     last_key_written: Key,
     314              :     batches: BatchLayerWriter,
     315              :     gate: &'a utils::sync::gate::Gate,
     316              :     cancel: CancellationToken,
     317              : }
     318              : 
     319              : impl<'a> SplitDeltaLayerWriter<'a> {
     320          384 :     pub async fn new(
     321          384 :         conf: &'static PageServerConf,
     322          384 :         timeline_id: TimelineId,
     323          384 :         tenant_shard_id: TenantShardId,
     324          384 :         lsn_range: Range<Lsn>,
     325          384 :         target_layer_size: u64,
     326          384 :         gate: &'a utils::sync::gate::Gate,
     327          384 :         cancel: CancellationToken,
     328          384 :     ) -> anyhow::Result<Self> {
     329          384 :         Ok(Self {
     330          384 :             target_layer_size,
     331          384 :             inner: None,
     332          384 :             conf,
     333          384 :             timeline_id,
     334          384 :             tenant_shard_id,
     335          384 :             lsn_range,
     336          384 :             last_key_written: Key::MIN,
     337          384 :             batches: BatchLayerWriter::new(conf).await?,
     338          384 :             gate,
     339          384 :             cancel,
     340              :         })
     341          384 :     }
     342              : 
     343        73248 :     pub async fn put_value(
     344        73248 :         &mut self,
     345        73248 :         key: Key,
     346        73248 :         lsn: Lsn,
     347        73248 :         val: Value,
     348        73248 :         ctx: &RequestContext,
     349        73248 :     ) -> anyhow::Result<()> {
     350        73248 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     351        73248 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     352        73248 :         //
     353        73248 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     354        73248 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     355        73248 : 
     356        73248 :         if self.inner.is_none() {
     357          300 :             self.inner = Some((
     358          300 :                 key,
     359          300 :                 DeltaLayerWriter::new(
     360          300 :                     self.conf,
     361          300 :                     self.timeline_id,
     362          300 :                     self.tenant_shard_id,
     363          300 :                     key,
     364          300 :                     self.lsn_range.clone(),
     365          300 :                     self.gate,
     366          300 :                     self.cancel.clone(),
     367          300 :                     ctx,
     368          300 :                 )
     369          300 :                 .await?,
     370              :             ));
     371        72948 :         }
     372        73248 :         let (_, inner) = self.inner.as_mut().unwrap();
     373        73248 : 
     374        73248 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     375        73248 :         if inner.num_keys() >= 1
     376        72948 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     377              :         {
     378        17976 :             if key != self.last_key_written {
     379           84 :                 let next_delta_writer = DeltaLayerWriter::new(
     380           84 :                     self.conf,
     381           84 :                     self.timeline_id,
     382           84 :                     self.tenant_shard_id,
     383           84 :                     key,
     384           84 :                     self.lsn_range.clone(),
     385           84 :                     self.gate,
     386           84 :                     self.cancel.clone(),
     387           84 :                     ctx,
     388           84 :                 )
     389           84 :                 .await?;
     390           84 :                 let (start_key, prev_delta_writer) =
     391           84 :                     self.inner.replace((key, next_delta_writer)).unwrap();
     392           84 :                 self.batches.add_unfinished_delta_writer(
     393           84 :                     prev_delta_writer,
     394           84 :                     start_key..key,
     395           84 :                     self.lsn_range.clone(),
     396           84 :                 );
     397        17892 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     398              :                 // We have to produce a very large file b/c a key is updated too often.
     399            0 :                 anyhow::bail!(
     400            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     401            0 :                     key,
     402            0 :                     inner.estimated_size()
     403            0 :                 );
     404        17892 :             }
     405        55272 :         }
     406        73248 :         self.last_key_written = key;
     407        73248 :         let (_, inner) = self.inner.as_mut().unwrap();
     408        73248 :         inner.put_value(key, lsn, val, ctx).await
     409        73248 :     }
     410              : 
     411          348 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     412          348 :         self,
     413          348 :         tline: &Arc<Timeline>,
     414          348 :         ctx: &RequestContext,
     415          348 :         discard_fn: D,
     416          348 :     ) -> anyhow::Result<Vec<BatchWriterResult>>
     417          348 :     where
     418          348 :         D: Fn(&PersistentLayerKey) -> F,
     419          348 :         F: Future<Output = bool>,
     420          348 :     {
     421          348 :         let Self {
     422          348 :             mut batches, inner, ..
     423          348 :         } = self;
     424          348 :         if let Some((start_key, writer)) = inner {
     425          276 :             if writer.num_keys() != 0 {
     426          276 :                 let end_key = self.last_key_written.next();
     427          276 :                 batches.add_unfinished_delta_writer(
     428          276 :                     writer,
     429          276 :                     start_key..end_key,
     430          276 :                     self.lsn_range.clone(),
     431          276 :                 );
     432          276 :             }
     433           72 :         }
     434          348 :         batches.finish_with_discard_fn(tline, ctx, discard_fn).await
     435          348 :     }
     436              : 
     437              :     #[cfg(test)]
     438           36 :     pub(crate) async fn finish(
     439           36 :         self,
     440           36 :         tline: &Arc<Timeline>,
     441           36 :         ctx: &RequestContext,
     442           36 :     ) -> anyhow::Result<Vec<BatchWriterResult>> {
     443           48 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     444           36 :             .await
     445           36 :     }
     446              : }
     447              : 
     448              : #[cfg(test)]
     449              : mod tests {
     450              :     use itertools::Itertools;
     451              :     use rand::{RngCore, SeedableRng};
     452              : 
     453              :     use super::*;
     454              :     use crate::DEFAULT_PG_VERSION;
     455              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     456              :     use crate::tenant::storage_layer::AsLayerDesc;
     457              : 
     458       120312 :     fn get_key(id: u32) -> Key {
     459       120312 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     460       120312 :         key.field6 = id;
     461       120312 :         key
     462       120312 :     }
     463              : 
     464           48 :     fn get_img(id: u32) -> Bytes {
     465           48 :         format!("{id:064}").into()
     466           48 :     }
     467              : 
     468       120024 :     fn get_large_img() -> Bytes {
     469       120024 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     470       120024 :         let mut data = vec![0; 8192];
     471       120024 :         rng.fill_bytes(&mut data);
     472       120024 :         data.into()
     473       120024 :     }
     474              : 
     475              :     #[tokio::test]
     476           12 :     async fn write_one_image() {
     477           12 :         let harness = TenantHarness::create("split_writer_write_one_image")
     478           12 :             .await
     479           12 :             .unwrap();
     480           12 :         let (tenant, ctx) = harness.load().await;
     481           12 : 
     482           12 :         let tline = tenant
     483           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     484           12 :             .await
     485           12 :             .unwrap();
     486           12 : 
     487           12 :         let mut image_writer = SplitImageLayerWriter::new(
     488           12 :             tenant.conf,
     489           12 :             tline.timeline_id,
     490           12 :             tenant.tenant_shard_id,
     491           12 :             get_key(0),
     492           12 :             Lsn(0x18),
     493           12 :             4 * 1024 * 1024,
     494           12 :             &tline.gate,
     495           12 :             tline.cancel.clone(),
     496           12 :             &ctx,
     497           12 :         )
     498           12 :         .await
     499           12 :         .unwrap();
     500           12 : 
     501           12 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     502           12 :             tenant.conf,
     503           12 :             tline.timeline_id,
     504           12 :             tenant.tenant_shard_id,
     505           12 :             Lsn(0x18)..Lsn(0x20),
     506           12 :             4 * 1024 * 1024,
     507           12 :             &tline.gate,
     508           12 :             tline.cancel.clone(),
     509           12 :         )
     510           12 :         .await
     511           12 :         .unwrap();
     512           12 : 
     513           12 :         image_writer
     514           12 :             .put_image(get_key(0), get_img(0), &ctx)
     515           12 :             .await
     516           12 :             .unwrap();
     517           12 :         let layers = image_writer
     518           12 :             .finish(&tline, &ctx, get_key(10))
     519           12 :             .await
     520           12 :             .unwrap();
     521           12 :         assert_eq!(layers.len(), 1);
     522           12 : 
     523           12 :         delta_writer
     524           12 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     525           12 :             .await
     526           12 :             .unwrap();
     527           12 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     528           12 :         assert_eq!(layers.len(), 1);
     529           12 :         assert_eq!(
     530           12 :             layers
     531           12 :                 .into_iter()
     532           12 :                 .next()
     533           12 :                 .unwrap()
     534           12 :                 .into_resident_layer()
     535           12 :                 .layer_desc()
     536           12 :                 .key(),
     537           12 :             PersistentLayerKey {
     538           12 :                 key_range: get_key(0)..get_key(1),
     539           12 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     540           12 :                 is_delta: true
     541           12 :             }
     542           12 :         );
     543           12 :     }
     544              : 
     545              :     #[tokio::test]
     546           12 :     async fn write_split() {
     547           12 :         // Test the split writer with retaining all the layers we have produced (discard=false)
     548           12 :         write_split_helper("split_writer_write_split", false).await;
     549           12 :     }
     550              : 
     551              :     #[tokio::test]
     552           12 :     async fn write_split_discard() {
     553           12 :         // Test the split writer with discarding all the layers we have produced (discard=true)
     554           12 :         write_split_helper("split_writer_write_split_discard", true).await;
     555           12 :     }
     556              : 
     557              :     /// Test the image+delta writer by writing a large number of images and deltas. If discard is
     558              :     /// set to true, all layers will be discarded.
     559           24 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     560           24 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     561           24 :         let (tenant, ctx) = harness.load().await;
     562              : 
     563           24 :         let tline = tenant
     564           24 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     565           24 :             .await
     566           24 :             .unwrap();
     567              : 
     568           24 :         let mut image_writer = SplitImageLayerWriter::new(
     569           24 :             tenant.conf,
     570           24 :             tline.timeline_id,
     571           24 :             tenant.tenant_shard_id,
     572           24 :             get_key(0),
     573           24 :             Lsn(0x18),
     574           24 :             4 * 1024 * 1024,
     575           24 :             &tline.gate,
     576           24 :             tline.cancel.clone(),
     577           24 :             &ctx,
     578           24 :         )
     579           24 :         .await
     580           24 :         .unwrap();
     581           24 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     582           24 :             tenant.conf,
     583           24 :             tline.timeline_id,
     584           24 :             tenant.tenant_shard_id,
     585           24 :             Lsn(0x18)..Lsn(0x20),
     586           24 :             4 * 1024 * 1024,
     587           24 :             &tline.gate,
     588           24 :             tline.cancel.clone(),
     589           24 :         )
     590           24 :         .await
     591           24 :         .unwrap();
     592              :         const N: usize = 2000;
     593        48024 :         for i in 0..N {
     594        48000 :             let i = i as u32;
     595        48000 :             image_writer
     596        48000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     597        48000 :                 .await
     598        48000 :                 .unwrap();
     599        48000 :             delta_writer
     600        48000 :                 .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
     601        48000 :                 .await
     602        48000 :                 .unwrap();
     603              :         }
     604           24 :         let image_layers = image_writer
     605           96 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     606           24 :             .await
     607           24 :             .unwrap();
     608           24 :         let delta_layers = delta_writer
     609           96 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     610           24 :             .await
     611           24 :             .unwrap();
     612           24 :         let image_layers = image_layers
     613           24 :             .into_iter()
     614           96 :             .map(|x| {
     615           96 :                 if discard {
     616           48 :                     x.into_discarded_layer()
     617              :                 } else {
     618           48 :                     x.into_resident_layer().layer_desc().key()
     619              :                 }
     620           96 :             })
     621           24 :             .collect_vec();
     622           24 :         let delta_layers = delta_layers
     623           24 :             .into_iter()
     624           96 :             .map(|x| {
     625           96 :                 if discard {
     626           48 :                     x.into_discarded_layer()
     627              :                 } else {
     628           48 :                     x.into_resident_layer().layer_desc().key()
     629              :                 }
     630           96 :             })
     631           24 :             .collect_vec();
     632           24 :         assert_eq!(image_layers.len(), N / 512 + 1);
     633           24 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     634           24 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     635           24 :         assert_eq!(
     636           24 :             delta_layers.last().unwrap().key_range.end,
     637           24 :             get_key(N as u32)
     638           24 :         );
     639           96 :         for idx in 0..image_layers.len() {
     640           96 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     641           96 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     642           96 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     643           96 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
     644           96 :             if idx > 0 {
     645           72 :                 assert_eq!(
     646           72 :                     image_layers[idx - 1].key_range.end,
     647           72 :                     image_layers[idx].key_range.start
     648           72 :                 );
     649           72 :                 assert_eq!(
     650           72 :                     delta_layers[idx - 1].key_range.end,
     651           72 :                     delta_layers[idx].key_range.start
     652           72 :                 );
     653           24 :             }
     654              :         }
     655           24 :     }
     656              : 
     657              :     #[tokio::test]
     658           12 :     async fn write_large_img() {
     659           12 :         let harness = TenantHarness::create("split_writer_write_large_img")
     660           12 :             .await
     661           12 :             .unwrap();
     662           12 :         let (tenant, ctx) = harness.load().await;
     663           12 : 
     664           12 :         let tline = tenant
     665           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     666           12 :             .await
     667           12 :             .unwrap();
     668           12 : 
     669           12 :         let mut image_writer = SplitImageLayerWriter::new(
     670           12 :             tenant.conf,
     671           12 :             tline.timeline_id,
     672           12 :             tenant.tenant_shard_id,
     673           12 :             get_key(0),
     674           12 :             Lsn(0x18),
     675           12 :             4 * 1024,
     676           12 :             &tline.gate,
     677           12 :             tline.cancel.clone(),
     678           12 :             &ctx,
     679           12 :         )
     680           12 :         .await
     681           12 :         .unwrap();
     682           12 : 
     683           12 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     684           12 :             tenant.conf,
     685           12 :             tline.timeline_id,
     686           12 :             tenant.tenant_shard_id,
     687           12 :             Lsn(0x18)..Lsn(0x20),
     688           12 :             4 * 1024,
     689           12 :             &tline.gate,
     690           12 :             tline.cancel.clone(),
     691           12 :         )
     692           12 :         .await
     693           12 :         .unwrap();
     694           12 : 
     695           12 :         image_writer
     696           12 :             .put_image(get_key(0), get_img(0), &ctx)
     697           12 :             .await
     698           12 :             .unwrap();
     699           12 :         image_writer
     700           12 :             .put_image(get_key(1), get_large_img(), &ctx)
     701           12 :             .await
     702           12 :             .unwrap();
     703           12 :         let layers = image_writer
     704           12 :             .finish(&tline, &ctx, get_key(10))
     705           12 :             .await
     706           12 :             .unwrap();
     707           12 :         assert_eq!(layers.len(), 2);
     708           12 : 
     709           12 :         delta_writer
     710           12 :             .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
     711           12 :             .await
     712           12 :             .unwrap();
     713           12 :         delta_writer
     714           12 :             .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
     715           12 :             .await
     716           12 :             .unwrap();
     717           12 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     718           12 :         assert_eq!(layers.len(), 2);
     719           12 :         let mut layers_iter = layers.into_iter();
     720           12 :         assert_eq!(
     721           12 :             layers_iter
     722           12 :                 .next()
     723           12 :                 .unwrap()
     724           12 :                 .into_resident_layer()
     725           12 :                 .layer_desc()
     726           12 :                 .key(),
     727           12 :             PersistentLayerKey {
     728           12 :                 key_range: get_key(0)..get_key(1),
     729           12 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     730           12 :                 is_delta: true
     731           12 :             }
     732           12 :         );
     733           12 :         assert_eq!(
     734           12 :             layers_iter
     735           12 :                 .next()
     736           12 :                 .unwrap()
     737           12 :                 .into_resident_layer()
     738           12 :                 .layer_desc()
     739           12 :                 .key(),
     740           12 :             PersistentLayerKey {
     741           12 :                 key_range: get_key(1)..get_key(2),
     742           12 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     743           12 :                 is_delta: true
     744           12 :             }
     745           12 :         );
     746           12 :     }
     747              : 
     748              :     #[tokio::test]
     749           12 :     async fn write_split_single_key() {
     750           12 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     751           12 :             .await
     752           12 :             .unwrap();
     753           12 :         let (tenant, ctx) = harness.load().await;
     754           12 : 
     755           12 :         let tline = tenant
     756           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     757           12 :             .await
     758           12 :             .unwrap();
     759           12 : 
     760           12 :         const N: usize = 2000;
     761           12 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     762           12 :             tenant.conf,
     763           12 :             tline.timeline_id,
     764           12 :             tenant.tenant_shard_id,
     765           12 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     766           12 :             4 * 1024 * 1024,
     767           12 :             &tline.gate,
     768           12 :             tline.cancel.clone(),
     769           12 :         )
     770           12 :         .await
     771           12 :         .unwrap();
     772           12 : 
     773        24012 :         for i in 0..N {
     774        24000 :             let i = i as u32;
     775        24000 :             delta_writer
     776        24000 :                 .put_value(
     777        24000 :                     get_key(0),
     778        24000 :                     Lsn(i as u64 * 16 + 0x10),
     779        24000 :                     Value::Image(get_large_img()),
     780        24000 :                     &ctx,
     781        24000 :                 )
     782        24000 :                 .await
     783        24000 :                 .unwrap();
     784           12 :         }
     785           12 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     786           12 :         assert_eq!(delta_layers.len(), 1);
     787           12 :         let delta_layer = delta_layers
     788           12 :             .into_iter()
     789           12 :             .next()
     790           12 :             .unwrap()
     791           12 :             .into_resident_layer();
     792           12 :         assert_eq!(
     793           12 :             delta_layer.layer_desc().key(),
     794           12 :             PersistentLayerKey {
     795           12 :                 key_range: get_key(0)..get_key(1),
     796           12 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     797           12 :                 is_delta: true
     798           12 :             }
     799           12 :         );
     800           12 :     }
     801              : }
        

Generated by: LCOV version 2.1-beta