LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - split_writer.rs (source / functions) Coverage Total Hit
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info Lines: 98.4 % 375 369
Test Date: 2024-08-21 17:32:46 Functions: 84.0 % 25 21

            Line data    Source code
       1              : use std::{ops::Range, sync::Arc};
       2              : 
       3              : use bytes::Bytes;
       4              : use pageserver_api::key::{Key, KEY_SIZE};
       5              : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
       6              : 
       7              : use crate::tenant::storage_layer::Layer;
       8              : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
       9              : 
      10              : use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
      11              : 
      12              : /// An image writer that takes images and produces multiple image layers. The interface does not
      13              : /// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
      14              : /// to be cleaned up)
      15              : #[must_use]
      16              : pub struct SplitImageLayerWriter {
      17              :     inner: ImageLayerWriter,
      18              :     target_layer_size: u64,
      19              :     generated_layers: Vec<ResidentLayer>,
      20              :     conf: &'static PageServerConf,
      21              :     timeline_id: TimelineId,
      22              :     tenant_shard_id: TenantShardId,
      23              :     lsn: Lsn,
      24              : }
      25              : 
      26              : impl SplitImageLayerWriter {
      27            6 :     pub async fn new(
      28            6 :         conf: &'static PageServerConf,
      29            6 :         timeline_id: TimelineId,
      30            6 :         tenant_shard_id: TenantShardId,
      31            6 :         start_key: Key,
      32            6 :         lsn: Lsn,
      33            6 :         target_layer_size: u64,
      34            6 :         ctx: &RequestContext,
      35            6 :     ) -> anyhow::Result<Self> {
      36            6 :         Ok(Self {
      37            6 :             target_layer_size,
      38            6 :             inner: ImageLayerWriter::new(
      39            6 :                 conf,
      40            6 :                 timeline_id,
      41            6 :                 tenant_shard_id,
      42            6 :                 &(start_key..Key::MAX),
      43            6 :                 lsn,
      44            6 :                 ctx,
      45            6 :             )
      46            3 :             .await?,
      47            6 :             generated_layers: Vec::new(),
      48            6 :             conf,
      49            6 :             timeline_id,
      50            6 :             tenant_shard_id,
      51            6 :             lsn,
      52              :         })
      53            6 :     }
      54              : 
      55         4006 :     pub async fn put_image(
      56         4006 :         &mut self,
      57         4006 :         key: Key,
      58         4006 :         img: Bytes,
      59         4006 :         tline: &Arc<Timeline>,
      60         4006 :         ctx: &RequestContext,
      61         4006 :     ) -> anyhow::Result<()> {
      62         4006 :         // The current estimation is an upper bound of the space that the key/image could take
      63         4006 :         // because we did not consider compression in this estimation. The resulting image layer
      64         4006 :         // could be smaller than the target size.
      65         4006 :         let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
      66         4006 :         if self.inner.num_keys() >= 1
      67         4000 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
      68              :         {
      69            8 :             let next_image_writer = ImageLayerWriter::new(
      70            8 :                 self.conf,
      71            8 :                 self.timeline_id,
      72            8 :                 self.tenant_shard_id,
      73            8 :                 &(key..Key::MAX),
      74            8 :                 self.lsn,
      75            8 :                 ctx,
      76            8 :             )
      77            4 :             .await?;
      78            8 :             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
      79            8 :             self.generated_layers.push(
      80            8 :                 prev_image_writer
      81            8 :                     .finish_with_end_key(tline, key, ctx)
      82           17 :                     .await?,
      83              :             );
      84         3998 :         }
      85         4066 :         self.inner.put_image(key, img, ctx).await
      86         4006 :     }
      87              : 
      88            6 :     pub(crate) async fn finish(
      89            6 :         self,
      90            6 :         tline: &Arc<Timeline>,
      91            6 :         ctx: &RequestContext,
      92            6 :         end_key: Key,
      93            6 :     ) -> anyhow::Result<Vec<ResidentLayer>> {
      94            6 :         let Self {
      95            6 :             mut generated_layers,
      96            6 :             inner,
      97            6 :             ..
      98            6 :         } = self;
      99           12 :         generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
     100            6 :         Ok(generated_layers)
     101            6 :     }
     102              : 
     103              :     /// When split writer fails, the caller should call this function and handle partially generated layers.
     104              :     #[allow(dead_code)]
     105            0 :     pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, ImageLayerWriter)> {
     106            0 :         Ok((self.generated_layers, self.inner))
     107            0 :     }
     108              : }
     109              : 
     110              : /// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
     111              : /// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
     112              : /// to be cleaned up).
     113              : #[must_use]
     114              : pub struct SplitDeltaLayerWriter {
     115              :     inner: DeltaLayerWriter,
     116              :     target_layer_size: u64,
     117              :     generated_layers: Vec<ResidentLayer>,
     118              :     conf: &'static PageServerConf,
     119              :     timeline_id: TimelineId,
     120              :     tenant_shard_id: TenantShardId,
     121              :     lsn_range: Range<Lsn>,
     122              : }
     123              : 
     124              : impl SplitDeltaLayerWriter {
     125            6 :     pub async fn new(
     126            6 :         conf: &'static PageServerConf,
     127            6 :         timeline_id: TimelineId,
     128            6 :         tenant_shard_id: TenantShardId,
     129            6 :         start_key: Key,
     130            6 :         lsn_range: Range<Lsn>,
     131            6 :         target_layer_size: u64,
     132            6 :         ctx: &RequestContext,
     133            6 :     ) -> anyhow::Result<Self> {
     134            6 :         Ok(Self {
     135            6 :             target_layer_size,
     136            6 :             inner: DeltaLayerWriter::new(
     137            6 :                 conf,
     138            6 :                 timeline_id,
     139            6 :                 tenant_shard_id,
     140            6 :                 start_key,
     141            6 :                 lsn_range.clone(),
     142            6 :                 ctx,
     143            6 :             )
     144            3 :             .await?,
     145            6 :             generated_layers: Vec::new(),
     146            6 :             conf,
     147            6 :             timeline_id,
     148            6 :             tenant_shard_id,
     149            6 :             lsn_range,
     150              :         })
     151            6 :     }
     152              : 
     153         4006 :     pub async fn put_value(
     154         4006 :         &mut self,
     155         4006 :         key: Key,
     156         4006 :         lsn: Lsn,
     157         4006 :         val: Value,
     158         4006 :         tline: &Arc<Timeline>,
     159         4006 :         ctx: &RequestContext,
     160         4006 :     ) -> anyhow::Result<()> {
     161         4006 :         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
     162         4006 :         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
     163         4006 :         let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
     164         4006 :         if self.inner.num_keys() >= 1
     165         4000 :             && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
     166              :         {
     167            8 :             let next_delta_writer = DeltaLayerWriter::new(
     168            8 :                 self.conf,
     169            8 :                 self.timeline_id,
     170            8 :                 self.tenant_shard_id,
     171            8 :                 key,
     172            8 :                 self.lsn_range.clone(),
     173            8 :                 ctx,
     174            8 :             )
     175            4 :             .await?;
     176            8 :             let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
     177           21 :             let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
     178            8 :             let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     179            8 :             self.generated_layers.push(delta_layer);
     180         3998 :         }
     181         4006 :         self.inner.put_value(key, lsn, val, ctx).await
     182         4006 :     }
     183              : 
     184            6 :     pub(crate) async fn finish(
     185            6 :         self,
     186            6 :         tline: &Arc<Timeline>,
     187            6 :         ctx: &RequestContext,
     188            6 :         end_key: Key,
     189            6 :     ) -> anyhow::Result<Vec<ResidentLayer>> {
     190            6 :         let Self {
     191            6 :             mut generated_layers,
     192            6 :             inner,
     193            6 :             ..
     194            6 :         } = self;
     195              : 
     196           15 :         let (desc, path) = inner.finish(end_key, ctx).await?;
     197            6 :         let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
     198            6 :         generated_layers.push(delta_layer);
     199            6 :         Ok(generated_layers)
     200            6 :     }
     201              : 
     202              :     /// When split writer fails, the caller should call this function and handle partially generated layers.
     203              :     #[allow(dead_code)]
     204            0 :     pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, DeltaLayerWriter)> {
     205            0 :         Ok((self.generated_layers, self.inner))
     206            0 :     }
     207              : }
     208              : 
     209              : #[cfg(test)]
     210              : mod tests {
     211              :     use rand::{RngCore, SeedableRng};
     212              : 
     213              :     use crate::{
     214              :         tenant::{
     215              :             harness::{TenantHarness, TIMELINE_ID},
     216              :             storage_layer::AsLayerDesc,
     217              :         },
     218              :         DEFAULT_PG_VERSION,
     219              :     };
     220              : 
     221              :     use super::*;
     222              : 
     223         8036 :     fn get_key(id: u32) -> Key {
     224         8036 :         let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     225         8036 :         key.field6 = id;
     226         8036 :         key
     227         8036 :     }
     228              : 
     229            8 :     fn get_img(id: u32) -> Bytes {
     230            8 :         format!("{id:064}").into()
     231            8 :     }
     232              : 
     233         8004 :     fn get_large_img() -> Bytes {
     234         8004 :         let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
     235         8004 :         let mut data = vec![0; 8192];
     236         8004 :         rng.fill_bytes(&mut data);
     237         8004 :         data.into()
     238         8004 :     }
     239              : 
     240              :     #[tokio::test]
     241            2 :     async fn write_one_image() {
     242            2 :         let harness = TenantHarness::create("split_writer_write_one_image")
     243            2 :             .await
     244            2 :             .unwrap();
     245            8 :         let (tenant, ctx) = harness.load().await;
     246            2 : 
     247            2 :         let tline = tenant
     248            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     249            4 :             .await
     250            2 :             .unwrap();
     251            2 : 
     252            2 :         let mut image_writer = SplitImageLayerWriter::new(
     253            2 :             tenant.conf,
     254            2 :             tline.timeline_id,
     255            2 :             tenant.tenant_shard_id,
     256            2 :             get_key(0),
     257            2 :             Lsn(0x18),
     258            2 :             4 * 1024 * 1024,
     259            2 :             &ctx,
     260            2 :         )
     261            2 :         .await
     262            2 :         .unwrap();
     263            2 : 
     264            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     265            2 :             tenant.conf,
     266            2 :             tline.timeline_id,
     267            2 :             tenant.tenant_shard_id,
     268            2 :             get_key(0),
     269            2 :             Lsn(0x18)..Lsn(0x20),
     270            2 :             4 * 1024 * 1024,
     271            2 :             &ctx,
     272            2 :         )
     273            2 :         .await
     274            2 :         .unwrap();
     275            2 : 
     276            2 :         image_writer
     277            2 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     278            2 :             .await
     279            2 :             .unwrap();
     280            2 :         let layers = image_writer
     281            2 :             .finish(&tline, &ctx, get_key(10))
     282            4 :             .await
     283            2 :             .unwrap();
     284            2 :         assert_eq!(layers.len(), 1);
     285            2 : 
     286            2 :         delta_writer
     287            2 :             .put_value(
     288            2 :                 get_key(0),
     289            2 :                 Lsn(0x18),
     290            2 :                 Value::Image(get_img(0)),
     291            2 :                 &tline,
     292            2 :                 &ctx,
     293            2 :             )
     294            2 :             .await
     295            2 :             .unwrap();
     296            2 :         let layers = delta_writer
     297            2 :             .finish(&tline, &ctx, get_key(10))
     298            5 :             .await
     299            2 :             .unwrap();
     300            2 :         assert_eq!(layers.len(), 1);
     301            2 :     }
     302              : 
     303              :     #[tokio::test]
     304            2 :     async fn write_split() {
     305            2 :         let harness = TenantHarness::create("split_writer_write_split")
     306            2 :             .await
     307            2 :             .unwrap();
     308            8 :         let (tenant, ctx) = harness.load().await;
     309            2 : 
     310            2 :         let tline = tenant
     311            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     312            4 :             .await
     313            2 :             .unwrap();
     314            2 : 
     315            2 :         let mut image_writer = SplitImageLayerWriter::new(
     316            2 :             tenant.conf,
     317            2 :             tline.timeline_id,
     318            2 :             tenant.tenant_shard_id,
     319            2 :             get_key(0),
     320            2 :             Lsn(0x18),
     321            2 :             4 * 1024 * 1024,
     322            2 :             &ctx,
     323            2 :         )
     324            2 :         .await
     325            2 :         .unwrap();
     326            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     327            2 :             tenant.conf,
     328            2 :             tline.timeline_id,
     329            2 :             tenant.tenant_shard_id,
     330            2 :             get_key(0),
     331            2 :             Lsn(0x18)..Lsn(0x20),
     332            2 :             4 * 1024 * 1024,
     333            2 :             &ctx,
     334            2 :         )
     335            2 :         .await
     336            2 :         .unwrap();
     337            2 :         const N: usize = 2000;
     338         4002 :         for i in 0..N {
     339         4000 :             let i = i as u32;
     340         4000 :             image_writer
     341         4000 :                 .put_image(get_key(i), get_large_img(), &tline, &ctx)
     342         4076 :                 .await
     343         4000 :                 .unwrap();
     344         4000 :             delta_writer
     345         4000 :                 .put_value(
     346         4000 :                     get_key(i),
     347         4000 :                     Lsn(0x20),
     348         4000 :                     Value::Image(get_large_img()),
     349         4000 :                     &tline,
     350         4000 :                     &ctx,
     351         4000 :                 )
     352          271 :                 .await
     353         4000 :                 .unwrap();
     354            2 :         }
     355            2 :         let image_layers = image_writer
     356            2 :             .finish(&tline, &ctx, get_key(N as u32))
     357            4 :             .await
     358            2 :             .unwrap();
     359            2 :         let delta_layers = delta_writer
     360            2 :             .finish(&tline, &ctx, get_key(N as u32))
     361            5 :             .await
     362            2 :             .unwrap();
     363            2 :         assert_eq!(image_layers.len(), N / 512 + 1);
     364            2 :         assert_eq!(delta_layers.len(), N / 512 + 1);
     365            8 :         for idx in 0..image_layers.len() {
     366            8 :             assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
     367            8 :             assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
     368            8 :             assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
     369            8 :             assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
     370            8 :             if idx > 0 {
     371            6 :                 assert_eq!(
     372            6 :                     image_layers[idx - 1].layer_desc().key_range.end,
     373            6 :                     image_layers[idx].layer_desc().key_range.start
     374            6 :                 );
     375            6 :                 assert_eq!(
     376            6 :                     delta_layers[idx - 1].layer_desc().key_range.end,
     377            6 :                     delta_layers[idx].layer_desc().key_range.start
     378            6 :                 );
     379            2 :             }
     380            2 :         }
     381            2 :     }
     382              : 
     383              :     #[tokio::test]
     384            2 :     async fn write_large_img() {
     385            2 :         let harness = TenantHarness::create("split_writer_write_large_img")
     386            2 :             .await
     387            2 :             .unwrap();
     388            8 :         let (tenant, ctx) = harness.load().await;
     389            2 : 
     390            2 :         let tline = tenant
     391            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     392            4 :             .await
     393            2 :             .unwrap();
     394            2 : 
     395            2 :         let mut image_writer = SplitImageLayerWriter::new(
     396            2 :             tenant.conf,
     397            2 :             tline.timeline_id,
     398            2 :             tenant.tenant_shard_id,
     399            2 :             get_key(0),
     400            2 :             Lsn(0x18),
     401            2 :             4 * 1024,
     402            2 :             &ctx,
     403            2 :         )
     404            2 :         .await
     405            2 :         .unwrap();
     406            2 : 
     407            2 :         let mut delta_writer = SplitDeltaLayerWriter::new(
     408            2 :             tenant.conf,
     409            2 :             tline.timeline_id,
     410            2 :             tenant.tenant_shard_id,
     411            2 :             get_key(0),
     412            2 :             Lsn(0x18)..Lsn(0x20),
     413            2 :             4 * 1024,
     414            2 :             &ctx,
     415            2 :         )
     416            2 :         .await
     417            2 :         .unwrap();
     418            2 : 
     419            2 :         image_writer
     420            2 :             .put_image(get_key(0), get_img(0), &tline, &ctx)
     421            2 :             .await
     422            2 :             .unwrap();
     423            2 :         image_writer
     424            2 :             .put_image(get_key(1), get_large_img(), &tline, &ctx)
     425            7 :             .await
     426            2 :             .unwrap();
     427            2 :         let layers = image_writer
     428            2 :             .finish(&tline, &ctx, get_key(10))
     429            4 :             .await
     430            2 :             .unwrap();
     431            2 :         assert_eq!(layers.len(), 2);
     432            2 : 
     433            2 :         delta_writer
     434            2 :             .put_value(
     435            2 :                 get_key(0),
     436            2 :                 Lsn(0x18),
     437            2 :                 Value::Image(get_img(0)),
     438            2 :                 &tline,
     439            2 :                 &ctx,
     440            2 :             )
     441            2 :             .await
     442            2 :             .unwrap();
     443            2 :         delta_writer
     444            2 :             .put_value(
     445            2 :                 get_key(1),
     446            2 :                 Lsn(0x1A),
     447            2 :                 Value::Image(get_large_img()),
     448            2 :                 &tline,
     449            2 :                 &ctx,
     450            2 :             )
     451            6 :             .await
     452            2 :             .unwrap();
     453            2 :         let layers = delta_writer
     454            2 :             .finish(&tline, &ctx, get_key(10))
     455            5 :             .await
     456            2 :             .unwrap();
     457            2 :         assert_eq!(layers.len(), 2);
     458            2 :     }
     459              : }
        

Generated by: LCOV version 2.1-beta