LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - split_writer.rs (source / functions) Coverage Total Hit
Test: 895655e4c46f677946f4622dd8690562cfdf7143.info Lines: 95.2 % 623 593
Test Date: 2024-10-22 13:23:20 Functions: 95.5 % 66 63

            Line data    Source code
       1              : use std::{future::Future, ops::Range, sync::Arc};
       2              : 
       3              : use bytes::Bytes;
       4              : use pageserver_api::key::{Key, KEY_SIZE};
       5              : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
       6              : 
       7              : use crate::tenant::storage_layer::Layer;
       8              : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
       9              : 
      10              : use super::layer::S3_UPLOAD_LIMIT;
      11              : use super::{
      12              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      13              : };
      14              : 
      15              : pub(crate) enum SplitWriterResult {
      16              :     Produced(ResidentLayer),
      17              :     Discarded(PersistentLayerKey),
      18              : }
      19              : 
      20              : #[cfg(test)]
      21              : impl SplitWriterResult {
      22           24 :     fn into_resident_layer(self) -> ResidentLayer {
      23           24 :         match self {
      24           24 :             SplitWriterResult::Produced(layer) => layer,
      25            0 :             SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      26              :         }
      27           24 :     }
      28              : 
      29           16 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      30           16 :         match self {
      31            0 :             SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
      32           16 :             SplitWriterResult::Discarded(layer) => layer,
      33           16 :         }
      34           16 :     }
      35              : }
      36              : 
      37              : /// An image writer that takes images and produces multiple image layers.
      38              : ///
      39              : /// The interface does not guarantee atomicity (i.e., if the image layer generation
      40              : /// fails, there might be leftover files to be cleaned up)
      41              : #[must_use]
      42              : pub struct SplitImageLayerWriter {
      43              :     inner: ImageLayerWriter,
      44              :     target_layer_size: u64,
      45              :     generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
      46              :     conf: &'static PageServerConf,
      47              :     timeline_id: TimelineId,
      48              :     tenant_shard_id: TenantShardId,
      49              :     lsn: Lsn,
      50              :     start_key: Key,
      51              : }
      52              : 
      53              : impl SplitImageLayerWriter {
      54           32 :     pub async fn new(
      55           32 :         conf: &'static PageServerConf,
      56           32 :         timeline_id: TimelineId,
      57           32 :         tenant_shard_id: TenantShardId,
      58           32 :         start_key: Key,
      59           32 :         lsn: Lsn,
      60           32 :         target_layer_size: u64,
      61           32 :         ctx: &RequestContext,
      62           32 :     ) -> anyhow::Result<Self> {
      63           32 :         Ok(Self {
      64           32 :             target_layer_size,
      65           32 :             inner: ImageLayerWriter::new(
      66           32 :                 conf,
      67           32 :                 timeline_id,
      68           32 :                 tenant_shard_id,
      69           32 :                 &(start_key..Key::MAX),
      70           32 :                 lsn,
      71           32 :                 ctx,
      72           32 :             )
      73           16 :             .await?,
      74           32 :             generated_layer_writers: Vec::new(),
      75           32 :             conf,
      76           32 :             timeline_id,
      77           32 :             tenant_shard_id,
      78           32 :             lsn,
      79           32 :             start_key,
      80              :         })
      81           32 :     }
      82              : 
      83         8414 :     pub async fn put_image(
      84         8414 :         &mut self,
      85         8414 :         key: Key,
      86         8414 :         img: Bytes,
      87         8414 :         ctx: &RequestContext,
      88         8414 :     ) -> anyhow::Result<()> {
      89         8414 :         // The current estimation is an upper bound of the space that the key/image could take
      90         8414 :         // because we did not consider compression in this estimation. The resulting image layer
      91         8414 :         // could be smaller than the target size.
      92         8414 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
      93         8414 :         if self.inner.num_keys() >= 1
      94         8382 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
      95              :         {
      96           14 :             let next_image_writer = ImageLayerWriter::new(
      97           14 :                 self.conf,
      98           14 :                 self.timeline_id,
      99           14 :                 self.tenant_shard_id,
     100           14 :                 &(key..Key::MAX),
     101           14 :                 self.lsn,
     102           14 :                 ctx,
     103           14 :             )
     104            7 :             .await?;
     105           14 :             let layer_key = PersistentLayerKey {
     106           14 :                 key_range: self.start_key..key,
     107           14 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     108           14 :                 is_delta: false,
     109           14 :             };
     110           14 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     111           14 :             self.start_key = key;
     112           14 : 
     113           14 :             self.generated_layer_writers
     114           14 :                 .push((prev_image_writer, layer_key));
     115         8400 :         }
     116         8540 :         self.inner.put_image(key, img, ctx).await
     117         8414 :     }
     118              : 
     119           28 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     120           28 :         self,
     121           28 :         tline: &Arc<Timeline>,
     122           28 :         ctx: &RequestContext,
     123           28 :         end_key: Key,
     124           28 :         discard_fn: D,
     125           28 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     126           28 :     where
     127           28 :         D: Fn(&PersistentLayerKey) -> F,
     128           28 :         F: Future<Output = bool>,
     129           28 :     {
     130           28 :         let Self {
     131           28 :             mut generated_layer_writers,
     132           28 :             inner,
     133           28 :             ..
     134           28 :         } = self;
     135           28 :         if inner.num_keys() != 0 {
     136           28 :             let layer_key = PersistentLayerKey {
     137           28 :                 key_range: self.start_key..end_key,
     138           28 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     139           28 :                 is_delta: false,
     140           28 :             };
     141           28 :             generated_layer_writers.push((inner, layer_key));
     142           28 :         }
     143           28 :         let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
     144            0 :             for produced_layer in generated_layers {
     145            0 :                 if let SplitWriterResult::Produced(image_layer) = produced_layer {
     146            0 :                     let layer: Layer = image_layer.into();
     147            0 :                     layer.delete_on_drop();
     148            0 :                 }
     149              :             }
     150            0 :         };
     151              :         // BEGIN: catch every error and do the recovery in the below section
     152           28 :         let mut generated_layers = Vec::new();
     153           70 :         for (inner, layer_key) in generated_layer_writers {
     154           42 :             if discard_fn(&layer_key).await {
     155           16 :                 generated_layers.push(SplitWriterResult::Discarded(layer_key));
     156           16 :             } else {
     157           26 :                 let layer = match inner
     158           26 :                     .finish_with_end_key(layer_key.key_range.end, ctx)
     159           53 :                     .await
     160              :                 {
     161           26 :                     Ok((desc, path)) => {
     162           26 :                         match Layer::finish_creating(self.conf, tline, desc, &path) {
     163           26 :                             Ok(layer) => layer,
     164            0 :                             Err(e) => {
     165            0 :                                 tokio::fs::remove_file(&path).await.ok();
     166            0 :                                 clean_up_layers(generated_layers);
     167            0 :                                 return Err(e);
     168              :                             }
     169              :                         }
     170              :                     }
     171            0 :                     Err(e) => {
     172            0 :                         // ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong,
     173            0 :                         // so we don't need to remove it by ourselves.
     174            0 :                         clean_up_layers(generated_layers);
     175            0 :                         return Err(e);
     176              :                     }
     177              :                 };
     178           26 :                 generated_layers.push(SplitWriterResult::Produced(layer));
     179              :             }
     180              :         }
     181              :         // END: catch every error and do the recovery in the above section
     182           28 :         Ok(generated_layers)
     183           28 :     }
     184              : 
     185              :     #[cfg(test)]
     186            4 :     pub(crate) async fn finish(
     187            4 :         self,
     188            4 :         tline: &Arc<Timeline>,
     189            4 :         ctx: &RequestContext,
     190            4 :         end_key: Key,
     191            4 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     192            6 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     193           12 :             .await
     194            4 :     }
     195              : }
     196              : 
     197              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     198              : ///
     199              : /// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
     200              : /// there might be leftover files to be cleaned up).
     201              : ///
     202              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     203              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     204              : /// will split them into multiple files based on size.
     205              : #[must_use]
     206              : pub struct SplitDeltaLayerWriter {
     207              :     inner: Option<(Key, DeltaLayerWriter)>,
     208              :     target_layer_size: u64,
     209              :     generated_layers: Vec<SplitWriterResult>,
     210              :     conf: &'static PageServerConf,
     211              :     timeline_id: TimelineId,
     212              :     tenant_shard_id: TenantShardId,
     213              :     lsn_range: Range<Lsn>,
     214              :     last_key_written: Key,
     215              : }
     216              : 
     217              : impl SplitDeltaLayerWriter {
     218           36 :     pub async fn new(
     219           36 :         conf: &'static PageServerConf,
     220           36 :         timeline_id: TimelineId,
     221           36 :         tenant_shard_id: TenantShardId,
     222           36 :         lsn_range: Range<Lsn>,
     223           36 :         target_layer_size: u64,
     224           36 :     ) -> anyhow::Result<Self> {
     225           36 :         Ok(Self {
     226           36 :             target_layer_size,
     227           36 :             inner: None,
     228           36 :             generated_layers: Vec::new(),
     229           36 :             conf,
     230           36 :             timeline_id,
     231           36 :             tenant_shard_id,
     232           36 :             lsn_range,
     233           36 :             last_key_written: Key::MIN,
     234           36 :         })
     235           36 :     }
     236              : 
     237              :     /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
     238        12134 :     pub async fn put_value_with_discard_fn<D, F>(
     239        12134 :         &mut self,
     240        12134 :         key: Key,
     241        12134 :         lsn: Lsn,
     242        12134 :         val: Value,
     243        12134 :         tline: &Arc<Timeline>,
     244        12134 :         ctx: &RequestContext,
     245        12134 :         discard: D,
     246        12134 :     ) -> anyhow::Result<()>
     247        12134 :     where
     248        12134 :         D: FnOnce(&PersistentLayerKey) -> F,
     249        12134 :         F: Future<Output = bool>,
     250        12134 :     {
     251        12134 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     252        12134 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     253        12134 :         //
     254        12134 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     255        12134 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     256        12134 : 
     257        12134 :         if self.inner.is_none() {
     258           32 :             self.inner = Some((
     259           32 :                 key,
     260           32 :                 DeltaLayerWriter::new(
     261           32 :                     self.conf,
     262           32 :                     self.timeline_id,
     263           32 :                     self.tenant_shard_id,
     264           32 :                     key,
     265           32 :                     self.lsn_range.clone(),
     266           32 :                     ctx,
     267           32 :                 )
     268           16 :                 .await?,
     269              :             ));
     270        12102 :         }
     271        12134 :         let (_, inner) = self.inner.as_mut().unwrap();
     272        12134 : 
     273        12134 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     274        12134 :         if inner.num_keys() >= 1
     275        12102 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     276              :         {
     277         2996 :             if key != self.last_key_written {
     278           14 :                 let next_delta_writer = DeltaLayerWriter::new(
     279           14 :                     self.conf,
     280           14 :                     self.timeline_id,
     281           14 :                     self.tenant_shard_id,
     282           14 :                     key,
     283           14 :                     self.lsn_range.clone(),
     284           14 :                     ctx,
     285           14 :                 )
     286            7 :                 .await?;
     287           14 :                 let (start_key, prev_delta_writer) =
     288           14 :                     std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
     289           14 :                 let layer_key = PersistentLayerKey {
     290           14 :                     key_range: start_key..key,
     291           14 :                     lsn_range: self.lsn_range.clone(),
     292           14 :                     is_delta: true,
     293           14 :                 };
     294           14 :                 if discard(&layer_key).await {
     295            6 :                     drop(prev_delta_writer);
     296            6 :                     self.generated_layers
     297            6 :                         .push(SplitWriterResult::Discarded(layer_key));
     298            6 :                 } else {
     299              :                     // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
     300              :                     // files for `finish_creating`.
     301           20 :                     let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
     302            8 :                     let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
     303            8 :                         Ok(layer) => layer,
     304            0 :                         Err(e) => {
     305            0 :                             tokio::fs::remove_file(&path).await.ok();
     306            0 :                             return Err(e);
     307              :                         }
     308              :                     };
     309            8 :                     self.generated_layers
     310            8 :                         .push(SplitWriterResult::Produced(delta_layer));
     311              :                 }
     312         2982 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     313              :                 // We have to produce a very large file b/c a key is updated too often.
     314            0 :                 anyhow::bail!(
     315            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     316            0 :                     key,
     317            0 :                     inner.estimated_size()
     318            0 :                 );
     319         2982 :             }
     320         9138 :         }
     321        12134 :         self.last_key_written = key;
     322        12134 :         let (_, inner) = self.inner.as_mut().unwrap();
     323        12134 :         inner.put_value(key, lsn, val, ctx).await
     324        12134 :     }
     325              : 
     326         4006 :     pub async fn put_value(
     327         4006 :         &mut self,
     328         4006 :         key: Key,
     329         4006 :         lsn: Lsn,
     330         4006 :         val: Value,
     331         4006 :         tline: &Arc<Timeline>,
     332         4006 :         ctx: &RequestContext,
     333         4006 :     ) -> anyhow::Result<()> {
     334         4006 :         self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
     335          262 :             .await
     336         4006 :     }
     337              : 
     338           32 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     339           32 :         self,
     340           32 :         tline: &Arc<Timeline>,
     341           32 :         ctx: &RequestContext,
     342           32 :         discard: D,
     343           32 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     344           32 :     where
     345           32 :         D: FnOnce(&PersistentLayerKey) -> F,
     346           32 :         F: Future<Output = bool>,
     347           32 :     {
     348           32 :         let Self {
     349           32 :             mut generated_layers,
     350           32 :             inner,
     351           32 :             ..
     352           32 :         } = self;
     353           32 :         let Some((start_key, inner)) = inner else {
     354            4 :             return Ok(generated_layers);
     355              :         };
     356           28 :         if inner.num_keys() == 0 {
     357            0 :             return Ok(generated_layers);
     358           28 :         }
     359           28 :         let end_key = self.last_key_written.next();
     360           28 :         let layer_key = PersistentLayerKey {
     361           28 :             key_range: start_key..end_key,
     362           28 :             lsn_range: self.lsn_range.clone(),
     363           28 :             is_delta: true,
     364           28 :         };
     365           28 :         if discard(&layer_key).await {
     366           10 :             generated_layers.push(SplitWriterResult::Discarded(layer_key));
     367           10 :         } else {
     368              :             // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
     369              :             // files for `finish_creating`.
     370           48 :             let (desc, path) = inner.finish(end_key, ctx).await?;
     371           18 :             let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
     372           18 :                 Ok(layer) => layer,
     373            0 :                 Err(e) => {
     374            0 :                     tokio::fs::remove_file(&path).await.ok();
     375            0 :                     return Err(e);
     376              :                 }
     377              :             };
     378           18 :             generated_layers.push(SplitWriterResult::Produced(delta_layer));
     379              :         }
     380           28 :         Ok(generated_layers)
     381           32 :     }
     382              : 
     383              :     #[cfg(test)]
     384            6 :     pub(crate) async fn finish(
     385            6 :         self,
     386            6 :         tline: &Arc<Timeline>,
     387            6 :         ctx: &RequestContext,
     388            6 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     389            6 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     390           18 :             .await
     391            6 :     }
     392              : 
     393              :     /// This function will be deprecated with #8841.
     394            4 :     pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
     395            4 :         Ok((self.generated_layers, self.inner.map(|x| x.1)))
     396            4 :     }
     397              : }
     398              : 
     399              : #[cfg(test)]
     400              : mod tests {
     401              :     use itertools::Itertools;
     402              :     use rand::{RngCore, SeedableRng};
     403              : 
     404              :     use crate::{
     405              :         tenant::{
     406              :             harness::{TenantHarness, TIMELINE_ID},
     407              :             storage_layer::AsLayerDesc,
     408              :         },
     409              :         DEFAULT_PG_VERSION,
     410              :     };
     411              : 
     412              :     use super::*;
     413              : 
     414        20052 :     fn get_key(id: u32) -> Key {
     415        20052 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     416        20052 :         key.field6 = id;
     417        20052 :         key
     418        20052 :     }
     419              : 
     420            8 :     fn get_img(id: u32) -> Bytes {
     421            8 :         format!("{id:064}").into()
     422            8 :     }
     423              : 
     424        20004 :     fn get_large_img() -> Bytes {
     425        20004 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     426        20004 :         let mut data = vec![0; 8192];
     427        20004 :         rng.fill_bytes(&mut data);
     428        20004 :         data.into()
     429        20004 :     }
     430              : 
     431              :     #[tokio::test]
     432            2 :     async fn write_one_image() {
     433            2 :         let harness = TenantHarness::create("split_writer_write_one_image")
     434            2 :             .await
     435            2 :             .unwrap();
     436            5 :         let (tenant, ctx) = harness.load().await;
     437            2 : 
     438            2 :         let tline = tenant
     439            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     440            4 :             .await
     441            2 :             .unwrap();
     442            2 : 
     443            2 :         let mut image_writer = SplitImageLayerWriter::new(
     444            2 :             tenant.conf,
     445            2 :             tline.timeline_id,
     446            2 :             tenant.tenant_shard_id,
     447            2 :             get_key(0),
     448            2 :             Lsn(0x18),
     449            2 :             4 * 1024 * 1024,
     450            2 :             &ctx,
     451            2 :         )
     452            2 :         .await
     453            2 :         .unwrap();
     454            2 : 
     455            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     456            2 :             tenant.conf,
     457            2 :             tline.timeline_id,
     458            2 :             tenant.tenant_shard_id,
     459            2 :             Lsn(0x18)..Lsn(0x20),
     460            2 :             4 * 1024 * 1024,
     461            2 :         )
     462            2 :         .await
     463            2 :         .unwrap();
     464            2 : 
     465            2 :         image_writer
     466            2 :             .put_image(get_key(0), get_img(0), &ctx)
     467            2 :             .await
     468            2 :             .unwrap();
     469            2 :         let layers = image_writer
     470            2 :             .finish(&tline, &ctx, get_key(10))
     471            4 :             .await
     472            2 :             .unwrap();
     473            2 :         assert_eq!(layers.len(), 1);
     474            2 : 
     475            2 :         delta_writer
     476            2 :             .put_value(
     477            2 :                 get_key(0),
     478            2 :                 Lsn(0x18),
     479            2 :                 Value::Image(get_img(0)),
     480            2 :                 &tline,
     481            2 :                 &ctx,
     482            2 :             )
     483            2 :             .await
     484            2 :             .unwrap();
     485            5 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     486            2 :         assert_eq!(layers.len(), 1);
     487            2 :         assert_eq!(
     488            2 :             layers
     489            2 :                 .into_iter()
     490            2 :                 .next()
     491            2 :                 .unwrap()
     492            2 :                 .into_resident_layer()
     493            2 :                 .layer_desc()
     494            2 :                 .key(),
     495            2 :             PersistentLayerKey {
     496            2 :                 key_range: get_key(0)..get_key(1),
     497            2 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     498            2 :                 is_delta: true
     499            2 :             }
     500            2 :         );
     501            2 :     }
     502              : 
     503              :     #[tokio::test]
     504            2 :     async fn write_split() {
     505            2 :         // Test the split writer with retaining all the layers we have produced (discard=false)
     506         4370 :         write_split_helper("split_writer_write_split", false).await;
     507            2 :     }
     508              : 
     509              :     #[tokio::test]
     510            2 :     async fn write_split_discard() {
     511            2 :         // Test the split writer with discarding all the layers we have produced (discard=true)
     512         4334 :         write_split_helper("split_writer_write_split_discard", true).await;
     513            2 :     }
     514              : 
     515              :     /// Test the image+delta writer by writing a large number of images and deltas. If discard is
     516              :     /// set to true, all layers will be discarded.
     517            4 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     518            4 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     519           16 :         let (tenant, ctx) = harness.load().await;
     520              : 
     521            4 :         let tline = tenant
     522            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     523            8 :             .await
     524            4 :             .unwrap();
     525              : 
     526            4 :         let mut image_writer = SplitImageLayerWriter::new(
     527            4 :             tenant.conf,
     528            4 :             tline.timeline_id,
     529            4 :             tenant.tenant_shard_id,
     530            4 :             get_key(0),
     531            4 :             Lsn(0x18),
     532            4 :             4 * 1024 * 1024,
     533            4 :             &ctx,
     534            4 :         )
     535            2 :         .await
     536            4 :         .unwrap();
     537            4 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     538            4 :             tenant.conf,
     539            4 :             tline.timeline_id,
     540            4 :             tenant.tenant_shard_id,
     541            4 :             Lsn(0x18)..Lsn(0x20),
     542            4 :             4 * 1024 * 1024,
     543            4 :         )
     544            0 :         .await
     545            4 :         .unwrap();
     546              :         const N: usize = 2000;
     547         8004 :         for i in 0..N {
     548         8000 :             let i = i as u32;
     549         8000 :             image_writer
     550         8000 :                 .put_image(get_key(i), get_large_img(), &ctx)
     551         8130 :                 .await
     552         8000 :                 .unwrap();
     553         8000 :             delta_writer
     554         8000 :                 .put_value_with_discard_fn(
     555         8000 :                     get_key(i),
     556         8000 :                     Lsn(0x20),
     557         8000 :                     Value::Image(get_large_img()),
     558         8000 :                     &tline,
     559         8000 :                     &ctx,
     560         8000 :                     |_| async { discard },
     561         8000 :                 )
     562          527 :                 .await
     563         8000 :                 .unwrap();
     564              :         }
     565            4 :         let image_layers = image_writer
     566           16 :             .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
     567           16 :             .await
     568            4 :             .unwrap();
     569            4 :         let delta_layers = delta_writer
     570            4 :             .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
     571            5 :             .await
     572            4 :             .unwrap();
     573            4 :         let image_layers = image_layers
     574            4 :             .into_iter()
     575           16 :             .map(|x| {
     576           16 :                 if discard {
     577            8 :                     x.into_discarded_layer()
     578              :                 } else {
     579            8 :                     x.into_resident_layer().layer_desc().key()
     580              :                 }
     581           16 :             })
     582            4 :             .collect_vec();
     583            4 :         let delta_layers = delta_layers
     584            4 :             .into_iter()
     585           16 :             .map(|x| {
     586           16 :                 if discard {
     587            8 :                     x.into_discarded_layer()
     588              :                 } else {
     589            8 :                     x.into_resident_layer().layer_desc().key()
     590              :                 }
     591           16 :             })
     592            4 :             .collect_vec();
     593            4 :         assert_eq!(image_layers.len(), N / 512 + 1);
     594            4 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     595            4 :         assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
     596            4 :         assert_eq!(
     597            4 :             delta_layers.last().unwrap().key_range.end,
     598            4 :             get_key(N as u32)
     599            4 :         );
     600           16 :         for idx in 0..image_layers.len() {
     601           16 :             assert_ne!(image_layers[idx].key_range.start, Key::MIN);
     602           16 :             assert_ne!(image_layers[idx].key_range.end, Key::MAX);
     603           16 :             assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
     604           16 :             assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
     605           16 :             if idx > 0 {
     606           12 :                 assert_eq!(
     607           12 :                     image_layers[idx - 1].key_range.end,
     608           12 :                     image_layers[idx].key_range.start
     609           12 :                 );
     610           12 :                 assert_eq!(
     611           12 :                     delta_layers[idx - 1].key_range.end,
     612           12 :                     delta_layers[idx].key_range.start
     613           12 :                 );
     614            4 :             }
     615              :         }
     616            4 :     }
     617              : 
     618              :     #[tokio::test]
     619            2 :     async fn write_large_img() {
     620            2 :         let harness = TenantHarness::create("split_writer_write_large_img")
     621            2 :             .await
     622            2 :             .unwrap();
     623            8 :         let (tenant, ctx) = harness.load().await;
     624            2 : 
     625            2 :         let tline = tenant
     626            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     627            4 :             .await
     628            2 :             .unwrap();
     629            2 : 
     630            2 :         let mut image_writer = SplitImageLayerWriter::new(
     631            2 :             tenant.conf,
     632            2 :             tline.timeline_id,
     633            2 :             tenant.tenant_shard_id,
     634            2 :             get_key(0),
     635            2 :             Lsn(0x18),
     636            2 :             4 * 1024,
     637            2 :             &ctx,
     638            2 :         )
     639            2 :         .await
     640            2 :         .unwrap();
     641            2 : 
     642            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     643            2 :             tenant.conf,
     644            2 :             tline.timeline_id,
     645            2 :             tenant.tenant_shard_id,
     646            2 :             Lsn(0x18)..Lsn(0x20),
     647            2 :             4 * 1024,
     648            2 :         )
     649            2 :         .await
     650            2 :         .unwrap();
     651            2 : 
     652            2 :         image_writer
     653            2 :             .put_image(get_key(0), get_img(0), &ctx)
     654            2 :             .await
     655            2 :             .unwrap();
     656            2 :         image_writer
     657            2 :             .put_image(get_key(1), get_large_img(), &ctx)
     658            3 :             .await
     659            2 :             .unwrap();
     660            2 :         let layers = image_writer
     661            2 :             .finish(&tline, &ctx, get_key(10))
     662            8 :             .await
     663            2 :             .unwrap();
     664            2 :         assert_eq!(layers.len(), 2);
     665            2 : 
     666            2 :         delta_writer
     667            2 :             .put_value(
     668            2 :                 get_key(0),
     669            2 :                 Lsn(0x18),
     670            2 :                 Value::Image(get_img(0)),
     671            2 :                 &tline,
     672            2 :                 &ctx,
     673            2 :             )
     674            2 :             .await
     675            2 :             .unwrap();
     676            2 :         delta_writer
     677            2 :             .put_value(
     678            2 :                 get_key(1),
     679            2 :                 Lsn(0x1A),
     680            2 :                 Value::Image(get_large_img()),
     681            2 :                 &tline,
     682            2 :                 &ctx,
     683            2 :             )
     684            6 :             .await
     685            2 :             .unwrap();
     686            5 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     687            2 :         assert_eq!(layers.len(), 2);
     688            2 :         let mut layers_iter = layers.into_iter();
     689            2 :         assert_eq!(
     690            2 :             layers_iter
     691            2 :                 .next()
     692            2 :                 .unwrap()
     693            2 :                 .into_resident_layer()
     694            2 :                 .layer_desc()
     695            2 :                 .key(),
     696            2 :             PersistentLayerKey {
     697            2 :                 key_range: get_key(0)..get_key(1),
     698            2 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     699            2 :                 is_delta: true
     700            2 :             }
     701            2 :         );
     702            2 :         assert_eq!(
     703            2 :             layers_iter
     704            2 :                 .next()
     705            2 :                 .unwrap()
     706            2 :                 .into_resident_layer()
     707            2 :                 .layer_desc()
     708            2 :                 .key(),
     709            2 :             PersistentLayerKey {
     710            2 :                 key_range: get_key(1)..get_key(2),
     711            2 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     712            2 :                 is_delta: true
     713            2 :             }
     714            2 :         );
     715            2 :     }
     716              : 
     717              :     #[tokio::test]
     718            2 :     async fn write_split_single_key() {
     719            2 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     720            2 :             .await
     721            2 :             .unwrap();
     722            8 :         let (tenant, ctx) = harness.load().await;
     723            2 : 
     724            2 :         let tline = tenant
     725            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     726            4 :             .await
     727            2 :             .unwrap();
     728            2 : 
     729            2 :         const N: usize = 2000;
     730            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     731            2 :             tenant.conf,
     732            2 :             tline.timeline_id,
     733            2 :             tenant.tenant_shard_id,
     734            2 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     735            2 :             4 * 1024 * 1024,
     736            2 :         )
     737            2 :         .await
     738            2 :         .unwrap();
     739            2 : 
     740         4002 :         for i in 0..N {
     741         4000 :             let i = i as u32;
     742         4000 :             delta_writer
     743         4000 :                 .put_value(
     744         4000 :                     get_key(0),
     745         4000 :                     Lsn(i as u64 * 16 + 0x10),
     746         4000 :                     Value::Image(get_large_img()),
     747         4000 :                     &tline,
     748         4000 :                     &ctx,
     749         4000 :                 )
     750          254 :                 .await
     751         4000 :                 .unwrap();
     752            2 :         }
     753            8 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     754            2 :         assert_eq!(delta_layers.len(), 1);
     755            2 :         let delta_layer = delta_layers
     756            2 :             .into_iter()
     757            2 :             .next()
     758            2 :             .unwrap()
     759            2 :             .into_resident_layer();
     760            2 :         assert_eq!(
     761            2 :             delta_layer.layer_desc().key(),
     762            2 :             PersistentLayerKey {
     763            2 :                 key_range: get_key(0)..get_key(1),
     764            2 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     765            2 :                 is_delta: true
     766            2 :             }
     767            2 :         );
     768            2 :     }
     769              : }
        

Generated by: LCOV version 2.1-beta