LCOV - code coverage report
Current view: top level - pageserver/compaction/src - compact_tiered.rs (source / functions)
Test: b9d67f908f91f00e353a27440ba89f642a869959.info
Test Date: 2024-11-19 21:44:13
Coverage: Lines: 70.8 % (361 of 510)  |  Functions: 29.7 % (33 of 111)

            Line data    Source code
       1              : //! # Tiered compaction algorithm.
       2              : //!
       3              : //! Read all the input delta files, and write a new set of delta files that
       4              : //! include all the input WAL records. See retile_deltas().
       5              : //!
       6              : //! In a "normal" LSM tree, you get to remove any values that are overwritten by
       7              : //! later values, but in our system, we keep all the history. So the reshuffling
       8              : //! doesn't remove any garbage, it just reshuffles the records to reduce read
       9              : //! amplification, i.e. the number of files that you need to access to find the
      10              : //! WAL records for a given key.
      11              : //!
      12              : //! If the new delta files would be very "narrow", i.e. each file would cover
      13              : //! only a narrow key range, then we create a new set of image files
      14              : //! instead. The current threshold is that if the estimated total size of the
      15              : //! image layers is smaller than the size of the deltas, then we create image
      16              : //! layers. That amounts to 2x storage amplification, and it means that the
       17              : //! distance between image layers in the LSN dimension is roughly equal to the logical
      18              : //! database size. For example, if the logical database size is 10 GB, we would
      19              : //! generate new image layers every 10 GB of WAL.
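//!
//! Illustrative sketch (added; not part of the original module): the
//! image-vs-delta decision above reduces to a size comparison, mirroring the
//! check in divide_job() below. `should_cover_with_images` is a hypothetical
//! helper name.
//!
//!     // keyspace_size: estimated total size of new image layers (pages * PAGE_SZ)
//!     // wal_size: total file size of the delta layers in this level
//!     fn should_cover_with_images(keyspace_size: u64, wal_size: u64) -> bool {
//!         // Images are worthwhile once they would be smaller than the deltas
//!         // they summarize; this caps storage amplification at ~2x.
//!         keyspace_size < wal_size
//!     }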
      20              : use futures::StreamExt;
      21              : use pageserver_api::shard::ShardIdentity;
      22              : use tracing::{debug, info};
      23              : 
      24              : use std::collections::{HashSet, VecDeque};
      25              : use std::ops::Range;
      26              : 
      27              : use crate::helpers::{
      28              :     accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with, PAGE_SZ,
      29              : };
      30              : use crate::interface::*;
      31              : use utils::lsn::Lsn;
      32              : 
      33              : use crate::identify_levels::identify_level;
      34              : 
      35              : /// Main entry point to compaction.
      36              : ///
      37              : /// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
       38              : /// everything below that point that needs compaction. The cutoff LSN must
      39              : /// partition the layers so that there are no layers that span across that
      40              : /// LSN. To start compaction at the top of the tree, pass the end LSN of the
       41              : /// last written L0 layer.
      42         1000 : pub async fn compact_tiered<E: CompactionJobExecutor>(
      43         1000 :     executor: &mut E,
      44         1000 :     end_lsn: Lsn,
      45         1000 :     target_file_size: u64,
      46         1000 :     fanout: u64,
      47         1000 :     ctx: &E::RequestContext,
      48         1000 : ) -> anyhow::Result<()> {
      49         1000 :     assert!(fanout >= 1, "fanout needs to be at least 1 but is {fanout}");
      50         1000 :     let exp_base = fanout.max(2);
      51         1000 :     // Start at L0
      52         1000 :     let mut current_level_no = 0;
      53         1000 :     let mut current_level_target_height = target_file_size;
      54              :     loop {
      55              :         // end LSN +1 to include possible image layers exactly at 'end_lsn'.
      56         1003 :         let all_layers = executor
      57         1003 :             .get_layers(
      58         1003 :                 &(E::Key::MIN..E::Key::MAX),
      59         1003 :                 &(Lsn(u64::MIN)..end_lsn + 1),
      60         1003 :                 ctx,
      61         1003 :             )
      62            0 :             .await?;
      63         1003 :         info!(
      64            0 :             "Compacting L{}, total # of layers: {}",
      65            0 :             current_level_no,
      66            0 :             all_layers.len()
      67              :         );
      68              : 
      69              :         // Identify the range of LSNs that belong to this level. We assume that
      70              :         // each file in this level spans an LSN range up to 1.75x target file
      71              :         // size. That should give us enough slop that if we created a slightly
      72              :         // oversized L0 layer, e.g. because flushing the in-memory layer was
      73              :         // delayed for some reason, we don't consider the oversized layer to
       74              :         // belong to L1. But not so much slop that we accidentally
      75              :         // "skip" levels.
      76         1003 :         let max_height = (current_level_target_height as f64 * 1.75) as u64;
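        // Example (illustrative, assuming target_file_size = 128 MB and
        // fanout = 4): successive levels are identified at target heights of
        // roughly 128 MB, 512 MB, 2 GB, ...; with the 1.75x slop, a one-off
        // ~200 MB L0 layer (max_height = 224 MB) is still classified as L0
        // instead of leaking into L1.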
      77         1003 :         let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
      78          271 :             break;
      79              :         };
      80              : 
      81              :         // Calculate the height of this level. If the # of tiers exceeds the
      82              :         // fanout parameter, it's time to compact it.
      83          732 :         let depth = level.depth();
      84          732 :         info!(
      85            0 :             "Level {} identified as LSN range {}-{}: depth {}",
      86              :             current_level_no, level.lsn_range.start, level.lsn_range.end, depth
      87              :         );
      88         2157 :         for l in &level.layers {
      89         1425 :             debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
      90              :         }
      91          732 :         if depth < fanout {
      92          729 :             debug!(
      93              :                 level = current_level_no,
      94              :                 depth = depth,
      95              :                 fanout,
      96            0 :                 "too few deltas to compact"
      97              :             );
      98          729 :             break;
      99            3 :         }
     100            3 : 
     101            3 :         compact_level(
     102            3 :             &level.lsn_range,
     103            3 :             &level.layers,
     104            3 :             executor,
     105            3 :             target_file_size,
     106            3 :             ctx,
     107            3 :         )
     108            0 :         .await?;
     109            3 :         if current_level_target_height == u64::MAX {
     110              :             // our target height includes all possible lsns
     111            0 :             info!(
     112              :                 level = current_level_no,
     113              :                 depth = depth,
     114            0 :                 "compaction loop reached max current_level_target_height"
     115              :             );
     116            0 :             break;
     117            3 :         }
     118            3 :         current_level_no += 1;
     119            3 :         current_level_target_height = current_level_target_height.saturating_mul(exp_base);
     120              :     }
     121         1000 :     Ok(())
     122         1000 : }
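// Hypothetical usage sketch (added; the executor, context and constants are
// illustrative, not part of this crate). Compaction starts from the end LSN
// of the last written L0 layer, which by construction does not split any
// layer:
//
//     let target_file_size: u64 = 128 * 1024 * 1024; // assumed 128 MB target
//     let fanout: u64 = 4;                           // assumed tiering fanout
//     compact_tiered(&mut executor, last_l0_end_lsn, target_file_size, fanout, &ctx)
//         .await?;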
     123              : 
     124            3 : async fn compact_level<E: CompactionJobExecutor>(
     125            3 :     lsn_range: &Range<Lsn>,
     126            3 :     layers: &[E::Layer],
     127            3 :     executor: &mut E,
     128            3 :     target_file_size: u64,
     129            3 :     ctx: &E::RequestContext,
     130            3 : ) -> anyhow::Result<bool> {
     131            3 :     let mut layer_fragments = Vec::new();
     132           50 :     for l in layers {
     133           47 :         layer_fragments.push(LayerFragment::new(l.clone()));
     134           47 :     }
     135              : 
     136            3 :     let mut state = LevelCompactionState {
     137            3 :         shard_identity: *executor.get_shard_identity(),
     138            3 :         target_file_size,
     139            3 :         _lsn_range: lsn_range.clone(),
     140            3 :         layers: layer_fragments,
     141            3 :         jobs: Vec::new(),
     142            3 :         job_queue: Vec::new(),
     143            3 :         next_level: false,
     144            3 :         executor,
     145            3 :     };
     146            3 : 
     147            3 :     let first_job = CompactionJob {
     148            3 :         key_range: E::Key::MIN..E::Key::MAX,
     149            3 :         lsn_range: lsn_range.clone(),
     150            3 :         strategy: CompactionStrategy::Divide,
     151            3 :         input_layers: state
     152            3 :             .layers
     153            3 :             .iter()
     154            3 :             .enumerate()
     155           47 :             .map(|i| LayerId(i.0))
     156            3 :             .collect(),
     157            3 :         completed: false,
     158            3 :     };
     159            3 : 
     160            3 :     state.jobs.push(first_job);
     161            3 :     state.job_queue.push(JobId(0));
     162            3 :     state.execute(ctx).await?;
     163              : 
     164            3 :     info!(
     165            0 :         "compaction completed! Need to process next level: {}",
     166              :         state.next_level
     167              :     );
     168              : 
     169            3 :     Ok(state.next_level)
     170            3 : }
     171              : 
     172              : /// Blackboard that keeps track of the state of all the jobs and work remaining
     173              : struct LevelCompactionState<'a, E>
     174              : where
     175              :     E: CompactionJobExecutor,
     176              : {
     177              :     shard_identity: ShardIdentity,
     178              : 
     179              :     // parameters
     180              :     target_file_size: u64,
     181              : 
     182              :     _lsn_range: Range<Lsn>,
     183              :     layers: Vec<LayerFragment<E>>,
     184              : 
     185              :     // job queue
     186              :     jobs: Vec<CompactionJob<E>>,
     187              :     job_queue: Vec<JobId>,
     188              : 
     189              :     /// If false, no need to compact levels below this
     190              :     next_level: bool,
     191              : 
     192              :     /// Interface to the outside world
     193              :     executor: &'a mut E,
     194              : }
     195              : 
     196              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
     197              : struct LayerId(usize);
     198              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
     199              : struct JobId(usize);
     200              : 
     201              : struct PendingJobSet {
     202              :     pending: HashSet<JobId>,
     203              :     completed: HashSet<JobId>,
     204              : }
     205              : 
     206              : impl PendingJobSet {
     207           47 :     fn new() -> Self {
     208           47 :         PendingJobSet {
     209           47 :             pending: HashSet::new(),
     210           47 :             completed: HashSet::new(),
     211           47 :         }
     212           47 :     }
     213              : 
     214         1561 :     fn complete_job(&mut self, job_id: JobId) {
     215         1561 :         self.pending.remove(&job_id);
     216         1561 :         self.completed.insert(job_id);
     217         1561 :     }
     218              : 
     219         1561 :     fn all_completed(&self) -> bool {
     220         1561 :         self.pending.is_empty()
     221         1561 :     }
     222              : }
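// Illustrative lifecycle (added): a layer that will be superseded by two jobs
// starts with both in 'pending'; complete_job() moves each one over, and
// all_completed() then signals that the layer is safe to delete.
//
//     let mut set = PendingJobSet::new();
//     set.pending.insert(JobId(1));
//     set.pending.insert(JobId(2));
//     set.complete_job(JobId(1));
//     assert!(!set.all_completed()); // JobId(2) still pending
//     set.complete_job(JobId(2));
//     assert!(set.all_completed()); // layer can now be deleted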
     223              : 
     224              : // When we decide to rewrite a set of layers, LayerFragment is used to keep
      225              : // track of which new layers supersede an old layer. When all the stakeholder jobs
     226              : // have completed, this layer can be deleted.
     227              : struct LayerFragment<E>
     228              : where
     229              :     E: CompactionJobExecutor,
     230              : {
     231              :     layer: E::Layer,
     232              : 
     233              :     // If we will write new layers to replace this one, this keeps track of the
     234              :     // jobs that need to complete before this layer can be deleted. As the jobs
     235              :     // complete, they are moved from 'pending' to 'completed' set. Once the
     236              :     // 'pending' set becomes empty, the layer can be deleted.
     237              :     //
     238              :     // If None, this layer is not rewritten and must not be deleted.
     239              :     deletable_after: Option<PendingJobSet>,
     240              : 
     241              :     deleted: bool,
     242              : }
     243              : 
     244              : impl<E> LayerFragment<E>
     245              : where
     246              :     E: CompactionJobExecutor,
     247              : {
     248           47 :     fn new(layer: E::Layer) -> Self {
     249           47 :         LayerFragment {
     250           47 :             layer,
     251           47 :             deletable_after: None,
     252           47 :             deleted: false,
     253           47 :         }
     254           47 :     }
     255              : }
     256              : 
     257              : #[derive(PartialEq)]
     258              : enum CompactionStrategy {
     259              :     Divide,
     260              :     CreateDelta,
     261              :     CreateImage,
     262              : }
     263              : 
     264              : struct CompactionJob<E: CompactionJobExecutor> {
     265              :     key_range: Range<E::Key>,
     266              :     lsn_range: Range<Lsn>,
     267              : 
     268              :     strategy: CompactionStrategy,
     269              : 
     270              :     input_layers: Vec<LayerId>,
     271              : 
     272              :     completed: bool,
     273              : }
     274              : 
     275              : impl<'a, E> LevelCompactionState<'a, E>
     276              : where
     277              :     E: CompactionJobExecutor,
     278              : {
     279              :     /// Main loop of the executor.
     280              :     ///
     281              :     /// In each iteration, we take the next job from the queue, and execute it.
     282              :     /// The execution might add new jobs to the queue. Keep going until the
     283              :     /// queue is empty.
     284              :     ///
     285              :     /// Initially, the job queue consists of one Divide job over the whole
     286              :     /// level. On first call, it is divided into smaller jobs.
     287            3 :     async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
     288              :         // TODO: this would be pretty straightforward to parallelize with FuturesUnordered
     289           55 :         while let Some(next_job_id) = self.job_queue.pop() {
     290           52 :             info!("executing job {}", next_job_id.0);
     291           52 :             self.execute_job(next_job_id, ctx).await?;
     292              :         }
     293              : 
     294              :         // all done!
     295            3 :         Ok(())
     296            3 :     }
     297              : 
     298           52 :     async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
     299           52 :         let job = &self.jobs[job_id.0];
     300           52 :         match job.strategy {
     301              :             CompactionStrategy::Divide => {
     302            3 :                 self.divide_job(job_id, ctx).await?;
     303            3 :                 Ok(())
     304              :             }
     305              :             CompactionStrategy::CreateDelta => {
     306           49 :                 let mut deltas: Vec<E::DeltaLayer> = Vec::new();
     307           49 :                 let mut layer_ids: Vec<LayerId> = Vec::new();
     308         1610 :                 for layer_id in &job.input_layers {
     309         1561 :                     let layer = &self.layers[layer_id.0].layer;
     310         1561 :                     if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
     311         1561 :                         deltas.push(dl.clone());
     312         1561 :                         layer_ids.push(*layer_id);
     313         1561 :                     }
     314              :                 }
     315              : 
     316           49 :                 self.executor
     317           49 :                     .create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
     318            0 :                     .await?;
     319           49 :                 self.jobs[job_id.0].completed = true;
     320              : 
     321              :                 // did we complete any fragments?
     322         1610 :                 for layer_id in layer_ids {
     323         1561 :                     let l = &mut self.layers[layer_id.0];
     324         1561 :                     if let Some(deletable_after) = l.deletable_after.as_mut() {
     325         1561 :                         deletable_after.complete_job(job_id);
     326         1561 :                         if deletable_after.all_completed() {
     327           47 :                             self.executor.delete_layer(&l.layer, ctx).await?;
     328           47 :                             l.deleted = true;
     329         1514 :                         }
     330            0 :                     }
     331              :                 }
     332              : 
     333           49 :                 self.next_level = true;
     334           49 : 
     335           49 :                 Ok(())
     336              :             }
     337              :             CompactionStrategy::CreateImage => {
     338            0 :                 self.executor
     339            0 :                     .create_image(job.lsn_range.end, &job.key_range, ctx)
     340            0 :                     .await?;
     341            0 :                 self.jobs[job_id.0].completed = true;
     342            0 : 
     343            0 :                 // TODO: we could check if any layers < PITR horizon became deletable
     344            0 :                 Ok(())
     345              :             }
     346              :         }
     347           52 :     }
     348              : 
     349           49 :     fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
     350           49 :         let job_id = JobId(self.jobs.len());
     351           49 :         self.jobs.push(job);
     352           49 :         self.job_queue.push(job_id);
     353           49 :         job_id
     354           49 :     }
     355              : 
     356              :     /// Take a partition of the key space, and decide how to compact it.
     357              :     ///
     358              :     /// TODO: Currently, this is called exactly once for the level, and we
     359              :     /// decide whether to create new image layers to cover the whole level, or
     360              :     /// write a new set of deltas. In the future, this should try to partition
     361              :     /// the key space, and make the decision separately for each partition.
     362            3 :     async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
     363            3 :         let job = &self.jobs[job_id.0];
     364            3 :         assert!(job.strategy == CompactionStrategy::Divide);
     365              : 
     366              :         // Check for dummy cases
     367            3 :         if job.input_layers.is_empty() {
     368            0 :             return Ok(());
     369            3 :         }
     370            3 : 
     371            3 :         let job = &self.jobs[job_id.0];
     372            3 :         assert!(job.strategy == CompactionStrategy::Divide);
     373              : 
     374              :         // Would it be better to create images for this partition?
     375              :         // Decide based on the average density of the level
     376            3 :         let keyspace_size = keyspace_total_size(
     377            3 :             &self
     378            3 :                 .executor
     379            3 :                 .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
     380            0 :                 .await?,
     381            3 :             &self.shard_identity,
     382            3 :         ) * PAGE_SZ;
     383            3 : 
     384            3 :         let wal_size = job
     385            3 :             .input_layers
     386            3 :             .iter()
     387           47 :             .filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
     388           47 :             .map(|layer_id| self.layers[layer_id.0].layer.file_size())
     389            3 :             .sum::<u64>();
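        // Worked example (illustrative): if the keyspace amounts to 1 GB of
        // pages, keyspace_size is ~1 GB. With 1.5 GB of deltas in this level,
        // writing ~1 GB of images is cheaper than re-tiling 1.5 GB of deltas,
        // so we cover with images; with only 0.5 GB of deltas we re-tile.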
     390            3 :         if keyspace_size < wal_size {
     391              :             // seems worth it
     392            0 :             info!(
     393            0 :                 "covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
     394              :                 keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
     395              :             );
     396            0 :             self.cover_with_images(job_id, ctx).await
     397              :         } else {
     398              :             // do deltas
     399            3 :             info!(
     400            0 :                 "coverage not worth it, keyspace_size {}, wal_size {}",
     401              :                 keyspace_size, wal_size
     402              :             );
     403            3 :             self.retile_deltas(job_id, ctx).await
     404              :         }
     405            3 :     }
     406              : 
     407              :     // LSN
     408              :     //  ^
     409              :     //  |
     410              :     //  |                          ###|###|#####
     411              :     //  | +--+-----+--+            +--+-----+--+
     412              :     //  | |  |     |  |            |  |     |  |
     413              :     //  | +--+--+--+--+            +--+--+--+--+
     414              :     //  | |     |     |            |     |     |
     415              :     //  | +---+-+-+---+     ==>    +---+-+-+---+
     416              :     //  | |   |   |   |            |   |   |   |
     417              :     //  | +---+-+-++--+            +---+-+-++--+
     418              :     //  | |     |  |  |            |     |  |  |
     419              :     //  | +-----+--+--+            +-----+--+--+
     420              :     //  |
     421              :     //  +--------------> key
     422              :     //
     423            0 :     async fn cover_with_images(
     424            0 :         &mut self,
     425            0 :         job_id: JobId,
     426            0 :         ctx: &E::RequestContext,
     427            0 :     ) -> anyhow::Result<()> {
     428            0 :         let job = &self.jobs[job_id.0];
     429            0 :         assert!(job.strategy == CompactionStrategy::Divide);
     430              : 
     431              :         // XXX: do we still need the "holes" stuff?
     432              : 
     433            0 :         let mut new_jobs = Vec::new();
     434              : 
     435              :         // Slide a window through the keyspace
     436            0 :         let keyspace = self
     437            0 :             .executor
     438            0 :             .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
     439            0 :             .await?;
     440              : 
     441            0 :         let mut window = KeyspaceWindow::new(
     442            0 :             E::Key::MIN..E::Key::MAX,
     443            0 :             keyspace,
     444            0 :             self.target_file_size / PAGE_SZ,
     445            0 :         );
     446            0 :         while let Some(key_range) = window.choose_next_image(&self.shard_identity) {
     447            0 :             new_jobs.push(CompactionJob::<E> {
     448            0 :                 key_range,
     449            0 :                 lsn_range: job.lsn_range.clone(),
     450            0 :                 strategy: CompactionStrategy::CreateImage,
      451            0 :                 input_layers: Vec::new(), // XXX: Is it OK for this to be empty for an image layer?
     452            0 :                 completed: false,
     453            0 :             });
     454            0 :         }
     455              : 
     456            0 :         for j in new_jobs.into_iter().rev() {
     457            0 :             let _job_id = self.push_job(j);
     458            0 : 
     459            0 :             // TODO: image layers don't let us delete anything. unless < PITR horizon
     460            0 :             //let j = &self.jobs[job_id.0];
     461            0 :             // for layer_id in j.input_layers.iter() {
     462            0 :             //    self.layers[layer_id.0].pending_stakeholders.insert(job_id);
     463            0 :             //}
     464            0 :         }
     465              : 
     466            0 :         Ok(())
     467            0 :     }
     468              : 
     469              :     // Merge the contents of all the input delta layers into a new set
     470              :     // of delta layers, based on the current partitioning.
     471              :     //
     472              :     // We split the new delta layers on the key dimension. We iterate through
     473              :     // the key space, and for each key, check if including the next key to the
     474              :     // current output layer we're building would cause the layer to become too
     475              :     // large. If so, dump the current output layer and start new one.  It's
     476              :     // possible that there is a single key with so many page versions that
     477              :     // storing all of them in a single layer file would be too large. In that
     478              :     // case, we also split on the LSN dimension.
     479              :     //
     480              :     // LSN
     481              :     //  ^
     482              :     //  |
     483              :     //  | +-----------+            +--+--+--+--+
     484              :     //  | |           |            |  |  |  |  |
     485              :     //  | +-----------+            |  |  |  |  |
     486              :     //  | |           |            |  |  |  |  |
     487              :     //  | +-----------+     ==>    |  |  |  |  |
     488              :     //  | |           |            |  |  |  |  |
     489              :     //  | +-----------+            |  |  |  |  |
     490              :     //  | |           |            |  |  |  |  |
     491              :     //  | +-----------+            +--+--+--+--+
     492              :     //  |
     493              :     //  +--------------> key
     494              :     //
     495              :     //
     496              :     // If one key (X) has a lot of page versions:
     497              :     //
     498              :     // LSN
     499              :     //  ^
     500              :     //  |                                 (X)
     501              :     //  | +-----------+            +--+--+--+--+
     502              :     //  | |           |            |  |  |  |  |
     503              :     //  | +-----------+            |  |  +--+  |
     504              :     //  | |           |            |  |  |  |  |
     505              :     //  | +-----------+     ==>    |  |  |  |  |
     506              :     //  | |           |            |  |  +--+  |
     507              :     //  | +-----------+            |  |  |  |  |
     508              :     //  | |           |            |  |  |  |  |
     509              :     //  | +-----------+            +--+--+--+--+
     510              :     //  |
     511              :     //  +--------------> key
     512              :     //
     513              :     // TODO: this actually divides the layers into fixed-size chunks, not
     514              :     // based on the partitioning.
     515              :     //
     516              :     // TODO: we should also opportunistically materialize and
     517              :     // garbage collect what we can.
     518            3 :     async fn retile_deltas(
     519            3 :         &mut self,
     520            3 :         job_id: JobId,
     521            3 :         ctx: &E::RequestContext,
     522            3 :     ) -> anyhow::Result<()> {
     523            3 :         let job = &self.jobs[job_id.0];
     524            3 :         assert!(job.strategy == CompactionStrategy::Divide);
     525              : 
     526              :         // Sweep the key space left to right, running an estimate of how much
     527              :         // disk size and keyspace we have accumulated
     528              :         //
     529              :         // Once the disk size reaches the target threshold, stop and think.
     530              :         // If we have accumulated only a narrow band of keyspace, create an
     531              :         // image layer. Otherwise write a delta layer.
     532              : 
     533              :         // FIXME: we are ignoring images here. Did we already divide the work
     534              :         // so that we won't encounter them here?
     535              : 
     536            3 :         let mut deltas: Vec<E::DeltaLayer> = Vec::new();
     537           50 :         for layer_id in &job.input_layers {
     538           47 :             let l = &self.layers[layer_id.0];
     539           47 :             if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
     540           47 :                 deltas.push(dl.clone());
     541           47 :             }
     542              :         }
     543              :         // Open stream
     544            3 :         let key_value_stream =
     545            3 :             std::pin::pin!(merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
     546            0 :                 .await?
     547            3 :                 .map(Result::<_, anyhow::Error>::Ok));
     548            3 :         let mut new_jobs = Vec::new();
     549            3 : 
     550            3 :         // Slide a window through the keyspace
     551            3 :         let mut key_accum =
     552            3 :             std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
     553            3 :         let mut all_in_window: bool = false;
     554            3 :         let mut window = Window::new();
     555            3 : 
     556            3 :         // Helper function to create a job for a new delta layer with given key-lsn
     557            3 :         // rectangle.
     558           49 :         let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
     559           49 :             // The inputs for the job are all the input layers of the original job that
     560           49 :             // overlap with the rectangle.
     561           49 :             let batch_layers: Vec<LayerId> = job
     562           49 :                 .input_layers
     563           49 :                 .iter()
     564         1561 :                 .filter(|layer_id| {
     565         1561 :                     overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
     566         1561 :                 })
     567           49 :                 .cloned()
     568           49 :                 .collect();
     569           49 :             assert!(!batch_layers.is_empty());
     570           49 :             new_jobs.push(CompactionJob {
     571           49 :                 key_range,
     572           49 :                 lsn_range: lsn_range.clone(),
     573           49 :                 strategy: CompactionStrategy::CreateDelta,
     574           49 :                 input_layers: batch_layers,
     575           49 :                 completed: false,
     576           49 :             });
     577           49 :         };
     578              : 
     579              :         loop {
     580        93670 :             if all_in_window && window.is_empty() {
     581              :                 // All done!
     582            3 :                 break;
     583        93667 :             }
     584              : 
     585              :             // If we now have enough keyspace for next delta layer in the window, create a
     586              :             // new delta layer
     587        93667 :             if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
     588              :             {
     589           41 :                 create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
     590           41 :                 continue;
     591        93626 :             }
     592        93626 :             assert!(!all_in_window);
     593              : 
     594              :             // Process next key in the key space
     595        93626 :             match key_accum.next().await.transpose()? {
     596            3 :                 None => {
     597            3 :                     all_in_window = true;
     598            3 :                 }
     599        93623 :                 Some(next_key) if next_key.partition_lsns.is_empty() => {
     600        93621 :                     // Normal case: extend the window by the key
     601        93621 :                     window.feed(next_key.key, next_key.size);
     602        93621 :                 }
     603            2 :                 Some(next_key) => {
     604            2 :                     // A key with too large size impact for a single delta layer. This
     605            2 :                     // case occurs if you make a huge number of updates for a single key.
     606            2 :                     //
     607            2 :                     // Drain the window with has_more = false to make a clean cut before
     608            2 :                     // the key, and then make dedicated delta layers for the single key.
     609            2 :                     //
     610            2 :                     // We cannot cluster the key with the others, because we don't want
      611            2 :                     // layer files to overlap with each other in the (key, LSN) space (no
     612            2 :                     // overlaps for the rectangles).
     613            2 :                     let key = next_key.key;
     614            2 :                     debug!("key {key} with size impact larger than the layer size");
     615            2 :                     while !window.is_empty() {
     616            0 :                         let has_more = false;
     617            0 :                         let key_range = window.choose_next_delta(self.target_file_size, has_more)
     618            0 :                             .expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
     619            0 :                         create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
     620            0 :                     }
     621              : 
      622              :                     // Not strictly required, but here for future resilience:
     623              :                     // We make a "gap" here, so any structure the window holds should
     624              :                     // probably be reset.
     625            2 :                     window = Window::new();
     626            2 : 
     627            2 :                     let mut prior_lsn = job.lsn_range.start;
     628            2 :                     let mut lsn_ranges = Vec::new();
     629            6 :                     for (lsn, _size) in next_key.partition_lsns.iter() {
     630            6 :                         lsn_ranges.push(prior_lsn..*lsn);
     631            6 :                         prior_lsn = *lsn;
     632            6 :                     }
     633            2 :                     lsn_ranges.push(prior_lsn..job.lsn_range.end);
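                    // Example (illustrative): with job.lsn_range = 0x10..0x50
                    // and partition_lsns = [0x20, 0x30, 0x40], this produces
                    // four stacked delta jobs over key..key.next(), covering
                    // LSN ranges 0x10..0x20, 0x20..0x30, 0x30..0x40, 0x40..0x50.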
     634           10 :                     for lsn_range in lsn_ranges {
     635            8 :                         let key_range = key..key.next();
     636            8 :                         create_delta_job(key_range, &lsn_range, &mut new_jobs);
     637            8 :                     }
     638              :                 }
     639              :             }
     640              :         }
     641              : 
     642              :         // All the input files are rewritten. Set up the tracking for when they can
     643              :         // be deleted.
     644           47 :         for layer_id in job.input_layers.iter() {
     645           47 :             let l = &mut self.layers[layer_id.0];
     646           47 :             assert!(l.deletable_after.is_none());
     647           47 :             l.deletable_after = Some(PendingJobSet::new());
     648              :         }
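        // Note (added): execute() pops jobs from the back of 'job_queue'
        // (LIFO), so pushing new_jobs in reverse makes them execute in
        // ascending key order.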
     649           49 :         for j in new_jobs.into_iter().rev() {
     650           49 :             let job_id = self.push_job(j);
     651           49 :             let j = &self.jobs[job_id.0];
     652         1561 :             for layer_id in j.input_layers.iter() {
     653         1561 :                 self.layers[layer_id.0]
     654         1561 :                     .deletable_after
     655         1561 :                     .as_mut()
     656         1561 :                     .unwrap()
     657         1561 :                     .pending
     658         1561 :                     .insert(job_id);
     659         1561 :             }
     660              :         }
     661              : 
     662            3 :         Ok(())
     663            3 :     }
     664              : }
     665              : 
     666              : /// Sliding window through keyspace and values for image layer
     667              : /// This is used by [`LevelCompactionState::cover_with_images`] to decide on good split points
     668              : struct KeyspaceWindow<K> {
     669              :     head: KeyspaceWindowHead<K>,
     670              : 
     671              :     start_pos: KeyspaceWindowPos<K>,
     672              : }
     673              : struct KeyspaceWindowHead<K> {
     674              :     // overall key range to cover
     675              :     key_range: Range<K>,
     676              : 
     677              :     keyspace: Vec<Range<K>>,
     678              :     target_keysize: u64,
     679              : }
     680              : 
     681              : #[derive(Clone)]
     682              : struct KeyspaceWindowPos<K> {
     683              :     end_key: K,
     684              : 
     685              :     keyspace_idx: usize,
     686              : 
     687              :     accum_keysize: u64,
     688              : }
     689              : impl<K: CompactionKey> KeyspaceWindowPos<K> {
     690            0 :     fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
     691            0 :         self.keyspace_idx == w.keyspace.len()
     692            0 :     }
     693              : 
      694              :     // Advance the cursor until the accumulated key size reaches 'max_size'.
     695            0 :     fn advance_until_size(
     696            0 :         &mut self,
     697            0 :         w: &KeyspaceWindowHead<K>,
     698            0 :         max_size: u64,
     699            0 :         shard_identity: &ShardIdentity,
     700            0 :     ) {
     701            0 :         while self.accum_keysize < max_size && !self.reached_end(w) {
     702            0 :             let curr_range = &w.keyspace[self.keyspace_idx];
     703            0 :             if self.end_key < curr_range.start {
     704            0 :                 // skip over any unused space
     705            0 :                 self.end_key = curr_range.start;
     706            0 :             }
     707              : 
     708              :             // We're now within 'curr_range'. Can we advance past it completely?
     709            0 :             let distance = K::key_range_size(&(self.end_key..curr_range.end), shard_identity);
     710            0 :             if (self.accum_keysize + distance as u64) < max_size {
     711            0 :                 // oh yeah, it fits
     712            0 :                 self.end_key = curr_range.end;
     713            0 :                 self.keyspace_idx += 1;
     714            0 :                 self.accum_keysize += distance as u64;
     715            0 :             } else {
     716              :                 // advance within the range
     717            0 :                 let skip_key = self.end_key.skip_some();
     718            0 :                 let distance = K::key_range_size(&(self.end_key..skip_key), shard_identity);
     719            0 :                 if (self.accum_keysize + distance as u64) < max_size {
     720            0 :                     self.end_key = skip_key;
     721            0 :                     self.accum_keysize += distance as u64;
     722            0 :                 } else {
     723            0 :                     self.end_key = self.end_key.next();
     724            0 :                     self.accum_keysize += 1;
     725            0 :                 }
     726              :             }
     727              :         }
     728            0 :     }
     729              : }
     730              : 
     731              : impl<K> KeyspaceWindow<K>
     732              : where
     733              :     K: CompactionKey,
     734              : {
     735            0 :     fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
     736            0 :         assert!(keyspace.first().unwrap().start >= key_range.start);
     737              : 
     738            0 :         let start_key = key_range.start;
     739            0 :         let start_pos = KeyspaceWindowPos::<K> {
     740            0 :             end_key: start_key,
     741            0 :             keyspace_idx: 0,
     742            0 :             accum_keysize: 0,
     743            0 :         };
     744            0 :         Self {
     745            0 :             head: KeyspaceWindowHead::<K> {
     746            0 :                 key_range,
     747            0 :                 keyspace,
     748            0 :                 target_keysize,
     749            0 :             },
     750            0 :             start_pos,
     751            0 :         }
     752            0 :     }
     753              : 
     754            0 :     fn choose_next_image(&mut self, shard_identity: &ShardIdentity) -> Option<Range<K>> {
     755            0 :         if self.start_pos.keyspace_idx == self.head.keyspace.len() {
     756              :             // we've reached the end
     757            0 :             return None;
     758            0 :         }
     759            0 : 
     760            0 :         let mut next_pos = self.start_pos.clone();
     761            0 :         next_pos.advance_until_size(
     762            0 :             &self.head,
     763            0 :             self.start_pos.accum_keysize + self.head.target_keysize,
     764            0 :             shard_identity,
     765            0 :         );
     766            0 : 
     767            0 :         // See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
     768            0 :         // 1.25x target size
     769            0 :         let mut end_pos = next_pos.clone();
     770            0 :         end_pos.advance_until_size(
     771            0 :             &self.head,
     772            0 :             self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
     773            0 :             shard_identity,
     774            0 :         );
     775            0 :         if end_pos.reached_end(&self.head) {
     776              :             // gobble up any unused keyspace between the last used key and end of the range
     777            0 :             assert!(end_pos.end_key <= self.head.key_range.end);
     778            0 :             end_pos.end_key = self.head.key_range.end;
     779            0 :             next_pos = end_pos;
     780            0 :         }
     781              : 
     782            0 :         let start_key = self.start_pos.end_key;
     783            0 :         self.start_pos = next_pos;
     784            0 :         Some(start_key..self.start_pos.end_key)
     785            0 :     }
     786              : }
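// Note (added): choose_next_image() advances the cursor by about one
// target_keysize worth of keys per image, then checks whether stretching the
// layer to 1.25x the target would consume the rest of the keyspace; if so,
// the final image absorbs the tail rather than leaving a tiny trailing layer.
// The same 5/4 factor is used for deltas in Window::choose_next_delta().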
     787              : 
     788              : // Take previous partitioning, based on the image layers below.
     789              : //
     790              : // Candidate is at the front:
     791              : //
     792              : // Consider stretching an image layer to next divider? If it's close enough,
     793              : // that's the image candidate
     794              : //
     795              : // If it's too far, consider splitting at a reasonable point
     796              : //
     797              : // Is the image candidate smaller than the equivalent delta? If so,
     798              : // split off the image. Otherwise, split off one delta.
     799              : // Try to snap off the delta at a reasonable point
     800              : 
     801              : struct WindowElement<K> {
     802              :     start_key: K, // inclusive
     803              :     last_key: K,  // inclusive
     804              :     accum_size: u64,
     805              : }
     806              : 
     807              : /// Sliding window through keyspace and values for delta layer tiling
     808              : ///
     809              : /// This is used to decide which delta layer to write next.
     810              : struct Window<K> {
     811              :     elems: VecDeque<WindowElement<K>>,
     812              : 
     813              :     // last key that was split off, inclusive
     814              :     splitoff_key: Option<K>,
     815              :     splitoff_size: u64,
     816              : }
     817              : 
     818              : impl<K> Window<K>
     819              : where
     820              :     K: CompactionKey,
     821              : {
     822            5 :     fn new() -> Self {
     823            5 :         Self {
     824            5 :             elems: VecDeque::new(),
     825            5 :             splitoff_key: None,
     826            5 :             splitoff_size: 0,
     827            5 :         }
     828            5 :     }
     829              : 
     830        93621 :     fn feed(&mut self, key: K, size: u64) {
     831              :         let last_size;
     832        93621 :         if let Some(last) = self.elems.back_mut() {
     833              :             // We require the keys to be strictly increasing for the window.
     834              :             // Keys should already have been deduplicated by `accum_key_values`
     835        93618 :             assert!(
     836        93618 :                 last.last_key < key,
     837            0 :                 "last_key(={}) >= key(={key})",
     838              :                 last.last_key
     839              :             );
     840        93618 :             last_size = last.accum_size;
     841            3 :         } else {
     842            3 :             last_size = 0;
     843            3 :         }
     844              :         // This is a new key.
     845        93621 :         let elem = WindowElement {
     846        93621 :             start_key: key,
     847        93621 :             last_key: key,
     848        93621 :             accum_size: last_size + size,
     849        93621 :         };
     850        93621 :         self.elems.push_back(elem);
     851        93621 :     }
     852              : 
     853         7968 :     fn remain_size(&self) -> u64 {
     854         7968 :         self.elems.back().unwrap().accum_size - self.splitoff_size
     855         7968 :     }
     856              : 
     857       187214 :     fn peek_size(&self) -> u64 {
     858       187214 :         self.elems.front().unwrap().accum_size - self.splitoff_size
     859       187214 :     }
     860              : 
     861            8 :     fn is_empty(&self) -> bool {
     862            8 :         self.elems.is_empty()
     863            8 :     }
     864              : 
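    // Note (added): merges the front 'upto' elements of the window into a
    // single element spanning their combined key range; accum_size is already
    // cumulative, so only the start key needs to be carried over.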
     865        93593 :     fn commit_upto(&mut self, mut upto: usize) {
     866       187173 :         while upto > 1 {
     867        93580 :             let popped = self.elems.pop_front().unwrap();
     868        93580 :             self.elems.front_mut().unwrap().start_key = popped.start_key;
     869        93580 :             upto -= 1;
     870        93580 :         }
     871        93593 :     }
     872              : 
     873           38 :     fn find_size_split(&self, target_size: u64) -> usize {
     874           38 :         self.elems
     875          335 :             .partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
     876           38 :     }
     877              : 
     878           41 :     fn pop(&mut self) {
     879           41 :         let first = self.elems.pop_front().unwrap();
     880           41 :         self.splitoff_size = first.accum_size;
     881           41 : 
     882           41 :         self.splitoff_key = Some(first.last_key);
     883           41 :     }
     884              : 
     885              :     // the difference between delta and image is that an image covers
     886              :     // any unused keyspace before and after, while a delta tries to
     887              :     // minimize that. TODO: difference not implemented
     888           41 :     fn pop_delta(&mut self) -> Range<K> {
     889           41 :         let first = self.elems.front().unwrap();
     890           41 :         let key_range = first.start_key..first.last_key.next();
     891           41 : 
     892           41 :         self.pop();
     893           41 :         key_range
     894           41 :     }
     895              : 
     896              :     // Prerequisite: we have enough input in the window
     897              :     //
      898              :     // If this returns None, the caller should feed more data and call again
     899        93667 :     fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
     900        93667 :         if has_more && self.elems.is_empty() {
     901              :             // Starting up
     902            5 :             return None;
     903        93662 :         }
     904              : 
     905              :         // If we still have an undersized candidate, just keep going
     906       187214 :         while self.peek_size() < target_size {
     907       179248 :             if self.elems.len() > 1 {
     908        93552 :                 self.commit_upto(2);
     909        93552 :             } else if has_more {
     910        85694 :                 return None;
     911              :             } else {
     912            2 :                 break;
     913              :             }
     914              :         }
     915              : 
     916              :         // Ensure we have enough input in the window to make a good decision
     917         7968 :         if has_more && self.remain_size() < target_size * 5 / 4 {
     918         7927 :             return None;
     919           41 :         }
     920           41 : 
     921           41 :         // The candidate on the front is now large enough, for a delta.
     922           41 :         // And we have enough data in the window to decide.
     923           41 : 
     924           41 :         // If we're willing to stretch it up to 1.25 target size, could we
     925           41 :         // gobble up the rest of the work? This avoids creating very small
     926           41 :         // "tail" layers at the end of the keyspace
     927           41 :         if !has_more && self.remain_size() < target_size * 5 / 4 {
     928            3 :             self.commit_upto(self.elems.len());
     929            3 :         } else {
     930           38 :             let delta_split_at = self.find_size_split(target_size);
     931           38 :             self.commit_upto(delta_split_at);
     932           38 : 
     933           38 :             // If it's still not large enough, request the caller to fill the window
     934           38 :             if self.elems.len() == 1 && has_more {
     935            0 :                 return None;
     936           38 :             }
     937              :         }
     938           41 :         Some(self.pop_delta())
     939        93667 :     }
     940              : }
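// Worked example (illustrative): with target_size = 100 and keys of size 40
// fed one at a time, choose_next_delta(100, true) returns None while the
// front candidate is below 100. Once the candidate reaches 120, it is split
// off only after the window as a whole holds at least 125 (5/4 of the
// target), guaranteeing enough lookahead for a clean cut. With
// has_more = false, an undersized tail below 125 is merged into the final
// delta instead of becoming a tiny trailing layer.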
        

Generated by: LCOV version 2.1-beta