LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - split_writer.rs (source / functions) Coverage Total Hit
Test: 2453312769e0b6b061a2008879e6693300d0b938.info Lines: 95.2 % 567 540
Test Date: 2024-09-06 16:40:18 Functions: 98.5 % 65 64

            Line data    Source code
       1              : use std::{future::Future, ops::Range, sync::Arc};
       2              : 
       3              : use bytes::Bytes;
       4              : use pageserver_api::key::{Key, KEY_SIZE};
       5              : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
       6              : 
       7              : use crate::tenant::storage_layer::Layer;
       8              : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
       9              : 
      10              : use super::layer::S3_UPLOAD_LIMIT;
      11              : use super::{
      12              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      13              : };
      14              : 
      15              : pub(crate) enum SplitWriterResult {
      16              :     Produced(ResidentLayer),
      17              :     Discarded(PersistentLayerKey),
      18              : }
      19              : 
      20              : #[cfg(test)]
      21              : impl SplitWriterResult {
      22           96 :     fn into_resident_layer(self) -> ResidentLayer {
      23           96 :         match self {
      24           96 :             SplitWriterResult::Produced(layer) => layer,
      25            0 :             SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      26              :         }
      27           96 :     }
      28              : 
      29            0 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      30            0 :         match self {
      31            0 :             SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
      32            0 :             SplitWriterResult::Discarded(layer) => layer,
      33            0 :         }
      34            0 :     }
      35              : }
      36              : 
      37              : /// An image writer that takes images and produces multiple image layers.
      38              : ///
      39              : /// The interface does not guarantee atomicity (i.e., if the image layer generation
      40              : /// fails, there might be leftover files to be cleaned up)
      41              : #[must_use]
      42              : pub struct SplitImageLayerWriter {
      43              :     inner: ImageLayerWriter,
      44              :     target_layer_size: u64,
      45              :     generated_layers: Vec<SplitWriterResult>,
      46              :     conf: &'static PageServerConf,
      47              :     timeline_id: TimelineId,
      48              :     tenant_shard_id: TenantShardId,
      49              :     lsn: Lsn,
      50              :     start_key: Key,
      51              : }
      52              : 
      53              : impl SplitImageLayerWriter {
      54           96 :     pub async fn new(
      55           96 :         conf: &'static PageServerConf,
      56           96 :         timeline_id: TimelineId,
      57           96 :         tenant_shard_id: TenantShardId,
      58           96 :         start_key: Key,
      59           96 :         lsn: Lsn,
      60           96 :         target_layer_size: u64,
      61           96 :         ctx: &RequestContext,
      62           96 :     ) -> anyhow::Result<Self> {
      63           96 :         Ok(Self {
      64           96 :             target_layer_size,
      65           96 :             inner: ImageLayerWriter::new(
      66           96 :                 conf,
      67           96 :                 timeline_id,
      68           96 :                 tenant_shard_id,
      69           96 :                 &(start_key..Key::MAX),
      70           96 :                 lsn,
      71           96 :                 ctx,
      72           96 :             )
      73           48 :             .await?,
      74           96 :             generated_layers: Vec::new(),
      75           96 :             conf,
      76           96 :             timeline_id,
      77           96 :             tenant_shard_id,
      78           96 :             lsn,
      79           96 :             start_key,
      80              :         })
      81           96 :     }
      82              : 
      83        25242 :     pub async fn put_image_with_discard_fn<D, F>(
      84        25242 :         &mut self,
      85        25242 :         key: Key,
      86        25242 :         img: Bytes,
      87        25242 :         tline: &Arc<Timeline>,
      88        25242 :         ctx: &RequestContext,
      89        25242 :         discard: D,
      90        25242 :     ) -> anyhow::Result<()>
      91        25242 :     where
      92        25242 :         D: FnOnce(&PersistentLayerKey) -> F,
      93        25242 :         F: Future<Output = bool>,
      94        25242 :     {
      95        25242 :         // The current estimation is an upper bound of the space that the key/image could take
      96        25242 :         // because we did not consider compression in this estimation. The resulting image layer
      97        25242 :         // could be smaller than the target size.
      98        25242 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
      99        25242 :         if self.inner.num_keys() >= 1
     100        25146 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     101              :         {
     102           42 :             let next_image_writer = ImageLayerWriter::new(
     103           42 :                 self.conf,
     104           42 :                 self.timeline_id,
     105           42 :                 self.tenant_shard_id,
     106           42 :                 &(key..Key::MAX),
     107           42 :                 self.lsn,
     108           42 :                 ctx,
     109           42 :             )
     110           21 :             .await?;
     111           42 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     112           42 :             let layer_key = PersistentLayerKey {
     113           42 :                 key_range: self.start_key..key,
     114           42 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     115           42 :                 is_delta: false,
     116           42 :             };
     117           42 :             self.start_key = key;
     118           42 : 
     119           42 :             if discard(&layer_key).await {
     120            0 :                 drop(prev_image_writer);
     121            0 :                 self.generated_layers
     122            0 :                     .push(SplitWriterResult::Discarded(layer_key));
     123            0 :             } else {
     124           42 :                 self.generated_layers.push(SplitWriterResult::Produced(
     125           42 :                     prev_image_writer
     126           42 :                         .finish_with_end_key(tline, key, ctx)
     127           90 :                         .await?,
     128              :                 ));
     129              :             }
     130        25200 :         }
     131        25620 :         self.inner.put_image(key, img, ctx).await
     132        25242 :     }
     133              : 
     134              :     #[cfg(test)]
     135           18 :     pub async fn put_image(
     136           18 :         &mut self,
     137           18 :         key: Key,
     138           18 :         img: Bytes,
     139           18 :         tline: &Arc<Timeline>,
     140           18 :         ctx: &RequestContext,
     141           18 :     ) -> anyhow::Result<()> {
     142           18 :         self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
     143           33 :             .await
     144           18 :     }
     145              : 
     146           84 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     147           84 :         self,
     148           84 :         tline: &Arc<Timeline>,
     149           84 :         ctx: &RequestContext,
     150           84 :         end_key: Key,
     151           84 :         discard: D,
     152           84 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     153           84 :     where
     154           84 :         D: FnOnce(&PersistentLayerKey) -> F,
     155           84 :         F: Future<Output = bool>,
     156           84 :     {
     157           84 :         let Self {
     158           84 :             mut generated_layers,
     159           84 :             inner,
     160           84 :             ..
     161           84 :         } = self;
     162           84 :         if inner.num_keys() == 0 {
     163            0 :             return Ok(generated_layers);
     164           84 :         }
     165           84 :         let layer_key = PersistentLayerKey {
     166           84 :             key_range: self.start_key..end_key,
     167           84 :             lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     168           84 :             is_delta: false,
     169           84 :         };
     170           84 :         if discard(&layer_key).await {
     171           24 :             generated_layers.push(SplitWriterResult::Discarded(layer_key));
     172           24 :         } else {
     173           60 :             generated_layers.push(SplitWriterResult::Produced(
     174          120 :                 inner.finish_with_end_key(tline, end_key, ctx).await?,
     175              :             ));
     176              :         }
     177           84 :         Ok(generated_layers)
     178           84 :     }
     179              : 
     180              :     #[cfg(test)]
     181           24 :     pub(crate) async fn finish(
     182           24 :         self,
     183           24 :         tline: &Arc<Timeline>,
     184           24 :         ctx: &RequestContext,
     185           24 :         end_key: Key,
     186           24 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     187           24 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     188           48 :             .await
     189           24 :     }
     190              : 
     191              :     /// When split writer fails, the caller should call this function and handle partially generated layers.
     192           12 :     pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
     193           12 :         Ok((self.generated_layers, self.inner))
     194           12 :     }
     195              : }
     196              : 
     197              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     198              : ///
     199              : /// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
     200              : /// there might be leftover files to be cleaned up).
     201              : ///
     202              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     203              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     204              : /// will split them into multiple files based on size.
     205              : #[must_use]
     206              : pub struct SplitDeltaLayerWriter {
     207              :     inner: DeltaLayerWriter,
     208              :     target_layer_size: u64,
     209              :     generated_layers: Vec<SplitWriterResult>,
     210              :     conf: &'static PageServerConf,
     211              :     timeline_id: TimelineId,
     212              :     tenant_shard_id: TenantShardId,
     213              :     lsn_range: Range<Lsn>,
     214              :     last_key_written: Key,
     215              :     start_key: Key,
     216              : }
     217              : 
     218              : impl SplitDeltaLayerWriter {
     219          108 :     pub async fn new(
     220          108 :         conf: &'static PageServerConf,
     221          108 :         timeline_id: TimelineId,
     222          108 :         tenant_shard_id: TenantShardId,
     223          108 :         start_key: Key,
     224          108 :         lsn_range: Range<Lsn>,
     225          108 :         target_layer_size: u64,
     226          108 :         ctx: &RequestContext,
     227          108 :     ) -> anyhow::Result<Self> {
     228          108 :         Ok(Self {
     229          108 :             target_layer_size,
     230          108 :             inner: DeltaLayerWriter::new(
     231          108 :                 conf,
     232          108 :                 timeline_id,
     233          108 :                 tenant_shard_id,
     234          108 :                 start_key,
     235          108 :                 lsn_range.clone(),
     236          108 :                 ctx,
     237          108 :             )
     238           54 :             .await?,
     239          108 :             generated_layers: Vec::new(),
     240          108 :             conf,
     241          108 :             timeline_id,
     242          108 :             tenant_shard_id,
     243          108 :             lsn_range,
     244          108 :             last_key_written: Key::MIN,
     245          108 :             start_key,
     246              :         })
     247          108 :     }
     248              : 
     249              :     /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
     250        36402 :     pub async fn put_value_with_discard_fn<D, F>(
     251        36402 :         &mut self,
     252        36402 :         key: Key,
     253        36402 :         lsn: Lsn,
     254        36402 :         val: Value,
     255        36402 :         tline: &Arc<Timeline>,
     256        36402 :         ctx: &RequestContext,
     257        36402 :         discard: D,
     258        36402 :     ) -> anyhow::Result<()>
     259        36402 :     where
     260        36402 :         D: FnOnce(&PersistentLayerKey) -> F,
     261        36402 :         F: Future<Output = bool>,
     262        36402 :     {
     263        36402 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     264        36402 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     265        36402 :         //
     266        36402 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     267        36402 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     268        36402 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     269        36402 :         if self.inner.num_keys() >= 1
     270        36306 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     271              :         {
     272         8988 :             if key != self.last_key_written {
     273           42 :                 let next_delta_writer = DeltaLayerWriter::new(
     274           42 :                     self.conf,
     275           42 :                     self.timeline_id,
     276           42 :                     self.tenant_shard_id,
     277           42 :                     key,
     278           42 :                     self.lsn_range.clone(),
     279           42 :                     ctx,
     280           42 :                 )
     281           21 :                 .await?;
     282           42 :                 let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
     283           42 :                 let layer_key = PersistentLayerKey {
     284           42 :                     key_range: self.start_key..key,
     285           42 :                     lsn_range: self.lsn_range.clone(),
     286           42 :                     is_delta: true,
     287           42 :                 };
     288           42 :                 self.start_key = key;
     289           42 :                 if discard(&layer_key).await {
     290            0 :                     drop(prev_delta_writer);
     291            0 :                     self.generated_layers
     292            0 :                         .push(SplitWriterResult::Discarded(layer_key));
     293            0 :                 } else {
     294          111 :                     let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
     295           42 :                     let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     296           42 :                     self.generated_layers
     297           42 :                         .push(SplitWriterResult::Produced(delta_layer));
     298              :                 }
     299         8946 :             } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
     300              :                 // We have to produce a very large file b/c a key is updated too often.
     301            0 :                 anyhow::bail!(
     302            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     303            0 :                     key,
     304            0 :                     self.inner.estimated_size()
     305            0 :                 );
     306         8946 :             }
     307        27414 :         }
     308        36402 :         self.last_key_written = key;
     309        36402 :         self.inner.put_value(key, lsn, val, ctx).await
     310        36402 :     }
     311              : 
     312        12018 :     pub async fn put_value(
     313        12018 :         &mut self,
     314        12018 :         key: Key,
     315        12018 :         lsn: Lsn,
     316        12018 :         val: Value,
     317        12018 :         tline: &Arc<Timeline>,
     318        12018 :         ctx: &RequestContext,
     319        12018 :     ) -> anyhow::Result<()> {
     320        12018 :         self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
     321          777 :             .await
     322        12018 :     }
     323              : 
     324           96 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     325           96 :         self,
     326           96 :         tline: &Arc<Timeline>,
     327           96 :         ctx: &RequestContext,
     328           96 :         end_key: Key,
     329           96 :         discard: D,
     330           96 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     331           96 :     where
     332           96 :         D: FnOnce(&PersistentLayerKey) -> F,
     333           96 :         F: Future<Output = bool>,
     334           96 :     {
     335           96 :         let Self {
     336           96 :             mut generated_layers,
     337           96 :             inner,
     338           96 :             ..
     339           96 :         } = self;
     340           96 :         if inner.num_keys() == 0 {
     341           12 :             return Ok(generated_layers);
     342           84 :         }
     343           84 :         let layer_key = PersistentLayerKey {
     344           84 :             key_range: self.start_key..end_key,
     345           84 :             lsn_range: self.lsn_range.clone(),
     346           84 :             is_delta: true,
     347           84 :         };
     348           84 :         if discard(&layer_key).await {
     349           24 :             generated_layers.push(SplitWriterResult::Discarded(layer_key));
     350           24 :         } else {
     351          162 :             let (desc, path) = inner.finish(end_key, ctx).await?;
     352           60 :             let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     353           60 :             generated_layers.push(SplitWriterResult::Produced(delta_layer));
     354              :         }
     355           84 :         Ok(generated_layers)
     356           96 :     }
     357              : 
     358              :     #[cfg(test)]
     359           30 :     pub(crate) async fn finish(
     360           30 :         self,
     361           30 :         tline: &Arc<Timeline>,
     362           30 :         ctx: &RequestContext,
     363           30 :         end_key: Key,
     364           30 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     365           30 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     366           84 :             .await
     367           30 :     }
     368              : 
     369              :     /// When split writer fails, the caller should call this function and handle partially generated layers.
     370           12 :     pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
     371           12 :         Ok((self.generated_layers, self.inner))
     372           12 :     }
     373              : }
     374              : 
     375              : #[cfg(test)]
     376              : mod tests {
     377              :     use itertools::Itertools;
     378              :     use rand::{RngCore, SeedableRng};
     379              : 
     380              :     use crate::{
     381              :         tenant::{
     382              :             harness::{TenantHarness, TIMELINE_ID},
     383              :             storage_layer::AsLayerDesc,
     384              :         },
     385              :         DEFAULT_PG_VERSION,
     386              :     };
     387              : 
     388              :     use super::*;
     389              : 
     390        60144 :     fn get_key(id: u32) -> Key {
     391        60144 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     392        60144 :         key.field6 = id;
     393        60144 :         key
     394        60144 :     }
     395              : 
     396           24 :     fn get_img(id: u32) -> Bytes {
     397           24 :         format!("{id:064}").into()
     398           24 :     }
     399              : 
     400        60012 :     fn get_large_img() -> Bytes {
     401        60012 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     402        60012 :         let mut data = vec![0; 8192];
     403        60012 :         rng.fill_bytes(&mut data);
     404        60012 :         data.into()
     405        60012 :     }
     406              : 
     407              :     #[tokio::test]
     408            6 :     async fn write_one_image() {
     409            6 :         let harness = TenantHarness::create("split_writer_write_one_image")
     410            6 :             .await
     411            6 :             .unwrap();
     412           24 :         let (tenant, ctx) = harness.load().await;
     413            6 : 
     414            6 :         let tline = tenant
     415            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     416           12 :             .await
     417            6 :             .unwrap();
     418            6 : 
     419            6 :         let mut image_writer = SplitImageLayerWriter::new(
     420            6 :             tenant.conf,
     421            6 :             tline.timeline_id,
     422            6 :             tenant.tenant_shard_id,
     423            6 :             get_key(0),
     424            6 :             Lsn(0x18),
     425            6 :             4 * 1024 * 1024,
     426            6 :             &ctx,
     427            6 :         )
     428            6 :         .await
     429            6 :         .unwrap();
     430            6 : 
     431            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     432            6 :             tenant.conf,
     433            6 :             tline.timeline_id,
     434            6 :             tenant.tenant_shard_id,
     435            6 :             get_key(0),
     436            6 :             Lsn(0x18)..Lsn(0x20),
     437            6 :             4 * 1024 * 1024,
     438            6 :             &ctx,
     439            6 :         )
     440            6 :         .await
     441            6 :         .unwrap();
     442            6 : 
     443            6 :         image_writer
     444            6 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     445            6 :             .await
     446            6 :             .unwrap();
     447            6 :         let layers = image_writer
     448            6 :             .finish(&tline, &ctx, get_key(10))
     449           12 :             .await
     450            6 :             .unwrap();
     451            6 :         assert_eq!(layers.len(), 1);
     452            6 : 
     453            6 :         delta_writer
     454            6 :             .put_value(
     455            6 :                 get_key(0),
     456            6 :                 Lsn(0x18),
     457            6 :                 Value::Image(get_img(0)),
     458            6 :                 &tline,
     459            6 :                 &ctx,
     460            6 :             )
     461            6 :             .await
     462            6 :             .unwrap();
     463            6 :         let layers = delta_writer
     464            6 :             .finish(&tline, &ctx, get_key(10))
     465           15 :             .await
     466            6 :             .unwrap();
     467            6 :         assert_eq!(layers.len(), 1);
     468            6 :     }
     469              : 
     470              :     #[tokio::test]
     471            6 :     async fn write_split() {
     472        13110 :         write_split_helper("split_writer_write_split", false).await;
     473            6 :     }
     474              : 
     475              :     #[tokio::test]
     476            6 :     async fn write_split_discard() {
     477        13110 :         write_split_helper("split_writer_write_split_discard", false).await;
     478            6 :     }
     479              : 
     480           12 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     481           12 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     482           48 :         let (tenant, ctx) = harness.load().await;
     483              : 
     484           12 :         let tline = tenant
     485           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     486           24 :             .await
     487           12 :             .unwrap();
     488              : 
     489           12 :         let mut image_writer = SplitImageLayerWriter::new(
     490           12 :             tenant.conf,
     491           12 :             tline.timeline_id,
     492           12 :             tenant.tenant_shard_id,
     493           12 :             get_key(0),
     494           12 :             Lsn(0x18),
     495           12 :             4 * 1024 * 1024,
     496           12 :             &ctx,
     497           12 :         )
     498            6 :         .await
     499           12 :         .unwrap();
     500           12 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     501           12 :             tenant.conf,
     502           12 :             tline.timeline_id,
     503           12 :             tenant.tenant_shard_id,
     504           12 :             get_key(0),
     505           12 :             Lsn(0x18)..Lsn(0x20),
     506           12 :             4 * 1024 * 1024,
     507           12 :             &ctx,
     508           12 :         )
     509            6 :         .await
     510           12 :         .unwrap();
     511              :         const N: usize = 2000;
     512        24012 :         for i in 0..N {
     513        24000 :             let i = i as u32;
     514        24000 :             image_writer
     515        24000 :                 .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
     516           36 :                     discard
     517        24000 :                 })
     518        24456 :                 .await
     519        24000 :                 .unwrap();
     520        24000 :             delta_writer
     521        24000 :                 .put_value_with_discard_fn(
     522        24000 :                     get_key(i),
     523        24000 :                     Lsn(0x20),
     524        24000 :                     Value::Image(get_large_img()),
     525        24000 :                     &tline,
     526        24000 :                     &ctx,
     527        24000 :                     |_| async { discard },
     528        24000 :                 )
     529         1626 :                 .await
     530        24000 :                 .unwrap();
     531              :         }
     532           12 :         let image_layers = image_writer
     533           12 :             .finish(&tline, &ctx, get_key(N as u32))
     534           24 :             .await
     535           12 :             .unwrap();
     536           12 :         let delta_layers = delta_writer
     537           12 :             .finish(&tline, &ctx, get_key(N as u32))
     538           30 :             .await
     539           12 :             .unwrap();
     540           12 :         if discard {
     541            0 :             for layer in image_layers {
     542            0 :                 layer.into_discarded_layer();
     543            0 :             }
     544            0 :             for layer in delta_layers {
     545            0 :                 layer.into_discarded_layer();
     546            0 :             }
     547              :         } else {
     548           12 :             let image_layers = image_layers
     549           12 :                 .into_iter()
     550           48 :                 .map(|x| x.into_resident_layer())
     551           12 :                 .collect_vec();
     552           12 :             let delta_layers = delta_layers
     553           12 :                 .into_iter()
     554           48 :                 .map(|x| x.into_resident_layer())
     555           12 :                 .collect_vec();
     556           12 :             assert_eq!(image_layers.len(), N / 512 + 1);
     557           12 :             assert_eq!(delta_layers.len(), N / 512 + 1);
     558           48 :             for idx in 0..image_layers.len() {
     559           48 :                 assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
     560           48 :                 assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
     561           48 :                 assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
     562           48 :                 assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
     563           48 :                 if idx > 0 {
     564           36 :                     assert_eq!(
     565           36 :                         image_layers[idx - 1].layer_desc().key_range.end,
     566           36 :                         image_layers[idx].layer_desc().key_range.start
     567           36 :                     );
     568           36 :                     assert_eq!(
     569           36 :                         delta_layers[idx - 1].layer_desc().key_range.end,
     570           36 :                         delta_layers[idx].layer_desc().key_range.start
     571           36 :                     );
     572           12 :                 }
     573              :             }
     574              :         }
     575           12 :     }
     576              : 
     577              :     #[tokio::test]
     578            6 :     async fn write_large_img() {
     579            6 :         let harness = TenantHarness::create("split_writer_write_large_img")
     580            6 :             .await
     581            6 :             .unwrap();
     582           24 :         let (tenant, ctx) = harness.load().await;
     583            6 : 
     584            6 :         let tline = tenant
     585            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     586           12 :             .await
     587            6 :             .unwrap();
     588            6 : 
     589            6 :         let mut image_writer = SplitImageLayerWriter::new(
     590            6 :             tenant.conf,
     591            6 :             tline.timeline_id,
     592            6 :             tenant.tenant_shard_id,
     593            6 :             get_key(0),
     594            6 :             Lsn(0x18),
     595            6 :             4 * 1024,
     596            6 :             &ctx,
     597            6 :         )
     598            6 :         .await
     599            6 :         .unwrap();
     600            6 : 
     601            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     602            6 :             tenant.conf,
     603            6 :             tline.timeline_id,
     604            6 :             tenant.tenant_shard_id,
     605            6 :             get_key(0),
     606            6 :             Lsn(0x18)..Lsn(0x20),
     607            6 :             4 * 1024,
     608            6 :             &ctx,
     609            6 :         )
     610            6 :         .await
     611            6 :         .unwrap();
     612            6 : 
     613            6 :         image_writer
     614            6 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     615            6 :             .await
     616            6 :             .unwrap();
     617            6 :         image_writer
     618            6 :             .put_image(get_key(1), get_large_img(), &tline, &ctx)
     619           21 :             .await
     620            6 :             .unwrap();
     621            6 :         let layers = image_writer
     622            6 :             .finish(&tline, &ctx, get_key(10))
     623           12 :             .await
     624            6 :             .unwrap();
     625            6 :         assert_eq!(layers.len(), 2);
     626            6 : 
     627            6 :         delta_writer
     628            6 :             .put_value(
     629            6 :                 get_key(0),
     630            6 :                 Lsn(0x18),
     631            6 :                 Value::Image(get_img(0)),
     632            6 :                 &tline,
     633            6 :                 &ctx,
     634            6 :             )
     635            6 :             .await
     636            6 :             .unwrap();
     637            6 :         delta_writer
     638            6 :             .put_value(
     639            6 :                 get_key(1),
     640            6 :                 Lsn(0x1A),
     641            6 :                 Value::Image(get_large_img()),
     642            6 :                 &tline,
     643            6 :                 &ctx,
     644            6 :             )
     645           18 :             .await
     646            6 :             .unwrap();
     647            6 :         let layers = delta_writer
     648            6 :             .finish(&tline, &ctx, get_key(10))
     649           15 :             .await
     650            6 :             .unwrap();
     651            6 :         assert_eq!(layers.len(), 2);
     652            6 :     }
     653              : 
     654              :     #[tokio::test]
     655            6 :     async fn write_split_single_key() {
     656            6 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     657            6 :             .await
     658            6 :             .unwrap();
     659           24 :         let (tenant, ctx) = harness.load().await;
     660            6 : 
     661            6 :         let tline = tenant
     662            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     663           12 :             .await
     664            6 :             .unwrap();
     665            6 : 
     666            6 :         const N: usize = 2000;
     667            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     668            6 :             tenant.conf,
     669            6 :             tline.timeline_id,
     670            6 :             tenant.tenant_shard_id,
     671            6 :             get_key(0),
     672            6 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     673            6 :             4 * 1024 * 1024,
     674            6 :             &ctx,
     675            6 :         )
     676            6 :         .await
     677            6 :         .unwrap();
     678            6 : 
     679        12006 :         for i in 0..N {
     680        12000 :             let i = i as u32;
     681        12000 :             delta_writer
     682        12000 :                 .put_value(
     683        12000 :                     get_key(0),
     684        12000 :                     Lsn(i as u64 * 16 + 0x10),
     685        12000 :                     Value::Image(get_large_img()),
     686        12000 :                     &tline,
     687        12000 :                     &ctx,
     688        12000 :                 )
     689          759 :                 .await
     690        12000 :                 .unwrap();
     691            6 :         }
     692            6 :         let delta_layers = delta_writer
     693            6 :             .finish(&tline, &ctx, get_key(N as u32))
     694           24 :             .await
     695            6 :             .unwrap();
     696            6 :         assert_eq!(delta_layers.len(), 1);
     697            6 :     }
     698              : }
        

Generated by: LCOV version 2.1-beta