LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - split_writer.rs (source / functions) Coverage Total Hit
Test: 7eb96e224e685167ad85f58f858387d8cf253f63.info Lines: 95.3 % 618 589
Test Date: 2024-09-23 21:23:07 Functions: 98.5 % 66 65

            Line data    Source code
       1              : use std::{future::Future, ops::Range, sync::Arc};
       2              : 
       3              : use bytes::Bytes;
       4              : use pageserver_api::key::{Key, KEY_SIZE};
       5              : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
       6              : 
       7              : use crate::tenant::storage_layer::Layer;
       8              : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
       9              : 
      10              : use super::layer::S3_UPLOAD_LIMIT;
      11              : use super::{
      12              :     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
      13              : };
      14              : 
      15              : pub(crate) enum SplitWriterResult {
      16              :     Produced(ResidentLayer),
      17              :     Discarded(PersistentLayerKey),
      18              : }
      19              : 
      20              : #[cfg(test)]
      21              : impl SplitWriterResult {
      22          120 :     fn into_resident_layer(self) -> ResidentLayer {
      23          120 :         match self {
      24          120 :             SplitWriterResult::Produced(layer) => layer,
      25            0 :             SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
      26              :         }
      27          120 :     }
      28              : 
      29            0 :     fn into_discarded_layer(self) -> PersistentLayerKey {
      30            0 :         match self {
      31            0 :             SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
      32            0 :             SplitWriterResult::Discarded(layer) => layer,
      33            0 :         }
      34            0 :     }
      35              : }
      36              : 
      37              : /// An image writer that takes images and produces multiple image layers.
      38              : ///
      39              : /// The interface does not guarantee atomicity (i.e., if the image layer generation
      40              : /// fails, there might be leftover files to be cleaned up)
      41              : #[must_use]
      42              : pub struct SplitImageLayerWriter {
      43              :     inner: ImageLayerWriter,
      44              :     target_layer_size: u64,
      45              :     generated_layers: Vec<SplitWriterResult>,
      46              :     conf: &'static PageServerConf,
      47              :     timeline_id: TimelineId,
      48              :     tenant_shard_id: TenantShardId,
      49              :     lsn: Lsn,
      50              :     start_key: Key,
      51              : }
      52              : 
      53              : impl SplitImageLayerWriter {
      54           96 :     pub async fn new(
      55           96 :         conf: &'static PageServerConf,
      56           96 :         timeline_id: TimelineId,
      57           96 :         tenant_shard_id: TenantShardId,
      58           96 :         start_key: Key,
      59           96 :         lsn: Lsn,
      60           96 :         target_layer_size: u64,
      61           96 :         ctx: &RequestContext,
      62           96 :     ) -> anyhow::Result<Self> {
      63           96 :         Ok(Self {
      64           96 :             target_layer_size,
      65           96 :             inner: ImageLayerWriter::new(
      66           96 :                 conf,
      67           96 :                 timeline_id,
      68           96 :                 tenant_shard_id,
      69           96 :                 &(start_key..Key::MAX),
      70           96 :                 lsn,
      71           96 :                 ctx,
      72           96 :             )
      73           48 :             .await?,
      74           96 :             generated_layers: Vec::new(),
      75           96 :             conf,
      76           96 :             timeline_id,
      77           96 :             tenant_shard_id,
      78           96 :             lsn,
      79           96 :             start_key,
      80              :         })
      81           96 :     }
      82              : 
      83        25242 :     pub async fn put_image_with_discard_fn<D, F>(
      84        25242 :         &mut self,
      85        25242 :         key: Key,
      86        25242 :         img: Bytes,
      87        25242 :         tline: &Arc<Timeline>,
      88        25242 :         ctx: &RequestContext,
      89        25242 :         discard: D,
      90        25242 :     ) -> anyhow::Result<()>
      91        25242 :     where
      92        25242 :         D: FnOnce(&PersistentLayerKey) -> F,
      93        25242 :         F: Future<Output = bool>,
      94        25242 :     {
      95        25242 :         // The current estimation is an upper bound of the space that the key/image could take
      96        25242 :         // because we did not consider compression in this estimation. The resulting image layer
      97        25242 :         // could be smaller than the target size.
      98        25242 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
      99        25242 :         if self.inner.num_keys() >= 1
     100        25146 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     101              :         {
     102           42 :             let next_image_writer = ImageLayerWriter::new(
     103           42 :                 self.conf,
     104           42 :                 self.timeline_id,
     105           42 :                 self.tenant_shard_id,
     106           42 :                 &(key..Key::MAX),
     107           42 :                 self.lsn,
     108           42 :                 ctx,
     109           42 :             )
     110           21 :             .await?;
     111           42 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
     112           42 :             let layer_key = PersistentLayerKey {
     113           42 :                 key_range: self.start_key..key,
     114           42 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     115           42 :                 is_delta: false,
     116           42 :             };
     117           42 :             self.start_key = key;
     118           42 : 
     119           42 :             if discard(&layer_key).await {
     120            0 :                 drop(prev_image_writer);
     121            0 :                 self.generated_layers
     122            0 :                     .push(SplitWriterResult::Discarded(layer_key));
     123            0 :             } else {
     124           90 :                 let (desc, path) = prev_image_writer.finish_with_end_key(key, ctx).await?;
     125              : 
     126           42 :                 let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     127           42 :                 self.generated_layers
     128           42 :                     .push(SplitWriterResult::Produced(layer));
     129              :             }
     130        25200 :         }
     131        25620 :         self.inner.put_image(key, img, ctx).await
     132        25242 :     }
     133              : 
     134              :     #[cfg(test)]
     135           18 :     pub async fn put_image(
     136           18 :         &mut self,
     137           18 :         key: Key,
     138           18 :         img: Bytes,
     139           18 :         tline: &Arc<Timeline>,
     140           18 :         ctx: &RequestContext,
     141           18 :     ) -> anyhow::Result<()> {
     142           18 :         self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
     143           33 :             .await
     144           18 :     }
     145              : 
     146           84 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     147           84 :         self,
     148           84 :         tline: &Arc<Timeline>,
     149           84 :         ctx: &RequestContext,
     150           84 :         end_key: Key,
     151           84 :         discard: D,
     152           84 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     153           84 :     where
     154           84 :         D: FnOnce(&PersistentLayerKey) -> F,
     155           84 :         F: Future<Output = bool>,
     156           84 :     {
     157           84 :         let Self {
     158           84 :             mut generated_layers,
     159           84 :             inner,
     160           84 :             ..
     161           84 :         } = self;
     162           84 :         if inner.num_keys() == 0 {
     163            0 :             return Ok(generated_layers);
     164           84 :         }
     165           84 :         let layer_key = PersistentLayerKey {
     166           84 :             key_range: self.start_key..end_key,
     167           84 :             lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
     168           84 :             is_delta: false,
     169           84 :         };
     170           84 :         if discard(&layer_key).await {
     171           24 :             generated_layers.push(SplitWriterResult::Discarded(layer_key));
     172           24 :         } else {
     173          120 :             let (desc, path) = inner.finish_with_end_key(end_key, ctx).await?;
     174           60 :             let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     175           60 :             generated_layers.push(SplitWriterResult::Produced(layer));
     176              :         }
     177           84 :         Ok(generated_layers)
     178           84 :     }
     179              : 
     180              :     #[cfg(test)]
     181           24 :     pub(crate) async fn finish(
     182           24 :         self,
     183           24 :         tline: &Arc<Timeline>,
     184           24 :         ctx: &RequestContext,
     185           24 :         end_key: Key,
     186           24 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     187           24 :         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
     188           48 :             .await
     189           24 :     }
     190              : 
     191              :     /// This function will be deprecated with #8841.
     192           12 :     pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
     193           12 :         Ok((self.generated_layers, self.inner))
     194           12 :     }
     195              : }
     196              : 
     197              : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
     198              : ///
     199              : /// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
     200              : /// there might be leftover files to be cleaned up).
     201              : ///
     202              : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
     203              : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
     204              : /// will split them into multiple files based on size.
     205              : #[must_use]
     206              : pub struct SplitDeltaLayerWriter {
     207              :     inner: Option<(Key, DeltaLayerWriter)>,
     208              :     target_layer_size: u64,
     209              :     generated_layers: Vec<SplitWriterResult>,
     210              :     conf: &'static PageServerConf,
     211              :     timeline_id: TimelineId,
     212              :     tenant_shard_id: TenantShardId,
     213              :     lsn_range: Range<Lsn>,
     214              :     last_key_written: Key,
     215              : }
     216              : 
     217              : impl SplitDeltaLayerWriter {
     218          108 :     pub async fn new(
     219          108 :         conf: &'static PageServerConf,
     220          108 :         timeline_id: TimelineId,
     221          108 :         tenant_shard_id: TenantShardId,
     222          108 :         lsn_range: Range<Lsn>,
     223          108 :         target_layer_size: u64,
     224          108 :     ) -> anyhow::Result<Self> {
     225          108 :         Ok(Self {
     226          108 :             target_layer_size,
     227          108 :             inner: None,
     228          108 :             generated_layers: Vec::new(),
     229          108 :             conf,
     230          108 :             timeline_id,
     231          108 :             tenant_shard_id,
     232          108 :             lsn_range,
     233          108 :             last_key_written: Key::MIN,
     234          108 :         })
     235          108 :     }
     236              : 
     237              :     /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
     238        36402 :     pub async fn put_value_with_discard_fn<D, F>(
     239        36402 :         &mut self,
     240        36402 :         key: Key,
     241        36402 :         lsn: Lsn,
     242        36402 :         val: Value,
     243        36402 :         tline: &Arc<Timeline>,
     244        36402 :         ctx: &RequestContext,
     245        36402 :         discard: D,
     246        36402 :     ) -> anyhow::Result<()>
     247        36402 :     where
     248        36402 :         D: FnOnce(&PersistentLayerKey) -> F,
     249        36402 :         F: Future<Output = bool>,
     250        36402 :     {
     251        36402 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     252        36402 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     253        36402 :         //
     254        36402 :         // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
     255        36402 :         // strategy. https://github.com/neondatabase/neon/issues/8837
     256        36402 : 
     257        36402 :         if self.inner.is_none() {
     258           96 :             self.inner = Some((
     259           96 :                 key,
     260           96 :                 DeltaLayerWriter::new(
     261           96 :                     self.conf,
     262           96 :                     self.timeline_id,
     263           96 :                     self.tenant_shard_id,
     264           96 :                     key,
     265           96 :                     self.lsn_range.clone(),
     266           96 :                     ctx,
     267           96 :                 )
     268           48 :                 .await?,
     269              :             ));
     270        36306 :         }
     271        36402 :         let (_, inner) = self.inner.as_mut().unwrap();
     272        36402 : 
     273        36402 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     274        36402 :         if inner.num_keys() >= 1
     275        36306 :             && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     276              :         {
     277         8988 :             if key != self.last_key_written {
     278           42 :                 let next_delta_writer = DeltaLayerWriter::new(
     279           42 :                     self.conf,
     280           42 :                     self.timeline_id,
     281           42 :                     self.tenant_shard_id,
     282           42 :                     key,
     283           42 :                     self.lsn_range.clone(),
     284           42 :                     ctx,
     285           42 :                 )
     286           21 :                 .await?;
     287           42 :                 let (start_key, prev_delta_writer) =
     288           42 :                     std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
     289           42 :                 let layer_key = PersistentLayerKey {
     290           42 :                     key_range: start_key..key,
     291           42 :                     lsn_range: self.lsn_range.clone(),
     292           42 :                     is_delta: true,
     293           42 :                 };
     294           42 :                 if discard(&layer_key).await {
     295            0 :                     drop(prev_delta_writer);
     296            0 :                     self.generated_layers
     297            0 :                         .push(SplitWriterResult::Discarded(layer_key));
     298            0 :                 } else {
     299          111 :                     let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
     300           42 :                     let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     301           42 :                     self.generated_layers
     302           42 :                         .push(SplitWriterResult::Produced(delta_layer));
     303              :                 }
     304         8946 :             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
     305              :                 // We have to produce a very large file b/c a key is updated too often.
     306            0 :                 anyhow::bail!(
     307            0 :                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
     308            0 :                     key,
     309            0 :                     inner.estimated_size()
     310            0 :                 );
     311         8946 :             }
     312        27414 :         }
     313        36402 :         self.last_key_written = key;
     314        36402 :         let (_, inner) = self.inner.as_mut().unwrap();
     315        36402 :         inner.put_value(key, lsn, val, ctx).await
     316        36402 :     }
     317              : 
     318        12018 :     pub async fn put_value(
     319        12018 :         &mut self,
     320        12018 :         key: Key,
     321        12018 :         lsn: Lsn,
     322        12018 :         val: Value,
     323        12018 :         tline: &Arc<Timeline>,
     324        12018 :         ctx: &RequestContext,
     325        12018 :     ) -> anyhow::Result<()> {
     326        12018 :         self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
     327          786 :             .await
     328        12018 :     }
     329              : 
     330           96 :     pub(crate) async fn finish_with_discard_fn<D, F>(
     331           96 :         self,
     332           96 :         tline: &Arc<Timeline>,
     333           96 :         ctx: &RequestContext,
     334           96 :         discard: D,
     335           96 :     ) -> anyhow::Result<Vec<SplitWriterResult>>
     336           96 :     where
     337           96 :         D: FnOnce(&PersistentLayerKey) -> F,
     338           96 :         F: Future<Output = bool>,
     339           96 :     {
     340           96 :         let Self {
     341           96 :             mut generated_layers,
     342           96 :             inner,
     343           96 :             ..
     344           96 :         } = self;
     345           96 :         let Some((start_key, inner)) = inner else {
     346           12 :             return Ok(generated_layers);
     347              :         };
     348           84 :         if inner.num_keys() == 0 {
     349            0 :             return Ok(generated_layers);
     350           84 :         }
     351           84 :         let end_key = self.last_key_written.next();
     352           84 :         let layer_key = PersistentLayerKey {
     353           84 :             key_range: start_key..end_key,
     354           84 :             lsn_range: self.lsn_range.clone(),
     355           84 :             is_delta: true,
     356           84 :         };
     357           84 :         if discard(&layer_key).await {
     358           24 :             generated_layers.push(SplitWriterResult::Discarded(layer_key));
     359           24 :         } else {
     360          162 :             let (desc, path) = inner.finish(end_key, ctx).await?;
     361           60 :             let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     362           60 :             generated_layers.push(SplitWriterResult::Produced(delta_layer));
     363              :         }
     364           84 :         Ok(generated_layers)
     365           96 :     }
     366              : 
     367              :     #[cfg(test)]
     368           30 :     pub(crate) async fn finish(
     369           30 :         self,
     370           30 :         tline: &Arc<Timeline>,
     371           30 :         ctx: &RequestContext,
     372           30 :     ) -> anyhow::Result<Vec<SplitWriterResult>> {
     373           30 :         self.finish_with_discard_fn(tline, ctx, |_| async { false })
     374           84 :             .await
     375           30 :     }
     376              : 
     377              :     /// This function will be deprecated with #8841.
     378           12 :     pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
     379           12 :         Ok((self.generated_layers, self.inner.map(|x| x.1)))
     380           12 :     }
     381              : }
     382              : 
     383              : #[cfg(test)]
     384              : mod tests {
     385              :     use itertools::Itertools;
     386              :     use rand::{RngCore, SeedableRng};
     387              : 
     388              :     use crate::{
     389              :         tenant::{
     390              :             harness::{TenantHarness, TIMELINE_ID},
     391              :             storage_layer::AsLayerDesc,
     392              :         },
     393              :         DEFAULT_PG_VERSION,
     394              :     };
     395              : 
     396              :     use super::*;
     397              : 
     398        60156 :     fn get_key(id: u32) -> Key {
     399        60156 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     400        60156 :         key.field6 = id;
     401        60156 :         key
     402        60156 :     }
     403              : 
     404           24 :     fn get_img(id: u32) -> Bytes {
     405           24 :         format!("{id:064}").into()
     406           24 :     }
     407              : 
     408        60012 :     fn get_large_img() -> Bytes {
     409        60012 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     410        60012 :         let mut data = vec![0; 8192];
     411        60012 :         rng.fill_bytes(&mut data);
     412        60012 :         data.into()
     413        60012 :     }
     414              : 
     415              :     #[tokio::test]
     416            6 :     async fn write_one_image() {
     417            6 :         let harness = TenantHarness::create("split_writer_write_one_image")
     418            6 :             .await
     419            6 :             .unwrap();
     420           24 :         let (tenant, ctx) = harness.load().await;
     421            6 : 
     422            6 :         let tline = tenant
     423            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     424           12 :             .await
     425            6 :             .unwrap();
     426            6 : 
     427            6 :         let mut image_writer = SplitImageLayerWriter::new(
     428            6 :             tenant.conf,
     429            6 :             tline.timeline_id,
     430            6 :             tenant.tenant_shard_id,
     431            6 :             get_key(0),
     432            6 :             Lsn(0x18),
     433            6 :             4 * 1024 * 1024,
     434            6 :             &ctx,
     435            6 :         )
     436            6 :         .await
     437            6 :         .unwrap();
     438            6 : 
     439            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     440            6 :             tenant.conf,
     441            6 :             tline.timeline_id,
     442            6 :             tenant.tenant_shard_id,
     443            6 :             Lsn(0x18)..Lsn(0x20),
     444            6 :             4 * 1024 * 1024,
     445            6 :         )
     446            6 :         .await
     447            6 :         .unwrap();
     448            6 : 
     449            6 :         image_writer
     450            6 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     451            6 :             .await
     452            6 :             .unwrap();
     453            6 :         let layers = image_writer
     454            6 :             .finish(&tline, &ctx, get_key(10))
     455           12 :             .await
     456            6 :             .unwrap();
     457            6 :         assert_eq!(layers.len(), 1);
     458            6 : 
     459            6 :         delta_writer
     460            6 :             .put_value(
     461            6 :                 get_key(0),
     462            6 :                 Lsn(0x18),
     463            6 :                 Value::Image(get_img(0)),
     464            6 :                 &tline,
     465            6 :                 &ctx,
     466            6 :             )
     467            6 :             .await
     468            6 :             .unwrap();
     469           15 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     470            6 :         assert_eq!(layers.len(), 1);
     471            6 :         assert_eq!(
     472            6 :             layers
     473            6 :                 .into_iter()
     474            6 :                 .next()
     475            6 :                 .unwrap()
     476            6 :                 .into_resident_layer()
     477            6 :                 .layer_desc()
     478            6 :                 .key(),
     479            6 :             PersistentLayerKey {
     480            6 :                 key_range: get_key(0)..get_key(1),
     481            6 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     482            6 :                 is_delta: true
     483            6 :             }
     484            6 :         );
     485            6 :     }
     486              : 
     487              :     #[tokio::test]
     488            6 :     async fn write_split() {
     489        13110 :         write_split_helper("split_writer_write_split", false).await;
     490            6 :     }
     491              : 
     492              :     #[tokio::test]
     493            6 :     async fn write_split_discard() {
     494        13110 :         write_split_helper("split_writer_write_split_discard", false).await;
     495            6 :     }
     496              : 
     497           12 :     async fn write_split_helper(harness_name: &'static str, discard: bool) {
     498           12 :         let harness = TenantHarness::create(harness_name).await.unwrap();
     499           48 :         let (tenant, ctx) = harness.load().await;
     500              : 
     501           12 :         let tline = tenant
     502           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     503           24 :             .await
     504           12 :             .unwrap();
     505              : 
     506           12 :         let mut image_writer = SplitImageLayerWriter::new(
     507           12 :             tenant.conf,
     508           12 :             tline.timeline_id,
     509           12 :             tenant.tenant_shard_id,
     510           12 :             get_key(0),
     511           12 :             Lsn(0x18),
     512           12 :             4 * 1024 * 1024,
     513           12 :             &ctx,
     514           12 :         )
     515            6 :         .await
     516           12 :         .unwrap();
     517           12 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     518           12 :             tenant.conf,
     519           12 :             tline.timeline_id,
     520           12 :             tenant.tenant_shard_id,
     521           12 :             Lsn(0x18)..Lsn(0x20),
     522           12 :             4 * 1024 * 1024,
     523           12 :         )
     524            0 :         .await
     525           12 :         .unwrap();
     526              :         const N: usize = 2000;
     527        24012 :         for i in 0..N {
     528        24000 :             let i = i as u32;
     529        24000 :             image_writer
     530        24000 :                 .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
     531           36 :                     discard
     532        24000 :                 })
     533        24456 :                 .await
     534        24000 :                 .unwrap();
     535        24000 :             delta_writer
     536        24000 :                 .put_value_with_discard_fn(
     537        24000 :                     get_key(i),
     538        24000 :                     Lsn(0x20),
     539        24000 :                     Value::Image(get_large_img()),
     540        24000 :                     &tline,
     541        24000 :                     &ctx,
     542        24000 :                     |_| async { discard },
     543        24000 :                 )
     544         1632 :                 .await
     545        24000 :                 .unwrap();
     546              :         }
     547           12 :         let image_layers = image_writer
     548           12 :             .finish(&tline, &ctx, get_key(N as u32))
     549           24 :             .await
     550           12 :             .unwrap();
     551           30 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     552           12 :         if discard {
     553            0 :             for layer in image_layers {
     554            0 :                 layer.into_discarded_layer();
     555            0 :             }
     556            0 :             for layer in delta_layers {
     557            0 :                 layer.into_discarded_layer();
     558            0 :             }
     559              :         } else {
     560           12 :             let image_layers = image_layers
     561           12 :                 .into_iter()
     562           48 :                 .map(|x| x.into_resident_layer())
     563           12 :                 .collect_vec();
     564           12 :             let delta_layers = delta_layers
     565           12 :                 .into_iter()
     566           48 :                 .map(|x| x.into_resident_layer())
     567           12 :                 .collect_vec();
     568           12 :             assert_eq!(image_layers.len(), N / 512 + 1);
     569           12 :             assert_eq!(delta_layers.len(), N / 512 + 1);
     570           12 :             assert_eq!(
     571           12 :                 delta_layers.first().unwrap().layer_desc().key_range.start,
     572           12 :                 get_key(0)
     573           12 :             );
     574           12 :             assert_eq!(
     575           12 :                 delta_layers.last().unwrap().layer_desc().key_range.end,
     576           12 :                 get_key(N as u32)
     577           12 :             );
     578           48 :             for idx in 0..image_layers.len() {
     579           48 :                 assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
     580           48 :                 assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
     581           48 :                 assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
     582           48 :                 assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
     583           48 :                 if idx > 0 {
     584           36 :                     assert_eq!(
     585           36 :                         image_layers[idx - 1].layer_desc().key_range.end,
     586           36 :                         image_layers[idx].layer_desc().key_range.start
     587           36 :                     );
     588           36 :                     assert_eq!(
     589           36 :                         delta_layers[idx - 1].layer_desc().key_range.end,
     590           36 :                         delta_layers[idx].layer_desc().key_range.start
     591           36 :                     );
     592           12 :                 }
     593              :             }
     594              :         }
     595           12 :     }
     596              : 
     597              :     #[tokio::test]
     598            6 :     async fn write_large_img() {
     599            6 :         let harness = TenantHarness::create("split_writer_write_large_img")
     600            6 :             .await
     601            6 :             .unwrap();
     602           24 :         let (tenant, ctx) = harness.load().await;
     603            6 : 
     604            6 :         let tline = tenant
     605            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     606           12 :             .await
     607            6 :             .unwrap();
     608            6 : 
     609            6 :         let mut image_writer = SplitImageLayerWriter::new(
     610            6 :             tenant.conf,
     611            6 :             tline.timeline_id,
     612            6 :             tenant.tenant_shard_id,
     613            6 :             get_key(0),
     614            6 :             Lsn(0x18),
     615            6 :             4 * 1024,
     616            6 :             &ctx,
     617            6 :         )
     618            6 :         .await
     619            6 :         .unwrap();
     620            6 : 
     621            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     622            6 :             tenant.conf,
     623            6 :             tline.timeline_id,
     624            6 :             tenant.tenant_shard_id,
     625            6 :             Lsn(0x18)..Lsn(0x20),
     626            6 :             4 * 1024,
     627            6 :         )
     628            6 :         .await
     629            6 :         .unwrap();
     630            6 : 
     631            6 :         image_writer
     632            6 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     633            6 :             .await
     634            6 :             .unwrap();
     635            6 :         image_writer
     636            6 :             .put_image(get_key(1), get_large_img(), &tline, &ctx)
     637           21 :             .await
     638            6 :             .unwrap();
     639            6 :         let layers = image_writer
     640            6 :             .finish(&tline, &ctx, get_key(10))
     641           12 :             .await
     642            6 :             .unwrap();
     643            6 :         assert_eq!(layers.len(), 2);
     644            6 : 
     645            6 :         delta_writer
     646            6 :             .put_value(
     647            6 :                 get_key(0),
     648            6 :                 Lsn(0x18),
     649            6 :                 Value::Image(get_img(0)),
     650            6 :                 &tline,
     651            6 :                 &ctx,
     652            6 :             )
     653            6 :             .await
     654            6 :             .unwrap();
     655            6 :         delta_writer
     656            6 :             .put_value(
     657            6 :                 get_key(1),
     658            6 :                 Lsn(0x1A),
     659            6 :                 Value::Image(get_large_img()),
     660            6 :                 &tline,
     661            6 :                 &ctx,
     662            6 :             )
     663           18 :             .await
     664            6 :             .unwrap();
     665           15 :         let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     666            6 :         assert_eq!(layers.len(), 2);
     667            6 :         let mut layers_iter = layers.into_iter();
     668            6 :         assert_eq!(
     669            6 :             layers_iter
     670            6 :                 .next()
     671            6 :                 .unwrap()
     672            6 :                 .into_resident_layer()
     673            6 :                 .layer_desc()
     674            6 :                 .key(),
     675            6 :             PersistentLayerKey {
     676            6 :                 key_range: get_key(0)..get_key(1),
     677            6 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     678            6 :                 is_delta: true
     679            6 :             }
     680            6 :         );
     681            6 :         assert_eq!(
     682            6 :             layers_iter
     683            6 :                 .next()
     684            6 :                 .unwrap()
     685            6 :                 .into_resident_layer()
     686            6 :                 .layer_desc()
     687            6 :                 .key(),
     688            6 :             PersistentLayerKey {
     689            6 :                 key_range: get_key(1)..get_key(2),
     690            6 :                 lsn_range: Lsn(0x18)..Lsn(0x20),
     691            6 :                 is_delta: true
     692            6 :             }
     693            6 :         );
     694            6 :     }
     695              : 
     696              :     #[tokio::test]
     697            6 :     async fn write_split_single_key() {
     698            6 :         let harness = TenantHarness::create("split_writer_write_split_single_key")
     699            6 :             .await
     700            6 :             .unwrap();
     701           24 :         let (tenant, ctx) = harness.load().await;
     702            6 : 
     703            6 :         let tline = tenant
     704            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     705           12 :             .await
     706            6 :             .unwrap();
     707            6 : 
     708            6 :         const N: usize = 2000;
     709            6 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     710            6 :             tenant.conf,
     711            6 :             tline.timeline_id,
     712            6 :             tenant.tenant_shard_id,
     713            6 :             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     714            6 :             4 * 1024 * 1024,
     715            6 :         )
     716            6 :         .await
     717            6 :         .unwrap();
     718            6 : 
     719        12006 :         for i in 0..N {
     720        12000 :             let i = i as u32;
     721        12000 :             delta_writer
     722        12000 :                 .put_value(
     723        12000 :                     get_key(0),
     724        12000 :                     Lsn(i as u64 * 16 + 0x10),
     725        12000 :                     Value::Image(get_large_img()),
     726        12000 :                     &tline,
     727        12000 :                     &ctx,
     728        12000 :                 )
     729          762 :                 .await
     730        12000 :                 .unwrap();
     731            6 :         }
     732           24 :         let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
     733            6 :         assert_eq!(delta_layers.len(), 1);
     734            6 :         let delta_layer = delta_layers
     735            6 :             .into_iter()
     736            6 :             .next()
     737            6 :             .unwrap()
     738            6 :             .into_resident_layer();
     739            6 :         assert_eq!(
     740            6 :             delta_layer.layer_desc().key(),
     741            6 :             PersistentLayerKey {
     742            6 :                 key_range: get_key(0)..get_key(1),
     743            6 :                 lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
     744            6 :                 is_delta: true
     745            6 :             }
     746            6 :         );
     747            6 :     }
     748              : }
        

Generated by: LCOV version 2.1-beta