//! # Tiered compaction algorithm.
//!
//! Read all the input delta files, and write a new set of delta files that
//! include all the input WAL records. See retile_deltas().
//!
//! In a "normal" LSM tree, you get to remove any values that are overwritten by
//! later values, but in our system, we keep all the history. So the reshuffling
//! doesn't remove any garbage; it just reshuffles the records to reduce read
//! amplification, i.e. the number of files that you need to access to find the
//! WAL records for a given key.
//!
//! If the new delta files would be very "narrow", i.e. each file would cover
//! only a narrow key range, then we create a new set of image files
//! instead. The current threshold is that if the estimated total size of the
//! image layers is smaller than the size of the deltas, then we create image
//! layers. That amounts to 2x storage amplification, and it means that the
//! distance between image layers in the LSN dimension is roughly equal to the
//! logical database size. For example, if the logical database size is 10 GB,
//! we would generate new image layers every 10 GB of WAL.
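//!
//! A quick worked example of that threshold (numbers are illustrative only):
//! with a 10 GB logical database, re-imaging the whole keyspace costs roughly
//! 10 GB of new image layers. If the deltas under consideration hold 12 GB of
//! WAL, images are the cheaper option (10 GB < 12 GB), so we write images; if
//! they hold only 8 GB, we retile the deltas instead.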
use std::collections::{HashSet, VecDeque};
use std::ops::Range;

use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use tracing::{debug, info};
use utils::lsn::Lsn;

use crate::helpers::{
    PAGE_SZ, accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with,
};
use crate::identify_levels::identify_level;
use crate::interface::*;

/// Main entry point to compaction.
///
/// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
/// everything below that point that needs compaction. The cutoff LSN must
/// partition the layers so that there are no layers that span across that
/// LSN. To start compaction at the top of the tree, pass the end LSN of the
/// last written L0 layer.
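///
/// A minimal call sketch (hypothetical `executor`, `last_l0_end_lsn` and `ctx`
/// values; any [`CompactionJobExecutor`] implementation works):
///
/// ```ignore
/// // 128 MiB target layer size, fanout of 4 tiers per level:
/// compact_tiered(&mut executor, last_l0_end_lsn, 128 * 1024 * 1024, 4, &ctx).await?;
/// ```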
pub async fn compact_tiered<E: CompactionJobExecutor>(
    executor: &mut E,
    end_lsn: Lsn,
    target_file_size: u64,
    fanout: u64,
    ctx: &E::RequestContext,
) -> anyhow::Result<()> {
    assert!(fanout >= 1, "fanout needs to be at least 1 but is {fanout}");
    let exp_base = fanout.max(2);
    // Start at L0
    let mut current_level_no = 0;
    let mut current_level_target_height = target_file_size;
    loop {
        // end LSN + 1 to include possible image layers exactly at 'end_lsn'.
        let all_layers = executor
            .get_layers(
                &(E::Key::MIN..E::Key::MAX),
                &(Lsn(u64::MIN)..end_lsn + 1),
                ctx,
            )
            .await?;
        info!(
            "Compacting L{}, total # of layers: {}",
            current_level_no,
            all_layers.len()
        );

        // Identify the range of LSNs that belong to this level. We assume that
        // each file in this level spans an LSN range up to 1.75x the target
        // file size. That should give us enough slop that if we created a
        // slightly oversized L0 layer, e.g. because flushing the in-memory
        // layer was delayed for some reason, we don't consider the oversized
        // layer to belong to L1. But not so much slop that we accidentally
        // "skip" levels.
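        // E.g. with a 256 MB target, layers spanning an LSN range of up to
        // 448 MB still count as belonging to this level.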
        let max_height = (current_level_target_height as f64 * 1.75) as u64;
        let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
            break;
        };

        // Calculate the height of this level. If the # of tiers exceeds the
        // fanout parameter, it's time to compact it.
        let depth = level.depth();
        info!(
            "Level {} identified as LSN range {}-{}: depth {}",
            current_level_no, level.lsn_range.start, level.lsn_range.end, depth
        );
        for l in &level.layers {
            debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
        }
        if depth < fanout {
            debug!(
                level = current_level_no,
                depth = depth,
                fanout,
                "too few deltas to compact"
            );
            break;
        }

        compact_level(
            &level.lsn_range,
            &level.layers,
            executor,
            target_file_size,
            ctx,
        )
        .await?;
        if current_level_target_height == u64::MAX {
            // our target height includes all possible lsns
            info!(
                level = current_level_no,
                depth = depth,
                "compaction loop reached max current_level_target_height"
            );
            break;
        }
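        // Move to the next level. Target heights grow geometrically: level N
        // covers roughly target_file_size * exp_base^N worth of LSNs, e.g.
        // 256 MB, 1 GB, 4 GB, ... with a 256 MB target and fanout 4.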
        current_level_no += 1;
        current_level_target_height = current_level_target_height.saturating_mul(exp_base);
    }
    Ok(())
}

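/// Compact the layers of one level. The returned boolean mirrors
/// [`LevelCompactionState::next_level`]: true if new deltas were written and
/// the levels below this one may need compaction too.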
async fn compact_level<E: CompactionJobExecutor>(
    lsn_range: &Range<Lsn>,
    layers: &[E::Layer],
    executor: &mut E,
    target_file_size: u64,
    ctx: &E::RequestContext,
) -> anyhow::Result<bool> {
    let mut layer_fragments = Vec::new();
    for l in layers {
        layer_fragments.push(LayerFragment::new(l.clone()));
    }

    let mut state = LevelCompactionState {
        shard_identity: *executor.get_shard_identity(),
        target_file_size,
        _lsn_range: lsn_range.clone(),
        layers: layer_fragments,
        jobs: Vec::new(),
        job_queue: Vec::new(),
        next_level: false,
        executor,
    };

    let first_job = CompactionJob {
        key_range: E::Key::MIN..E::Key::MAX,
        lsn_range: lsn_range.clone(),
        strategy: CompactionStrategy::Divide,
        input_layers: state
            .layers
            .iter()
            .enumerate()
            .map(|i| LayerId(i.0))
            .collect(),
        completed: false,
    };

    state.jobs.push(first_job);
    state.job_queue.push(JobId(0));
    state.execute(ctx).await?;

    info!(
        "compaction completed! Need to process next level: {}",
        state.next_level
    );

    Ok(state.next_level)
}

/// Blackboard that keeps track of the state of all the jobs and work remaining
struct LevelCompactionState<'a, E>
where
    E: CompactionJobExecutor,
{
    shard_identity: ShardIdentity,

    // parameters
    target_file_size: u64,

    _lsn_range: Range<Lsn>,
    layers: Vec<LayerFragment<E>>,

    // job queue
    jobs: Vec<CompactionJob<E>>,
    job_queue: Vec<JobId>,

    /// If false, no need to compact levels below this
    next_level: bool,

    /// Interface to the outside world
    executor: &'a mut E,
}

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct LayerId(usize);
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct JobId(usize);

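/// Tracks which jobs must finish before a rewritten layer can be deleted;
/// jobs move from 'pending' to 'completed' as they finish. See
/// [`LayerFragment`].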
struct PendingJobSet {
    pending: HashSet<JobId>,
    completed: HashSet<JobId>,
}

impl PendingJobSet {
    fn new() -> Self {
        PendingJobSet {
            pending: HashSet::new(),
            completed: HashSet::new(),
        }
    }

    fn complete_job(&mut self, job_id: JobId) {
        self.pending.remove(&job_id);
        self.completed.insert(job_id);
    }

    fn all_completed(&self) -> bool {
        self.pending.is_empty()
    }
}

// When we decide to rewrite a set of layers, LayerFragment is used to keep
// track of which new layers supersede an old layer. When all the stakeholder
// jobs have completed, this layer can be deleted.
struct LayerFragment<E>
where
    E: CompactionJobExecutor,
{
    layer: E::Layer,

    // If we will write new layers to replace this one, this keeps track of the
    // jobs that need to complete before this layer can be deleted. As the jobs
    // complete, they are moved from 'pending' to 'completed' set. Once the
    // 'pending' set becomes empty, the layer can be deleted.
    //
    // If None, this layer is not rewritten and must not be deleted.
    deletable_after: Option<PendingJobSet>,

    deleted: bool,
}

impl<E> LayerFragment<E>
where
    E: CompactionJobExecutor,
{
    fn new(layer: E::Layer) -> Self {
        LayerFragment {
            layer,
            deletable_after: None,
            deleted: false,
        }
    }
}

#[derive(PartialEq)]
enum CompactionStrategy {
    Divide,
    CreateDelta,
    CreateImage,
}

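/// A unit of compaction work: one rectangle in (key, LSN) space, the strategy
/// to apply to it, and the input layers that overlap it.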
struct CompactionJob<E: CompactionJobExecutor> {
    key_range: Range<E::Key>,
    lsn_range: Range<Lsn>,

    strategy: CompactionStrategy,

    input_layers: Vec<LayerId>,

    completed: bool,
}

impl<E> LevelCompactionState<'_, E>
where
    E: CompactionJobExecutor,
{
    /// Main loop of the executor.
    ///
    /// In each iteration, we take the next job from the queue, and execute it.
    /// The execution might add new jobs to the queue. Keep going until the
    /// queue is empty.
    ///
    /// Initially, the job queue consists of one Divide job over the whole
    /// level. On first call, it is divided into smaller jobs.
    async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
        // TODO: this would be pretty straightforward to parallelize with FuturesUnordered
        while let Some(next_job_id) = self.job_queue.pop() {
            info!("executing job {}", next_job_id.0);
            self.execute_job(next_job_id, ctx).await?;
        }

        // all done!
        Ok(())
    }

    async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        match job.strategy {
            CompactionStrategy::Divide => {
                self.divide_job(job_id, ctx).await?;
                Ok(())
            }
            CompactionStrategy::CreateDelta => {
                let mut deltas: Vec<E::DeltaLayer> = Vec::new();
                let mut layer_ids: Vec<LayerId> = Vec::new();
                for layer_id in &job.input_layers {
                    let layer = &self.layers[layer_id.0].layer;
                    if let Some(dl) = self.executor.downcast_delta_layer(layer, ctx).await? {
                        deltas.push(dl.clone());
                        layer_ids.push(*layer_id);
                    }
                }

                self.executor
                    .create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
                    .await?;
                self.jobs[job_id.0].completed = true;

                // did we complete any fragments?
                for layer_id in layer_ids {
                    let l = &mut self.layers[layer_id.0];
                    if let Some(deletable_after) = l.deletable_after.as_mut() {
                        deletable_after.complete_job(job_id);
                        if deletable_after.all_completed() {
                            self.executor.delete_layer(&l.layer, ctx).await?;
                            l.deleted = true;
                        }
                    }
                }

                self.next_level = true;

                Ok(())
            }
            CompactionStrategy::CreateImage => {
                self.executor
                    .create_image(job.lsn_range.end, &job.key_range, ctx)
                    .await?;
                self.jobs[job_id.0].completed = true;

                // TODO: we could check if any layers < PITR horizon became deletable
                Ok(())
            }
        }
    }

    fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
        let job_id = JobId(self.jobs.len());
        self.jobs.push(job);
        self.job_queue.push(job_id);
        job_id
    }

    /// Take a partition of the key space, and decide how to compact it.
    ///
    /// TODO: Currently, this is called exactly once for the level, and we
    /// decide whether to create new image layers to cover the whole level, or
    /// write a new set of deltas. In the future, this should try to partition
    /// the key space, and make the decision separately for each partition.
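    ///
    /// The current decision rule, in short: create image layers iff the
    /// estimated image size (`keyspace_total_size(..) * PAGE_SZ`) is smaller
    /// than the total file size of the input delta layers; otherwise retile
    /// the deltas.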
    async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        assert!(job.strategy == CompactionStrategy::Divide);

        // Check for dummy cases
        if job.input_layers.is_empty() {
            return Ok(());
        }

        let job = &self.jobs[job_id.0];
        assert!(job.strategy == CompactionStrategy::Divide);

        // Would it be better to create images for this partition?
        // Decide based on the average density of the level
        let keyspace_size = keyspace_total_size(
            &self
                .executor
                .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
                .await?,
            &self.shard_identity,
        ) * PAGE_SZ;

        let wal_size = job
            .input_layers
            .iter()
            .filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
            .map(|layer_id| self.layers[layer_id.0].layer.file_size())
            .sum::<u64>();
        if keyspace_size < wal_size {
            // seems worth it
            info!(
                "covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
                keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
            );
            self.cover_with_images(job_id, ctx).await
        } else {
            // do deltas
            info!(
                "coverage not worth it, keyspace_size {}, wal_size {}",
                keyspace_size, wal_size
            );
            self.retile_deltas(job_id, ctx).await
        }
    }

    // LSN
    //  ^
    //  |
    //  |                          ###|###|#####
    //  | +--+-----+--+            +--+-----+--+
    //  | |  |     |  |            |  |     |  |
    //  | +--+--+--+--+            +--+--+--+--+
    //  | |     |     |            |     |     |
    //  | +---+-+-+---+     ==>    +---+-+-+---+
    //  | |   |   |   |            |   |   |   |
    //  | +---+-+-++--+            +---+-+-++--+
    //  | |     |  |  |            |     |  |  |
    //  | +-----+--+--+            +-----+--+--+
    //  |
    //  +--------------> key
    //
    async fn cover_with_images(
        &mut self,
        job_id: JobId,
        ctx: &E::RequestContext,
    ) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        assert!(job.strategy == CompactionStrategy::Divide);

        // XXX: do we still need the "holes" stuff?

        let mut new_jobs = Vec::new();

        // Slide a window through the keyspace
        let keyspace = self
            .executor
            .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
            .await?;

        let mut window = KeyspaceWindow::new(
            E::Key::MIN..E::Key::MAX,
            keyspace,
            self.target_file_size / PAGE_SZ,
        );
        while let Some(key_range) = window.choose_next_image(&self.shard_identity) {
            new_jobs.push(CompactionJob::<E> {
                key_range,
                lsn_range: job.lsn_range.clone(),
                strategy: CompactionStrategy::CreateImage,
                input_layers: Vec::new(), // XXX: Is it OK for this to be empty for an image layer?
                completed: false,
            });
        }

        for j in new_jobs.into_iter().rev() {
            let _job_id = self.push_job(j);

            // TODO: image layers don't let us delete anything. unless < PITR horizon
            //let j = &self.jobs[job_id.0];
            // for layer_id in j.input_layers.iter() {
            //    self.layers[layer_id.0].pending_stakeholders.insert(job_id);
            //}
        }

        Ok(())
    }

    // Merge the contents of all the input delta layers into a new set
    // of delta layers, based on the current partitioning.
    //
    // We split the new delta layers on the key dimension. We iterate through
    // the key space, and for each key, check whether adding the next key to
    // the current output layer would cause the layer to become too large. If
    // so, dump the current output layer and start a new one. It's possible
    // that there is a single key with so many page versions that storing all
    // of them in a single layer file would be too large. In that case, we
    // also split on the LSN dimension.
    //
    // LSN
    //  ^
    //  |
    //  | +-----------+            +--+--+--+--+
    //  | |           |            |  |  |  |  |
    //  | +-----------+            |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+     ==>    |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+            |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+            +--+--+--+--+
    //  |
    //  +--------------> key
    //
    //
    // If one key (X) has a lot of page versions:
    //
    // LSN
    //  ^
    //  |                                 (X)
    //  | +-----------+            +--+--+--+--+
    //  | |           |            |  |  |  |  |
    //  | +-----------+            |  |  +--+  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+     ==>    |  |  |  |  |
    //  | |           |            |  |  +--+  |
    //  | +-----------+            |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+            +--+--+--+--+
    //  |
    //  +--------------> key
    //
    // TODO: this actually divides the layers into fixed-size chunks, not
    // based on the partitioning.
    //
    // TODO: we should also opportunistically materialize and
    // garbage collect what we can.
    async fn retile_deltas(
        &mut self,
        job_id: JobId,
        ctx: &E::RequestContext,
    ) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        assert!(job.strategy == CompactionStrategy::Divide);

        // Sweep the key space left to right, running an estimate of how much
        // disk size and keyspace we have accumulated
        //
        // Once the disk size reaches the target threshold, stop and think.
        // If we have accumulated only a narrow band of keyspace, create an
        // image layer. Otherwise write a delta layer.

        // FIXME: we are ignoring images here. Did we already divide the work
        // so that we won't encounter them here?

        let mut deltas: Vec<E::DeltaLayer> = Vec::new();
        for layer_id in &job.input_layers {
            let l = &self.layers[layer_id.0];
            if let Some(dl) = self.executor.downcast_delta_layer(&l.layer, ctx).await? {
                deltas.push(dl.clone());
            }
        }
        // Open stream
        let key_value_stream = std::pin::pin!(
            merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
                .await?
                .map(Result::<_, anyhow::Error>::Ok)
        );
        let mut new_jobs = Vec::new();

        // Slide a window through the keyspace
        let mut key_accum =
            std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
        let mut all_in_window: bool = false;
        let mut window = Window::new();

        // Helper function to create a job for a new delta layer with given key-lsn
        // rectangle.
        let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
            // The inputs for the job are all the input layers of the original job that
            // overlap with the rectangle.
            let batch_layers: Vec<LayerId> = job
                .input_layers
                .iter()
                .filter(|layer_id| {
                    overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
                })
                .cloned()
                .collect();
            assert!(!batch_layers.is_empty());
            new_jobs.push(CompactionJob {
                key_range,
                lsn_range: lsn_range.clone(),
                strategy: CompactionStrategy::CreateDelta,
                input_layers: batch_layers,
                completed: false,
            });
        };

        loop {
            if all_in_window && window.is_empty() {
                // All done!
                break;
            }

            // If we now have enough keyspace for the next delta layer in the
            // window, create a new delta layer
            if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
            {
                create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
                continue;
            }
            assert!(!all_in_window);

            // Process next key in the key space
            match key_accum.next().await.transpose()? {
                None => {
                    all_in_window = true;
                }
                Some(next_key) if next_key.partition_lsns.is_empty() => {
                    // Normal case: extend the window by the key
                    window.feed(next_key.key, next_key.size);
                }
                Some(next_key) => {
                    // A key whose size impact is too large for a single delta
                    // layer. This case occurs if you make a huge number of
                    // updates for a single key.
                    //
                    // Drain the window with has_more = false to make a clean cut
                    // before the key, and then make dedicated delta layers for
                    // the single key.
                    //
                    // We cannot cluster the key with the others, because we
                    // don't want layer files to overlap with each other in
                    // (LSN, key) space (no overlaps for the rectangles).
                    let key = next_key.key;
                    debug!("key {key} with size impact larger than the layer size");
                    while !window.is_empty() {
                        let has_more = false;
                        let key_range = window.choose_next_delta(self.target_file_size, has_more)
                            .expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
                        create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
                    }

                    // Not strictly required, but here for future resilience:
                    // we make a "gap" here, so any structure the window holds
                    // should probably be reset.
                    window = Window::new();

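                    // E.g. with partition_lsns [0x20, 0x30] and a job LSN range
                    // of 0x10..0x40, this creates dedicated jobs for this key
                    // covering 0x10..0x20, 0x20..0x30 and 0x30..0x40.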
                    let mut prior_lsn = job.lsn_range.start;
                    let mut lsn_ranges = Vec::new();
                    for (lsn, _size) in next_key.partition_lsns.iter() {
                        lsn_ranges.push(prior_lsn..*lsn);
                        prior_lsn = *lsn;
                    }
                    lsn_ranges.push(prior_lsn..job.lsn_range.end);
                    for lsn_range in lsn_ranges {
                        let key_range = key..key.next();
                        create_delta_job(key_range, &lsn_range, &mut new_jobs);
                    }
                }
            }
        }

        // All the input files are rewritten. Set up the tracking for when they can
        // be deleted.
        for layer_id in job.input_layers.iter() {
            let l = &mut self.layers[layer_id.0];
            assert!(l.deletable_after.is_none());
            l.deletable_after = Some(PendingJobSet::new());
        }
        for j in new_jobs.into_iter().rev() {
            let job_id = self.push_job(j);
            let j = &self.jobs[job_id.0];
            for layer_id in j.input_layers.iter() {
                self.layers[layer_id.0]
                    .deletable_after
                    .as_mut()
                    .unwrap()
                    .pending
                    .insert(job_id);
            }
        }

        Ok(())
    }
}

/// Sliding window through keyspace and values, for image layer creation.
/// This is used by [`LevelCompactionState::cover_with_images`] to decide on good split points
struct KeyspaceWindow<K> {
    head: KeyspaceWindowHead<K>,

    start_pos: KeyspaceWindowPos<K>,
}
struct KeyspaceWindowHead<K> {
    // overall key range to cover
    key_range: Range<K>,

    keyspace: Vec<Range<K>>,
    target_keysize: u64,
}

#[derive(Clone)]
struct KeyspaceWindowPos<K> {
    end_key: K,

    keyspace_idx: usize,

    accum_keysize: u64,
}
impl<K: CompactionKey> KeyspaceWindowPos<K> {
    fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
        self.keyspace_idx == w.keyspace.len()
    }

    // Advance the cursor until the accumulated key size reaches 'max_size'.
    fn advance_until_size(
        &mut self,
        w: &KeyspaceWindowHead<K>,
        max_size: u64,
        shard_identity: &ShardIdentity,
    ) {
        while self.accum_keysize < max_size && !self.reached_end(w) {
            let curr_range = &w.keyspace[self.keyspace_idx];
            if self.end_key < curr_range.start {
                // skip over any unused space
                self.end_key = curr_range.start;
            }

            // We're now within 'curr_range'. Can we advance past it completely?
            let distance = K::key_range_size(&(self.end_key..curr_range.end), shard_identity);
            if (self.accum_keysize + distance as u64) < max_size {
                // oh yeah, it fits
                self.end_key = curr_range.end;
                self.keyspace_idx += 1;
                self.accum_keysize += distance as u64;
            } else {
                // advance within the range
                let skip_key = self.end_key.skip_some();
                let distance = K::key_range_size(&(self.end_key..skip_key), shard_identity);
                if (self.accum_keysize + distance as u64) < max_size {
                    self.end_key = skip_key;
                    self.accum_keysize += distance as u64;
                } else {
                    self.end_key = self.end_key.next();
                    self.accum_keysize += 1;
                }
            }
        }
    }
}

impl<K> KeyspaceWindow<K>
where
    K: CompactionKey,
{
    fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
        assert!(keyspace.first().unwrap().start >= key_range.start);

        let start_key = key_range.start;
        let start_pos = KeyspaceWindowPos::<K> {
            end_key: start_key,
            keyspace_idx: 0,
            accum_keysize: 0,
        };
        Self {
            head: KeyspaceWindowHead::<K> {
                key_range,
                keyspace,
                target_keysize,
            },
            start_pos,
        }
    }

    fn choose_next_image(&mut self, shard_identity: &ShardIdentity) -> Option<Range<K>> {
        if self.start_pos.keyspace_idx == self.head.keyspace.len() {
            // we've reached the end
            return None;
        }

        let mut next_pos = self.start_pos.clone();
        next_pos.advance_until_size(
            &self.head,
            self.start_pos.accum_keysize + self.head.target_keysize,
            shard_identity,
        );

        // See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
        // 1.25x target size
        let mut end_pos = next_pos.clone();
        end_pos.advance_until_size(
            &self.head,
            self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
            shard_identity,
        );
        if end_pos.reached_end(&self.head) {
            // gobble up any unused keyspace between the last used key and end of the range
            assert!(end_pos.end_key <= self.head.key_range.end);
            end_pos.end_key = self.head.key_range.end;
            next_pos = end_pos;
        }

        let start_key = self.start_pos.end_key;
        self.start_pos = next_pos;
        Some(start_key..self.start_pos.end_key)
    }
}

// Take previous partitioning, based on the image layers below.
//
// Candidate is at the front:
//
// Consider stretching an image layer to next divider? If it's close enough,
// that's the image candidate
//
// If it's too far, consider splitting at a reasonable point
//
// Is the image candidate smaller than the equivalent delta? If so,
// split off the image. Otherwise, split off one delta.
// Try to snap off the delta at a reasonable point

struct WindowElement<K> {
    start_key: K, // inclusive
    last_key: K,  // inclusive
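    // Running total of the sizes fed into the window, up to and including this
    // element; consumers subtract 'splitoff_size' to get the live size.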
    accum_size: u64,
}

/// Sliding window through keyspace and values for delta layer tiling
///
/// This is used to decide which delta layer to write next.
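///
/// A minimal sketch of how a caller drives it (`u64` stands in here for a
/// real [`CompactionKey`]; sizes are bytes, target size 16 KB):
///
/// ```ignore
/// let mut window = Window::new();
/// window.feed(1, 4_000); // key 1 contributes 4 KB
/// window.feed(2, 6_000);
/// // Too little accumulated for the target while more input remains:
/// assert!(window.choose_next_delta(16_000, /* has_more */ true).is_none());
/// // Once the input is exhausted, the window drains what it has:
/// assert_eq!(window.choose_next_delta(16_000, false), Some(1..3));
/// ```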
struct Window<K> {
    elems: VecDeque<WindowElement<K>>,

    // last key that was split off, inclusive
    splitoff_key: Option<K>,
    splitoff_size: u64,
}

impl<K> Window<K>
where
    K: CompactionKey,
{
    fn new() -> Self {
        Self {
            elems: VecDeque::new(),
            splitoff_key: None,
            splitoff_size: 0,
        }
    }

    fn feed(&mut self, key: K, size: u64) {
        let last_size;
        if let Some(last) = self.elems.back_mut() {
            // We require the keys to be strictly increasing for the window.
            // Keys should already have been deduplicated by `accum_key_values`
            assert!(
                last.last_key < key,
                "last_key(={}) >= key(={key})",
                last.last_key
            );
            last_size = last.accum_size;
        } else {
            last_size = 0;
        }
        // This is a new key.
        let elem = WindowElement {
            start_key: key,
            last_key: key,
            accum_size: last_size + size,
        };
        self.elems.push_back(elem);
    }

    fn remain_size(&self) -> u64 {
        self.elems.back().unwrap().accum_size - self.splitoff_size
    }

    fn peek_size(&self) -> u64 {
        self.elems.front().unwrap().accum_size - self.splitoff_size
    }

    fn is_empty(&self) -> bool {
        self.elems.is_empty()
    }

    fn commit_upto(&mut self, mut upto: usize) {
        while upto > 1 {
            let popped = self.elems.pop_front().unwrap();
            self.elems.front_mut().unwrap().start_key = popped.start_key;
            upto -= 1;
        }
    }

    fn find_size_split(&self, target_size: u64) -> usize {
        self.elems
            .partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
    }

    fn pop(&mut self) {
        let first = self.elems.pop_front().unwrap();
        self.splitoff_size = first.accum_size;

        self.splitoff_key = Some(first.last_key);
    }

    // The difference between delta and image is that an image covers
    // any unused keyspace before and after, while a delta tries to
    // minimize that. TODO: difference not implemented
    fn pop_delta(&mut self) -> Range<K> {
        let first = self.elems.front().unwrap();
        let key_range = first.start_key..first.last_key.next();

        self.pop();
        key_range
    }

    // Prerequisite: we have enough input in the window.
    //
    // If this returns None, the caller should feed more data and call again.
    fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
        if has_more && self.elems.is_empty() {
            // Starting up
            return None;
        }

        // If we still have an undersized candidate, just keep going
        while self.peek_size() < target_size {
            if self.elems.len() > 1 {
                self.commit_upto(2);
            } else if has_more {
                return None;
            } else {
                break;
            }
        }

        // Ensure we have enough input in the window to make a good decision
        if has_more && self.remain_size() < target_size * 5 / 4 {
            return None;
        }

        // The candidate on the front is now large enough for a delta.
        // And we have enough data in the window to decide.

        // If we're willing to stretch it up to 1.25 target size, could we
        // gobble up the rest of the work? This avoids creating very small
        // "tail" layers at the end of the keyspace
        if !has_more && self.remain_size() < target_size * 5 / 4 {
            self.commit_upto(self.elems.len());
        } else {
            let delta_split_at = self.find_size_split(target_size);
            self.commit_upto(delta_split_at);

            // If it's still not large enough, request the caller to fill the window
            if self.elems.len() == 1 && has_more {
                return None;
            }
        }
        Some(self.pop_delta())
    }
}