LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - split_writer.rs (source / functions) Coverage Total Hit
Test: 2a9d99866121f170b43760bd62e1e2431e597707.info Lines: 95.2 % 567 540
Test Date: 2024-09-02 14:10:37 Functions: 98.5 % 65 64

            Line data    Source code
       1              : use std::{future::Future, ops::Range, sync::Arc};
       2              : 
       3              : use bytes::Bytes;
       4              : use pageserver_api::key::{Key, KEY_SIZE};
       5              : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
       6              : 
       7              : use crate::tenant::storage_layer::Layer;
       8              : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
       9              : 
      10              : use super::layer::S3_UPLOAD_LIMIT;
      11              : use super::{
      12              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      13              : };
      14              : 
      15              : pub(crate) enum SplitWriterResult {
      16              :     Produced(ResidentLayer),
      17              :     Discarded(PersistentLayerKey),
      18              : }
      19              : 
      20              : #[cfg(test)]
      21              : impl SplitWriterResult {
      22           96 :     fn into_resident_layer(self) -> ResidentLayer {
      23           96 :         match self {
      24           96 :             SplitWriterResult::Produced(layer) => layer,
      25            0 :             SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      26              :         }
      27           96 :     }
      28              : 
      29            0 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      30            0 :         match self {
      31            0 :             SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
      32            0 :             SplitWriterResult::Discarded(layer) => layer,
      33            0 :         }
      34            0 :     }
      35              : }
      36              : 
      37              : /// An image writer that takes images and produces multiple image layers. The interface does not
      38              : /// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
      39              : /// to be cleaned up)
      40              : #[must_use]
      41              : pub struct SplitImageLayerWriter {
      42              :     inner: ImageLayerWriter,
      43              :     target_layer_size: u64,
      44              :     generated_layers: Vec<SplitWriterResult>,
      45              :     conf: &'static PageServerConf,
      46              :     timeline_id: TimelineId,
      47              :     tenant_shard_id: TenantShardId,
      48              :     lsn: Lsn,
      49              :     start_key: Key,
      50              : }
      51              : 
      52              : impl SplitImageLayerWriter {
      53           96 :     pub async fn new(
      54           96 :         conf: &'static PageServerConf,
      55           96 :         timeline_id: TimelineId,
      56           96 :         tenant_shard_id: TenantShardId,
      57           96 :         start_key: Key,
      58           96 :         lsn: Lsn,
      59           96 :         target_layer_size: u64,
      60           96 :         ctx: &RequestContext,
      61           96 :     ) -> anyhow::Result<Self> {
      62           96 :         Ok(Self {
      63           96 :             target_layer_size,
      64           96 :             inner: ImageLayerWriter::new(
      65           96 :                 conf,
      66           96 :                 timeline_id,
      67           96 :                 tenant_shard_id,
      68           96 :                 &(start_key..Key::MAX),
      69           96 :                 lsn,
      70           96 :                 ctx,
      71           96 :             )
      72           48 :             .await?,
      73           96 :             generated_layers: Vec::new(),
      74           96 :             conf,
      75           96 :             timeline_id,
      76           96 :             tenant_shard_id,
      77           96 :             lsn,
      78           96 :             start_key,
      79              :         })
      80           96 :     }
      81              : 
      82        25242 :     pub async fn put_image_with_discard_fn<D, F>(
      83        25242 :         &mut self,
      84        25242 :         key: Key,
      85        25242 :         img: Bytes,
      86        25242 :         tline: &Arc<Timeline>,
      87        25242 :         ctx: &RequestContext,
      88        25242 :         discard: D,
      89        25242 :     ) -> anyhow::Result<()>
      90        25242 :     where
      91        25242 :         D: FnOnce(&PersistentLayerKey) -> F,
      92        25242 :         F: Future<Output = bool>,
      93        25242 :     {
      94        25242 :         // The current estimation is an upper bound of the space that the key/image could take
      95        25242 :         // because we did not consider compression in this estimation. The resulting image layer
      96        25242 :         // could be smaller than the target size.
      97        25242 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
      98        25242 :         if self.inner.num_keys() >= 1
      99        25146 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     100              :         {
     101           42 :             let next_image_writer = ImageLayerWriter::new(
     102           42 :                 self.conf,
     103           42 :                 self.timeline_id,
     104           42 :                 self.tenant_shard_id,
     105           42 :                 &(key..Key::MAX),
     106           42 :                 self.lsn,
     107           42 :                 ctx,
     108           42 :             )
     109           21 :             .await?;
     110           42 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     111           42 :             let layer_key = PersistentLayerKey {
     112           42 :                 key_range: self.start_key..key,
     113           42 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     114           42 :                 is_delta: false,
     115           42 :             };
     116           42 :             self.start_key = key;
     117           42 : 
     118           42 :             if discard(&layer_key).await {
     119            0 :                 drop(prev_image_writer);
     120            0 :                 self.generated_layers
     121            0 :                     .push(SplitWriterResult::Discarded(layer_key));
     122            0 :             } else {
     123           42 :                 self.generated_layers.push(SplitWriterResult::Produced(
     124           42 :                     prev_image_writer
     125           42 :                         .finish_with_end_key(tline, key, ctx)
     126           90 :                         .await?,
     127              :                 ));
     128              :             }
     129        25200 :         }
     130        25620 :         self.inner.put_image(key, img, ctx).await
     131        25242 :     }
     132              : 
     133              :     #[cfg(test)]
     134           18 :     pub async fn put_image(
     135           18 :         &mut self,
     136           18 :         key: Key,
     137           18 :         img: Bytes,
     138           18 :         tline: &Arc<Timeline>,
     139           18 :         ctx: &RequestContext,
     140           18 :     ) -> anyhow::Result<()> {
     141           18 :         self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
     142           33 :             .await
     143           18 :     }
     144              : 
     145           84 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     146           84 :         self,
     147           84 :         tline: &Arc<Timeline>,
     148           84 :         ctx: &RequestContext,
     149           84 :         end_key: Key,
     150           84 :         discard: D,
     151           84 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     152           84 :     where
     153           84 :         D: FnOnce(&PersistentLayerKey) -> F,
     154           84 :         F: Future<Output = bool>,
     155           84 :     {
     156           84 :         let Self {
     157           84 :             mut generated_layers,
     158           84 :             inner,
     159           84 :             ..
     160           84 :         } = self;
     161           84 :         if inner.num_keys() == 0 {
     162            0 :             return Ok(generated_layers);
     163           84 :         }
     164           84 :         let layer_key = PersistentLayerKey {
     165           84 :             key_range: self.start_key..end_key,
     166           84 :             lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     167           84 :             is_delta: false,
     168           84 :         };
     169           84 :         if discard(&layer_key).await {
     170           24 :             generated_layers.push(SplitWriterResult::Discarded(layer_key));
     171           24 :         } else {
     172           60 :             generated_layers.push(SplitWriterResult::Produced(
     173          120 :                 inner.finish_with_end_key(tline, end_key, ctx).await?,
     174              :             ));
     175              :         }
     176           84 :         Ok(generated_layers)
     177           84 :     }
     178              : 
     179              :     #[cfg(test)]
     180           24 :     pub(crate) async fn finish(
     181           24 :         self,
     182           24 :         tline: &Arc<Timeline>,
     183           24 :         ctx: &RequestContext,
     184           24 :         end_key: Key,
     185           24 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     186           24 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     187           48 :             .await
     188           24 :     }
     189              : 
     190              :     /// When split writer fails, the caller should call this function and handle partially generated layers.
     191           12 :     pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
     192           12 :         Ok((self.generated_layers, self.inner))
     193           12 :     }
     194              : }
     195              : 
     196              : /// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
     197              : /// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
     198              : /// to be cleaned up).
     199              : ///
     200              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     201              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     202              : /// will split them into multiple files based on size.
     203              : #[must_use]
     204              : pub struct SplitDeltaLayerWriter {
     205              :     inner: DeltaLayerWriter,
     206              :     target_layer_size: u64,
     207              :     generated_layers: Vec<SplitWriterResult>,
     208              :     conf: &'static PageServerConf,
     209              :     timeline_id: TimelineId,
     210              :     tenant_shard_id: TenantShardId,
     211              :     lsn_range: Range<Lsn>,
     212              :     last_key_written: Key,
     213              :     start_key: Key,
     214              : }
     215              : 
     216              : impl SplitDeltaLayerWriter {
     217          108 :     pub async fn new(
     218          108 :         conf: &'static PageServerConf,
     219          108 :         timeline_id: TimelineId,
     220          108 :         tenant_shard_id: TenantShardId,
     221          108 :         start_key: Key,
     222          108 :         lsn_range: Range<Lsn>,
     223          108 :         target_layer_size: u64,
     224          108 :         ctx: &RequestContext,
     225          108 :     ) -> anyhow::Result<Self> {
     226          108 :         Ok(Self {
     227          108 :             target_layer_size,
     228          108 :             inner: DeltaLayerWriter::new(
     229          108 :                 conf,
     230          108 :                 timeline_id,
     231          108 :                 tenant_shard_id,
     232          108 :                 start_key,
     233          108 :                 lsn_range.clone(),
     234          108 :                 ctx,
     235          108 :             )
     236           54 :             .await?,
     237          108 :             generated_layers: Vec::new(),
     238          108 :             conf,
     239          108 :             timeline_id,
     240          108 :             tenant_shard_id,
     241          108 :             lsn_range,
     242          108 :             last_key_written: Key::MIN,
     243          108 :             start_key,
     244              :         })
     245          108 :     }
     246              : 
     247              :     /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
     248        36402 :     pub async fn put_value_with_discard_fn<D, F>(
     249        36402 :         &mut self,
     250        36402 :         key: Key,
     251        36402 :         lsn: Lsn,
     252        36402 :         val: Value,
     253        36402 :         tline: &Arc<Timeline>,
     254        36402 :         ctx: &RequestContext,
     255        36402 :         discard: D,
     256        36402 :     ) -> anyhow::Result<()>
     257        36402 :     where
     258        36402 :         D: FnOnce(&PersistentLayerKey) -> F,
     259        36402 :         F: Future<Output = bool>,
     260        36402 :     {
     261        36402 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     262        36402 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     263        36402 :         //
     264        36402 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     265        36402 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     266        36402 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     267        36402 :         if self.inner.num_keys() >= 1
     268        36306 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     269              :         {
     270         8988 :             if key != self.last_key_written {
     271           42 :                 let next_delta_writer = DeltaLayerWriter::new(
     272           42 :                     self.conf,
     273           42 :                     self.timeline_id,
     274           42 :                     self.tenant_shard_id,
     275           42 :                     key,
     276           42 :                     self.lsn_range.clone(),
     277           42 :                     ctx,
     278           42 :                 )
     279           21 :                 .await?;
     280           42 :                 let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
     281           42 :                 let layer_key = PersistentLayerKey {
     282           42 :                     key_range: self.start_key..key,
     283           42 :                     lsn_range: self.lsn_range.clone(),
     284           42 :                     is_delta: true,
     285           42 :                 };
     286           42 :                 self.start_key = key;
     287           42 :                 if discard(&layer_key).await {
     288            0 :                     drop(prev_delta_writer);
     289            0 :                     self.generated_layers
     290            0 :                         .push(SplitWriterResult::Discarded(layer_key));
     291            0 :                 } else {
     292          111 :                     let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
     293           42 :                     let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     294           42 :                     self.generated_layers
     295           42 :                         .push(SplitWriterResult::Produced(delta_layer));
     296              :                 }
     297         8946 :             } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
     298              :                 // We have to produce a very large file b/c a key is updated too often.
     299            0 :                 anyhow::bail!(
     300            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     301            0 :                     key,
     302            0 :                     self.inner.estimated_size()
     303            0 :                 );
     304         8946 :             }
     305        27414 :         }
     306        36402 :         self.last_key_written = key;
     307        36402 :         self.inner.put_value(key, lsn, val, ctx).await
     308        36402 :     }
     309              : 
     310        12018 :     pub async fn put_value(
     311        12018 :         &mut self,
     312        12018 :         key: Key,
     313        12018 :         lsn: Lsn,
     314        12018 :         val: Value,
     315        12018 :         tline: &Arc<Timeline>,
     316        12018 :         ctx: &RequestContext,
     317        12018 :     ) -> anyhow::Result<()> {
     318        12018 :         self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
     319          777 :             .await
     320        12018 :     }
     321              : 
     322           96 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     323           96 :         self,
     324           96 :         tline: &Arc<Timeline>,
     325           96 :         ctx: &RequestContext,
     326           96 :         end_key: Key,
     327           96 :         discard: D,
     328           96 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     329           96 :     where
     330           96 :         D: FnOnce(&PersistentLayerKey) -> F,
     331           96 :         F: Future<Output = bool>,
     332           96 :     {
     333           96 :         let Self {
     334           96 :             mut generated_layers,
     335           96 :             inner,
     336           96 :             ..
     337           96 :         } = self;
     338           96 :         if inner.num_keys() == 0 {
     339           12 :             return Ok(generated_layers);
     340           84 :         }
     341           84 :         let layer_key = PersistentLayerKey {
     342           84 :             key_range: self.start_key..end_key,
     343           84 :             lsn_range: self.lsn_range.clone(),
     344           84 :             is_delta: true,
     345           84 :         };
     346           84 :         if discard(&layer_key).await {
     347           24 :             generated_layers.push(SplitWriterResult::Discarded(layer_key));
     348           24 :         } else {
     349          162 :             let (desc, path) = inner.finish(end_key, ctx).await?;
     350           60 :             let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     351           60 :             generated_layers.push(SplitWriterResult::Produced(delta_layer));
     352              :         }
     353           84 :         Ok(generated_layers)
     354           96 :     }
     355              : 
     356              :     #[allow(dead_code)]
     357           30 :     pub(crate) async fn finish(
     358           30 :         self,
     359           30 :         tline: &Arc<Timeline>,
     360           30 :         ctx: &RequestContext,
     361           30 :         end_key: Key,
     362           30 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     363           30 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     364           84 :             .await
     365           30 :     }
     366              : 
     367              :     /// When split writer fails, the caller should call this function and handle partially generated layers.
     368           12 :     pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
     369           12 :         Ok((self.generated_layers, self.inner))
     370           12 :     }
     371              : }
     372              : 
     373              : #[cfg(test)]
     374              : mod tests {
     375              :     use itertools::Itertools;
     376              :     use rand::{RngCore, SeedableRng};
     377              : 
     378              :     use crate::{
     379              :         tenant::{
     380              :             harness::{TenantHarness, TIMELINE_ID},
     381              :             storage_layer::AsLayerDesc,
     382              :         },
     383              :         DEFAULT_PG_VERSION,
     384              :     };
     385              : 
     386              :     use super::*;
     387              : 
     388        60144 :     fn get_key(id: u32) -> Key {
     389        60144 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     390        60144 :         key.field6 = id;
     391        60144 :         key
     392        60144 :     }
     393              : 
     394           24 :     fn get_img(id: u32) -> Bytes {
     395           24 :         format!("{id:064}").into()
     396           24 :     }
     397              : 
     398        60012 :     fn get_large_img() -> Bytes {
     399        60012 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     400        60012 :         let mut data = vec![0; 8192];
     401        60012 :         rng.fill_bytes(&mut data);
     402        60012 :         data.into()
     403        60012 :     }
     404              : 
     405              :     #[tokio::test]
     406            6 :     async fn write_one_image() {
     407            6 :         let harness = TenantHarness::create("split_writer_write_one_image")
     408            6 :             .await
     409            6 :             .unwrap();
     410           24 :         let (tenant, ctx) = harness.load().await;
     411            6 : 
     412            6 :         let tline = tenant
     413            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     414           12 :             .await
     415            6 :             .unwrap();
     416            6 : 
     417            6 :         let mut image_writer = SplitImageLayerWriter::new(
     418            6 :             tenant.conf,
     419            6 :             tline.timeline_id,
     420            6 :             tenant.tenant_shard_id,
     421            6 :             get_key(0),
     422            6 :             Lsn(0x18),
     423            6 :             4 * 1024 * 1024,
     424            6 :             &ctx,
     425            6 :         )
     426            6 :         .await
     427            6 :         .unwrap();
     428            6 : 
     429            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     430            6 :             tenant.conf,
     431            6 :             tline.timeline_id,
     432            6 :             tenant.tenant_shard_id,
     433            6 :             get_key(0),
     434            6 :             Lsn(0x18)..Lsn(0x20),
     435            6 :             4 * 1024 * 1024,
     436            6 :             &ctx,
     437            6 :         )
     438            6 :         .await
     439            6 :         .unwrap();
     440            6 : 
     441            6 :         image_writer
     442            6 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     443            6 :             .await
     444            6 :             .unwrap();
     445            6 :         let layers = image_writer
     446            6 :             .finish(&tline, &ctx, get_key(10))
     447           12 :             .await
     448            6 :             .unwrap();
     449            6 :         assert_eq!(layers.len(), 1);
     450            6 : 
     451            6 :         delta_writer
     452            6 :             .put_value(
     453            6 :                 get_key(0),
     454            6 :                 Lsn(0x18),
     455            6 :                 Value::Image(get_img(0)),
     456            6 :                 &tline,
     457            6 :                 &ctx,
     458            6 :             )
     459            6 :             .await
     460            6 :             .unwrap();
     461            6 :         let layers = delta_writer
     462            6 :             .finish(&tline, &ctx, get_key(10))
     463           15 :             .await
     464            6 :             .unwrap();
     465            6 :         assert_eq!(layers.len(), 1);
     466            6 :     }
     467              : 
     468              :     #[tokio::test]
     469            6 :     async fn write_split() {
     470        13110 :         write_split_helper("split_writer_write_split", false).await;
     471            6 :     }
     472              : 
     473              :     #[tokio::test]
     474            6 :     async fn write_split_discard() {
     475        13110 :         write_split_helper("split_writer_write_split_discard", false).await;
     476            6 :     }
     477              : 
     478           12 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     479           12 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     480           48 :         let (tenant, ctx) = harness.load().await;
     481              : 
     482           12 :         let tline = tenant
     483           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     484           24 :             .await
     485           12 :             .unwrap();
     486              : 
     487           12 :         let mut image_writer = SplitImageLayerWriter::new(
     488           12 :             tenant.conf,
     489           12 :             tline.timeline_id,
     490           12 :             tenant.tenant_shard_id,
     491           12 :             get_key(0),
     492           12 :             Lsn(0x18),
     493           12 :             4 * 1024 * 1024,
     494           12 :             &ctx,
     495           12 :         )
     496            6 :         .await
     497           12 :         .unwrap();
     498           12 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     499           12 :             tenant.conf,
     500           12 :             tline.timeline_id,
     501           12 :             tenant.tenant_shard_id,
     502           12 :             get_key(0),
     503           12 :             Lsn(0x18)..Lsn(0x20),
     504           12 :             4 * 1024 * 1024,
     505           12 :             &ctx,
     506           12 :         )
     507            6 :         .await
     508           12 :         .unwrap();
     509              :         const N: usize = 2000;
     510        24012 :         for i in 0..N {
     511        24000 :             let i = i as u32;
     512        24000 :             image_writer
     513        24000 :                 .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
     514           36 :                     discard
     515        24000 :                 })
     516        24456 :                 .await
     517        24000 :                 .unwrap();
     518        24000 :             delta_writer
     519        24000 :                 .put_value_with_discard_fn(
     520        24000 :                     get_key(i),
     521        24000 :                     Lsn(0x20),
     522        24000 :                     Value::Image(get_large_img()),
     523        24000 :                     &tline,
     524        24000 :                     &ctx,
     525        24000 :                     |_| async { discard },
     526        24000 :                 )
     527         1626 :                 .await
     528        24000 :                 .unwrap();
     529              :         }
     530           12 :         let image_layers = image_writer
     531           12 :             .finish(&tline, &ctx, get_key(N as u32))
     532           24 :             .await
     533           12 :             .unwrap();
     534           12 :         let delta_layers = delta_writer
     535           12 :             .finish(&tline, &ctx, get_key(N as u32))
     536           30 :             .await
     537           12 :             .unwrap();
     538           12 :         if discard {
     539            0 :             for layer in image_layers {
     540            0 :                 layer.into_discarded_layer();
     541            0 :             }
     542            0 :             for layer in delta_layers {
     543            0 :                 layer.into_discarded_layer();
     544            0 :             }
     545              :         } else {
     546           12 :             let image_layers = image_layers
     547           12 :                 .into_iter()
     548           48 :                 .map(|x| x.into_resident_layer())
     549           12 :                 .collect_vec();
     550           12 :             let delta_layers = delta_layers
     551           12 :                 .into_iter()
     552           48 :                 .map(|x| x.into_resident_layer())
     553           12 :                 .collect_vec();
     554           12 :             assert_eq!(image_layers.len(), N / 512 + 1);
     555           12 :             assert_eq!(delta_layers.len(), N / 512 + 1);
     556           48 :             for idx in 0..image_layers.len() {
     557           48 :                 assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
     558           48 :                 assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
     559           48 :                 assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
     560           48 :                 assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
     561           48 :                 if idx > 0 {
     562           36 :                     assert_eq!(
     563           36 :                         image_layers[idx - 1].layer_desc().key_range.end,
     564           36 :                         image_layers[idx].layer_desc().key_range.start
     565           36 :                     );
     566           36 :                     assert_eq!(
     567           36 :                         delta_layers[idx - 1].layer_desc().key_range.end,
     568           36 :                         delta_layers[idx].layer_desc().key_range.start
     569           36 :                     );
     570           12 :                 }
     571              :             }
     572              :         }
     573           12 :     }
     574              : 
     575              :     #[tokio::test]
     576            6 :     async fn write_large_img() {
     577            6 :         let harness = TenantHarness::create("split_writer_write_large_img")
     578            6 :             .await
     579            6 :             .unwrap();
     580           24 :         let (tenant, ctx) = harness.load().await;
     581            6 : 
     582            6 :         let tline = tenant
     583            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     584           12 :             .await
     585            6 :             .unwrap();
     586            6 : 
     587            6 :         let mut image_writer = SplitImageLayerWriter::new(
     588            6 :             tenant.conf,
     589            6 :             tline.timeline_id,
     590            6 :             tenant.tenant_shard_id,
     591            6 :             get_key(0),
     592            6 :             Lsn(0x18),
     593            6 :             4 * 1024,
     594            6 :             &ctx,
     595            6 :         )
     596            6 :         .await
     597            6 :         .unwrap();
     598            6 : 
     599            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     600            6 :             tenant.conf,
     601            6 :             tline.timeline_id,
     602            6 :             tenant.tenant_shard_id,
     603            6 :             get_key(0),
     604            6 :             Lsn(0x18)..Lsn(0x20),
     605            6 :             4 * 1024,
     606            6 :             &ctx,
     607            6 :         )
     608            6 :         .await
     609            6 :         .unwrap();
     610            6 : 
     611            6 :         image_writer
     612            6 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     613            6 :             .await
     614            6 :             .unwrap();
     615            6 :         image_writer
     616            6 :             .put_image(get_key(1), get_large_img(), &tline, &ctx)
     617           21 :             .await
     618            6 :             .unwrap();
     619            6 :         let layers = image_writer
     620            6 :             .finish(&tline, &ctx, get_key(10))
     621           12 :             .await
     622            6 :             .unwrap();
     623            6 :         assert_eq!(layers.len(), 2);
     624            6 : 
     625            6 :         delta_writer
     626            6 :             .put_value(
     627            6 :                 get_key(0),
     628            6 :                 Lsn(0x18),
     629            6 :                 Value::Image(get_img(0)),
     630            6 :                 &tline,
     631            6 :                 &ctx,
     632            6 :             )
     633            6 :             .await
     634            6 :             .unwrap();
     635            6 :         delta_writer
     636            6 :             .put_value(
     637            6 :                 get_key(1),
     638            6 :                 Lsn(0x1A),
     639            6 :                 Value::Image(get_large_img()),
     640            6 :                 &tline,
     641            6 :                 &ctx,
     642            6 :             )
     643           18 :             .await
     644            6 :             .unwrap();
     645            6 :         let layers = delta_writer
     646            6 :             .finish(&tline, &ctx, get_key(10))
     647           15 :             .await
     648            6 :             .unwrap();
     649            6 :         assert_eq!(layers.len(), 2);
     650            6 :     }
     651              : 
     652              :     #[tokio::test]
     653            6 :     async fn write_split_single_key() {
     654            6 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     655            6 :             .await
     656            6 :             .unwrap();
     657           24 :         let (tenant, ctx) = harness.load().await;
     658            6 : 
     659            6 :         let tline = tenant
     660            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     661           12 :             .await
     662            6 :             .unwrap();
     663            6 : 
     664            6 :         const N: usize = 2000;
     665            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     666            6 :             tenant.conf,
     667            6 :             tline.timeline_id,
     668            6 :             tenant.tenant_shard_id,
     669            6 :             get_key(0),
     670            6 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     671            6 :             4 * 1024 * 1024,
     672            6 :             &ctx,
     673            6 :         )
     674            6 :         .await
     675            6 :         .unwrap();
     676            6 : 
     677        12006 :         for i in 0..N {
     678        12000 :             let i = i as u32;
     679        12000 :             delta_writer
     680        12000 :                 .put_value(
     681        12000 :                     get_key(0),
     682        12000 :                     Lsn(i as u64 * 16 + 0x10),
     683        12000 :                     Value::Image(get_large_img()),
     684        12000 :                     &tline,
     685        12000 :                     &ctx,
     686        12000 :                 )
     687          759 :                 .await
     688        12000 :                 .unwrap();
     689            6 :         }
     690            6 :         let delta_layers = delta_writer
     691            6 :             .finish(&tline, &ctx, get_key(N as u32))
     692           24 :             .await
     693            6 :             .unwrap();
     694            6 :         assert_eq!(delta_layers.len(), 1);
     695            6 :     }
     696              : }
        

Generated by: LCOV version 2.1-beta