LCOV - code coverage report
Current view: top level - pageserver/compaction/src - compact_tiered.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 0.0 % 464 0
Test Date: 2024-02-29 11:57:12 Functions: 0.0 % 141 0

            Line data    Source code
       1              : //! # Tiered compaction algorithm.
       2              : //!
       3              : //! Read all the input delta files, and write a new set of delta files that
       4              : //! include all the input WAL records. See retile_deltas().
       5              : //!
       6              : //! In a "normal" LSM tree, you get to remove any values that are overwritten by
       7              : //! later values, but in our system, we keep all the history. So the reshuffling
       8              : //! doesn't remove any garbage, it just reshuffles the records to reduce read
       9              : //! amplification, i.e. the number of files that you need to access to find the
      10              : //! WAL records for a given key.
      11              : //!
      12              : //! If the new delta files would be very "narrow", i.e. each file would cover
      13              : //! only a narrow key range, then we create a new set of image files
      14              : //! instead. The current threshold is that if the estimated total size of the
      15              : //! image layers is smaller than the size of the deltas, then we create image
      16              : //! layers. That amounts to 2x storage amplification, and it means that the
      17              : //! distance of image layers in LSN dimension is roughly equal to the logical
      18              : //! database size. For example, if the logical database size is 10 GB, we would
      19              : //! generate new image layers every 10 GB of WAL.
      20              : use futures::StreamExt;
      21              : use tracing::{debug, info};
      22              : 
      23              : use std::collections::{HashSet, VecDeque};
      24              : use std::ops::Range;
      25              : 
      26              : use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with};
      27              : use crate::interface::*;
      28              : use utils::lsn::Lsn;
      29              : 
      30              : use crate::identify_levels::identify_level;
      31              : 
      32              : /// Main entry point to compaction.
      33              : ///
      34              : /// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
      35              : /// everything below that point, that needs compaction. The cutoff LSN must
      36              : /// partition the layers so that there are no layers that span across that
      37              : /// LSN. To start compaction at the top of the tree, pass the end LSN of the
      38              : /// written last L0 layer.
      39            0 : pub async fn compact_tiered<E: CompactionJobExecutor>(
      40            0 :     executor: &mut E,
      41            0 :     end_lsn: Lsn,
      42            0 :     target_file_size: u64,
      43            0 :     fanout: u64,
      44            0 :     ctx: &E::RequestContext,
      45            0 : ) -> anyhow::Result<()> {
      46            0 :     assert!(fanout >= 2);
      47              :     // Start at L0
      48            0 :     let mut current_level_no = 0;
      49            0 :     let mut current_level_target_height = target_file_size;
      50              :     loop {
      51              :         // end LSN +1 to include possible image layers exactly at 'end_lsn'.
      52            0 :         let all_layers = executor
      53            0 :             .get_layers(
      54            0 :                 &(E::Key::MIN..E::Key::MAX),
      55            0 :                 &(Lsn(u64::MIN)..end_lsn + 1),
      56            0 :                 ctx,
      57            0 :             )
      58            0 :             .await?;
      59            0 :         info!(
      60            0 :             "Compacting L{}, total # of layers: {}",
      61            0 :             current_level_no,
      62            0 :             all_layers.len()
      63            0 :         );
      64              : 
      65              :         // Identify the range of LSNs that belong to this level. We assume that
      66              :         // each file in this level span an LSN range up to 1.75x target file
      67              :         // size. That should give us enough slop that if we created a slightly
      68              :         // oversized L0 layer, e.g. because flushing the in-memory layer was
      69              :         // delayed for some reason, we don't consider the oversized layer to
      70              :         // belong to L1. But not too much slop, that we don't accidentally
      71              :         // "skip" levels.
      72            0 :         let max_height = (current_level_target_height as f64 * 1.75) as u64;
      73            0 :         let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
      74            0 :             break;
      75              :         };
      76              : 
      77              :         // Calculate the height of this level. If the # of tiers exceeds the
      78              :         // fanout parameter, it's time to compact it.
      79            0 :         let depth = level.depth();
      80            0 :         info!(
      81            0 :             "Level {} identified as LSN range {}-{}: depth {}",
      82            0 :             current_level_no, level.lsn_range.start, level.lsn_range.end, depth
      83            0 :         );
      84            0 :         for l in &level.layers {
      85            0 :             debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
      86              :         }
      87            0 :         if depth < fanout {
      88            0 :             debug!(
      89            0 :                 level = current_level_no,
      90            0 :                 depth = depth,
      91            0 :                 fanout,
      92            0 :                 "too few deltas to compact"
      93            0 :             );
      94            0 :             break;
      95            0 :         }
      96            0 : 
      97            0 :         compact_level(
      98            0 :             &level.lsn_range,
      99            0 :             &level.layers,
     100            0 :             executor,
     101            0 :             target_file_size,
     102            0 :             ctx,
     103            0 :         )
     104            0 :         .await?;
     105            0 :         if target_file_size == u64::MAX {
     106            0 :             break;
     107            0 :         }
     108            0 :         current_level_no += 1;
     109            0 :         current_level_target_height = current_level_target_height.saturating_mul(fanout);
     110              :     }
     111            0 :     Ok(())
     112            0 : }
     113              : 
     114            0 : async fn compact_level<E: CompactionJobExecutor>(
     115            0 :     lsn_range: &Range<Lsn>,
     116            0 :     layers: &[E::Layer],
     117            0 :     executor: &mut E,
     118            0 :     target_file_size: u64,
     119            0 :     ctx: &E::RequestContext,
     120            0 : ) -> anyhow::Result<bool> {
     121            0 :     let mut layer_fragments = Vec::new();
     122            0 :     for l in layers {
     123            0 :         layer_fragments.push(LayerFragment::new(l.clone()));
     124            0 :     }
     125              : 
     126            0 :     let mut state = LevelCompactionState {
     127            0 :         target_file_size,
     128            0 :         _lsn_range: lsn_range.clone(),
     129            0 :         layers: layer_fragments,
     130            0 :         jobs: Vec::new(),
     131            0 :         job_queue: Vec::new(),
     132            0 :         next_level: false,
     133            0 :         executor,
     134            0 :     };
     135            0 : 
     136            0 :     let first_job = CompactionJob {
     137            0 :         key_range: E::Key::MIN..E::Key::MAX,
     138            0 :         lsn_range: lsn_range.clone(),
     139            0 :         strategy: CompactionStrategy::Divide,
     140            0 :         input_layers: state
     141            0 :             .layers
     142            0 :             .iter()
     143            0 :             .enumerate()
     144            0 :             .map(|i| LayerId(i.0))
     145            0 :             .collect(),
     146            0 :         completed: false,
     147            0 :     };
     148            0 : 
     149            0 :     state.jobs.push(first_job);
     150            0 :     state.job_queue.push(JobId(0));
     151            0 :     state.execute(ctx).await?;
     152              : 
     153            0 :     info!(
     154            0 :         "compaction completed! Need to process next level: {}",
     155            0 :         state.next_level
     156            0 :     );
     157              : 
     158            0 :     Ok(state.next_level)
     159            0 : }
     160              : 
     161              : /// Blackboard that keeps track of the state of all the jobs and work remaining
     162              : struct LevelCompactionState<'a, E>
     163              : where
     164              :     E: CompactionJobExecutor,
     165              : {
     166              :     // parameters
     167              :     target_file_size: u64,
     168              : 
     169              :     _lsn_range: Range<Lsn>,
     170              :     layers: Vec<LayerFragment<E>>,
     171              : 
     172              :     // job queue
     173              :     jobs: Vec<CompactionJob<E>>,
     174              :     job_queue: Vec<JobId>,
     175              : 
     176              :     /// If false, no need to compact levels below this
     177              :     next_level: bool,
     178              : 
     179              :     /// Interface to the outside world
     180              :     executor: &'a mut E,
     181              : }
     182              : 
     183            0 : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
     184              : struct LayerId(usize);
     185            0 : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
     186              : struct JobId(usize);
     187              : 
     188              : struct PendingJobSet {
     189              :     pending: HashSet<JobId>,
     190              :     completed: HashSet<JobId>,
     191              : }
     192              : 
     193              : impl PendingJobSet {
     194            0 :     fn new() -> Self {
     195            0 :         PendingJobSet {
     196            0 :             pending: HashSet::new(),
     197            0 :             completed: HashSet::new(),
     198            0 :         }
     199            0 :     }
     200              : 
     201            0 :     fn complete_job(&mut self, job_id: JobId) {
     202            0 :         self.pending.remove(&job_id);
     203            0 :         self.completed.insert(job_id);
     204            0 :     }
     205              : 
     206            0 :     fn all_completed(&self) -> bool {
     207            0 :         self.pending.is_empty()
     208            0 :     }
     209              : }
     210              : 
     211              : // When we decide to rewrite a set of layers, LayerFragment is used to keep
     212              : // track which new layers supersede an old layer. When all the stakeholder jobs
     213              : // have completed, this layer can be deleted.
     214              : struct LayerFragment<E>
     215              : where
     216              :     E: CompactionJobExecutor,
     217              : {
     218              :     layer: E::Layer,
     219              : 
     220              :     // If we will write new layers to replace this one, this keeps track of the
     221              :     // jobs that need to complete before this layer can be deleted. As the jobs
     222              :     // complete, they are moved from 'pending' to 'completed' set. Once the
     223              :     // 'pending' set becomes empty, the layer can be deleted.
     224              :     //
     225              :     // If None, this layer is not rewritten and must not be deleted.
     226              :     deletable_after: Option<PendingJobSet>,
     227              : 
     228              :     deleted: bool,
     229              : }
     230              : 
     231              : impl<E> LayerFragment<E>
     232              : where
     233              :     E: CompactionJobExecutor,
     234              : {
     235            0 :     fn new(layer: E::Layer) -> Self {
     236            0 :         LayerFragment {
     237            0 :             layer,
     238            0 :             deletable_after: None,
     239            0 :             deleted: false,
     240            0 :         }
     241            0 :     }
     242              : }
     243              : 
     244            0 : #[derive(PartialEq)]
     245              : enum CompactionStrategy {
     246              :     Divide,
     247              :     CreateDelta,
     248              :     CreateImage,
     249              : }
     250              : 
     251              : #[allow(dead_code)] // Todo
     252              : struct CompactionJob<E: CompactionJobExecutor> {
     253              :     key_range: Range<E::Key>,
     254              :     lsn_range: Range<Lsn>,
     255              : 
     256              :     strategy: CompactionStrategy,
     257              : 
     258              :     input_layers: Vec<LayerId>,
     259              : 
     260              :     completed: bool,
     261              : }
     262              : 
     263              : impl<'a, E> LevelCompactionState<'a, E>
     264              : where
     265              :     E: CompactionJobExecutor,
     266              : {
     267              :     /// Main loop of the executor.
     268              :     ///
     269              :     /// In each iteration, we take the next job from the queue, and execute it.
     270              :     /// The execution might add new jobs to the queue. Keep going until the
     271              :     /// queue is empty.
     272              :     ///
     273              :     /// Initially, the job queue consists of one Divide job over the whole
     274              :     /// level. On first call, it is divided into smaller jobs.
     275            0 :     async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
     276              :         // TODO: this would be pretty straightforward to parallelize with FuturesUnordered
     277            0 :         while let Some(next_job_id) = self.job_queue.pop() {
     278            0 :             info!("executing job {}", next_job_id.0);
     279            0 :             self.execute_job(next_job_id, ctx).await?;
     280              :         }
     281              : 
     282              :         // all done!
     283            0 :         Ok(())
     284            0 :     }
     285              : 
     286            0 :     async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
     287            0 :         let job = &self.jobs[job_id.0];
     288            0 :         match job.strategy {
     289              :             CompactionStrategy::Divide => {
     290            0 :                 self.divide_job(job_id, ctx).await?;
     291            0 :                 Ok(())
     292              :             }
     293              :             CompactionStrategy::CreateDelta => {
     294            0 :                 let mut deltas: Vec<E::DeltaLayer> = Vec::new();
     295            0 :                 let mut layer_ids: Vec<LayerId> = Vec::new();
     296            0 :                 for layer_id in &job.input_layers {
     297            0 :                     let layer = &self.layers[layer_id.0].layer;
     298            0 :                     if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
     299            0 :                         deltas.push(dl.clone());
     300            0 :                         layer_ids.push(*layer_id);
     301            0 :                     }
     302              :                 }
     303              : 
     304            0 :                 self.executor
     305            0 :                     .create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
     306            0 :                     .await?;
     307            0 :                 self.jobs[job_id.0].completed = true;
     308              : 
     309              :                 // did we complete any fragments?
     310            0 :                 for layer_id in layer_ids {
     311            0 :                     let l = &mut self.layers[layer_id.0];
     312            0 :                     if let Some(deletable_after) = l.deletable_after.as_mut() {
     313            0 :                         deletable_after.complete_job(job_id);
     314            0 :                         if deletable_after.all_completed() {
     315            0 :                             self.executor.delete_layer(&l.layer, ctx).await?;
     316            0 :                             l.deleted = true;
     317            0 :                         }
     318            0 :                     }
     319              :                 }
     320              : 
     321            0 :                 self.next_level = true;
     322            0 : 
     323            0 :                 Ok(())
     324              :             }
     325              :             CompactionStrategy::CreateImage => {
     326            0 :                 self.executor
     327            0 :                     .create_image(job.lsn_range.end, &job.key_range, ctx)
     328            0 :                     .await?;
     329            0 :                 self.jobs[job_id.0].completed = true;
     330            0 : 
     331            0 :                 // TODO: we could check if any layers < PITR horizon became deletable
     332            0 :                 Ok(())
     333              :             }
     334              :         }
     335            0 :     }
     336              : 
     337            0 :     fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
     338            0 :         let job_id = JobId(self.jobs.len());
     339            0 :         self.jobs.push(job);
     340            0 :         self.job_queue.push(job_id);
     341            0 :         job_id
     342            0 :     }
     343              : 
     344              :     /// Take a partition of the key space, and decide how to compact it.
     345              :     ///
     346              :     /// TODO: Currently, this is called exactly once for the level, and we
     347              :     /// decide whether to create new image layers to cover the whole level, or
     348              :     /// write a new set of delta. In the future, this should try to partition
     349              :     /// the key space, and make the decision separately for each partition.
     350            0 :     async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
     351            0 :         let job = &self.jobs[job_id.0];
     352            0 :         assert!(job.strategy == CompactionStrategy::Divide);
     353              : 
     354              :         // Check for dummy cases
     355            0 :         if job.input_layers.is_empty() {
     356            0 :             return Ok(());
     357            0 :         }
     358            0 : 
     359            0 :         let job = &self.jobs[job_id.0];
     360            0 :         assert!(job.strategy == CompactionStrategy::Divide);
     361              : 
     362              :         // Would it be better to create images for this partition?
     363              :         // Decide based on the average density of the level
     364            0 :         let keyspace_size = keyspace_total_size(
     365            0 :             &self
     366            0 :                 .executor
     367            0 :                 .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
     368            0 :                 .await?,
     369              :         ) * 8192;
     370              : 
     371            0 :         let wal_size = job
     372            0 :             .input_layers
     373            0 :             .iter()
     374            0 :             .filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
     375            0 :             .map(|layer_id| self.layers[layer_id.0].layer.file_size())
     376            0 :             .sum::<u64>();
     377            0 :         if keyspace_size < wal_size {
     378              :             // seems worth it
     379            0 :             info!(
     380            0 :                 "covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
     381            0 :                 keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
     382            0 :             );
     383            0 :             self.cover_with_images(job_id, ctx).await
     384              :         } else {
     385              :             // do deltas
     386            0 :             info!(
     387            0 :                 "coverage not worth it, keyspace_size {}, wal_size {}",
     388            0 :                 keyspace_size, wal_size
     389            0 :             );
     390            0 :             self.retile_deltas(job_id, ctx).await
     391              :         }
     392            0 :     }
     393              : 
     394              :     // LSN
     395              :     //  ^
     396              :     //  |
     397              :     //  |                          ###|###|#####
     398              :     //  | +--+-----+--+            +--+-----+--+
     399              :     //  | |  |     |  |            |  |     |  |
     400              :     //  | +--+--+--+--+            +--+--+--+--+
     401              :     //  | |     |     |            |     |     |
     402              :     //  | +---+-+-+---+     ==>    +---+-+-+---+
     403              :     //  | |   |   |   |            |   |   |   |
     404              :     //  | +---+-+-++--+            +---+-+-++--+
     405              :     //  | |     |  |  |            |     |  |  |
     406              :     //  | +-----+--+--+            +-----+--+--+
     407              :     //  |
     408              :     //  +--------------> key
     409              :     //
     410            0 :     async fn cover_with_images(
     411            0 :         &mut self,
     412            0 :         job_id: JobId,
     413            0 :         ctx: &E::RequestContext,
     414            0 :     ) -> anyhow::Result<()> {
     415            0 :         let job = &self.jobs[job_id.0];
     416            0 :         assert!(job.strategy == CompactionStrategy::Divide);
     417              : 
     418              :         // XXX: do we still need the "holes" stuff?
     419              : 
     420            0 :         let mut new_jobs = Vec::new();
     421              : 
     422              :         // Slide a window through the keyspace
     423            0 :         let keyspace = self
     424            0 :             .executor
     425            0 :             .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
     426            0 :             .await?;
     427              : 
     428            0 :         let mut window = KeyspaceWindow::new(
     429            0 :             E::Key::MIN..E::Key::MAX,
     430            0 :             keyspace,
     431            0 :             self.target_file_size / 8192,
     432            0 :         );
     433            0 :         while let Some(key_range) = window.choose_next_image() {
     434            0 :             new_jobs.push(CompactionJob::<E> {
     435            0 :                 key_range,
     436            0 :                 lsn_range: job.lsn_range.clone(),
     437            0 :                 strategy: CompactionStrategy::CreateImage,
     438            0 :                 input_layers: Vec::new(), // XXX: Is it OK for  this to be empty for image layer?
     439            0 :                 completed: false,
     440            0 :             });
     441            0 :         }
     442              : 
     443            0 :         for j in new_jobs.into_iter().rev() {
     444            0 :             let _job_id = self.push_job(j);
     445            0 : 
     446            0 :             // TODO: image layers don't let us delete anything. unless < PITR horizon
     447            0 :             //let j = &self.jobs[job_id.0];
     448            0 :             // for layer_id in j.input_layers.iter() {
     449            0 :             //    self.layers[layer_id.0].pending_stakeholders.insert(job_id);
     450            0 :             //}
     451            0 :         }
     452              : 
     453            0 :         Ok(())
     454            0 :     }
     455              : 
     456              :     // Merge the contents of all the input delta layers into a new set
     457              :     // of delta layers, based on the current partitioning.
     458              :     //
     459              :     // We split the new delta layers on the key dimension. We iterate through
     460              :     // the key space, and for each key, check if including the next key to the
     461              :     // current output layer we're building would cause the layer to become too
     462              :     // large. If so, dump the current output layer and start new one.  It's
     463              :     // possible that there is a single key with so many page versions that
     464              :     // storing all of them in a single layer file would be too large. In that
     465              :     // case, we also split on the LSN dimension.
     466              :     //
     467              :     // LSN
     468              :     //  ^
     469              :     //  |
     470              :     //  | +-----------+            +--+--+--+--+
     471              :     //  | |           |            |  |  |  |  |
     472              :     //  | +-----------+            |  |  |  |  |
     473              :     //  | |           |            |  |  |  |  |
     474              :     //  | +-----------+     ==>    |  |  |  |  |
     475              :     //  | |           |            |  |  |  |  |
     476              :     //  | +-----------+            |  |  |  |  |
     477              :     //  | |           |            |  |  |  |  |
     478              :     //  | +-----------+            +--+--+--+--+
     479              :     //  |
     480              :     //  +--------------> key
     481              :     //
     482              :     //
     483              :     // If one key (X) has a lot of page versions:
     484              :     //
     485              :     // LSN
     486              :     //  ^
     487              :     //  |                                 (X)
     488              :     //  | +-----------+            +--+--+--+--+
     489              :     //  | |           |            |  |  |  |  |
     490              :     //  | +-----------+            |  |  +--+  |
     491              :     //  | |           |            |  |  |  |  |
     492              :     //  | +-----------+     ==>    |  |  |  |  |
     493              :     //  | |           |            |  |  +--+  |
     494              :     //  | +-----------+            |  |  |  |  |
     495              :     //  | |           |            |  |  |  |  |
     496              :     //  | +-----------+            +--+--+--+--+
     497              :     //  |
     498              :     //  +--------------> key
     499              :     //
     500              :     // TODO: this actually divides the layers into fixed-size chunks, not
     501              :     // based on the partitioning.
     502              :     //
     503              :     // TODO: we should also opportunistically materialize and
     504              :     // garbage collect what we can.
     505            0 :     async fn retile_deltas(
     506            0 :         &mut self,
     507            0 :         job_id: JobId,
     508            0 :         ctx: &E::RequestContext,
     509            0 :     ) -> anyhow::Result<()> {
     510            0 :         let job = &self.jobs[job_id.0];
     511            0 :         assert!(job.strategy == CompactionStrategy::Divide);
     512              : 
     513              :         // Sweep the key space left to right, running an estimate of how much
     514              :         // disk size and keyspace we have accumulated
     515              :         //
     516              :         // Once the disk size reaches the target threshold, stop and think.
     517              :         // If we have accumulated only a narrow band of keyspace, create an
     518              :         // image layer. Otherwise write a delta layer.
     519              : 
     520              :         // FIXME: deal with the case of lots of values for same key
     521              : 
     522              :         // FIXME: we are ignoring images here. Did we already divide the work
     523              :         // so that we won't encounter them here?
     524              : 
     525            0 :         let mut deltas: Vec<E::DeltaLayer> = Vec::new();
     526            0 :         for layer_id in &job.input_layers {
     527            0 :             let l = &self.layers[layer_id.0];
     528            0 :             if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
     529            0 :                 deltas.push(dl.clone());
     530            0 :             }
     531              :         }
     532              :         // Open stream
     533            0 :         let key_value_stream = std::pin::pin!(merge_delta_keys::<E>(deltas.as_slice(), ctx));
     534            0 :         let mut new_jobs = Vec::new();
     535            0 : 
     536            0 :         // Slide a window through the keyspace
     537            0 :         let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
     538            0 :         let mut all_in_window: bool = false;
     539            0 :         let mut window = Window::new();
     540            0 :         loop {
     541            0 :             if all_in_window && window.elems.is_empty() {
     542              :                 // All done!
     543            0 :                 break;
     544            0 :             }
     545            0 :             if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
     546              :             {
     547            0 :                 let batch_layers: Vec<LayerId> = job
     548            0 :                     .input_layers
     549            0 :                     .iter()
     550            0 :                     .filter(|layer_id| {
     551            0 :                         overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
     552            0 :                     })
     553            0 :                     .cloned()
     554            0 :                     .collect();
     555            0 :                 assert!(!batch_layers.is_empty());
     556            0 :                 new_jobs.push(CompactionJob {
     557            0 :                     key_range,
     558            0 :                     lsn_range: job.lsn_range.clone(),
     559            0 :                     strategy: CompactionStrategy::CreateDelta,
     560            0 :                     input_layers: batch_layers,
     561            0 :                     completed: false,
     562            0 :                 });
     563              :             } else {
     564            0 :                 assert!(!all_in_window);
     565            0 :                 if let Some(next_key) = key_accum.next().await.transpose()? {
     566            0 :                     window.feed(next_key.key, next_key.size);
     567            0 :                 } else {
     568            0 :                     all_in_window = true;
     569            0 :                 }
     570              :             }
     571              :         }
     572              : 
     573              :         // All the input files are rewritten. Set up the tracking for when they can
     574              :         // be deleted.
     575            0 :         for layer_id in job.input_layers.iter() {
     576            0 :             let l = &mut self.layers[layer_id.0];
     577            0 :             assert!(l.deletable_after.is_none());
     578            0 :             l.deletable_after = Some(PendingJobSet::new());
     579              :         }
     580            0 :         for j in new_jobs.into_iter().rev() {
     581            0 :             let job_id = self.push_job(j);
     582            0 :             let j = &self.jobs[job_id.0];
     583            0 :             for layer_id in j.input_layers.iter() {
     584            0 :                 self.layers[layer_id.0]
     585            0 :                     .deletable_after
     586            0 :                     .as_mut()
     587            0 :                     .unwrap()
     588            0 :                     .pending
     589            0 :                     .insert(job_id);
     590            0 :             }
     591              :         }
     592              : 
     593            0 :         Ok(())
     594            0 :     }
     595              : }
     596              : 
     597              : // Sliding window through keyspace and values
     598              : // This is used by over_with_images to decide on good split points
     599              : struct KeyspaceWindow<K> {
     600              :     head: KeyspaceWindowHead<K>,
     601              : 
     602              :     start_pos: KeyspaceWindowPos<K>,
     603              : }
     604              : struct KeyspaceWindowHead<K> {
     605              :     // overall key range to cover
     606              :     key_range: Range<K>,
     607              : 
     608              :     keyspace: Vec<Range<K>>,
     609              :     target_keysize: u64,
     610              : }
     611              : 
     612            0 : #[derive(Clone)]
     613              : struct KeyspaceWindowPos<K> {
     614              :     end_key: K,
     615              : 
     616              :     keyspace_idx: usize,
     617              : 
     618              :     accum_keysize: u64,
     619              : }
     620              : impl<K: CompactionKey> KeyspaceWindowPos<K> {
     621            0 :     fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
     622            0 :         self.keyspace_idx == w.keyspace.len()
     623            0 :     }
     624              : 
     625              :     // Advance the cursor until it reaches 'target_keysize'.
     626            0 :     fn advance_until_size(&mut self, w: &KeyspaceWindowHead<K>, max_size: u64) {
     627            0 :         while self.accum_keysize < max_size && !self.reached_end(w) {
     628            0 :             let curr_range = &w.keyspace[self.keyspace_idx];
     629            0 :             if self.end_key < curr_range.start {
     630            0 :                 // skip over any unused space
     631            0 :                 self.end_key = curr_range.start;
     632            0 :             }
     633              : 
     634              :             // We're now within 'curr_range'. Can we advance past it completely?
     635            0 :             let distance = K::key_range_size(&(self.end_key..curr_range.end));
     636            0 :             if (self.accum_keysize + distance as u64) < max_size {
     637            0 :                 // oh yeah, it fits
     638            0 :                 self.end_key = curr_range.end;
     639            0 :                 self.keyspace_idx += 1;
     640            0 :                 self.accum_keysize += distance as u64;
     641            0 :             } else {
     642              :                 // advance within the range
     643            0 :                 let skip_key = self.end_key.skip_some();
     644            0 :                 let distance = K::key_range_size(&(self.end_key..skip_key));
     645            0 :                 if (self.accum_keysize + distance as u64) < max_size {
     646            0 :                     self.end_key = skip_key;
     647            0 :                     self.accum_keysize += distance as u64;
     648            0 :                 } else {
     649            0 :                     self.end_key = self.end_key.next();
     650            0 :                     self.accum_keysize += 1;
     651            0 :                 }
     652              :             }
     653              :         }
     654            0 :     }
     655              : }
     656              : 
     657              : impl<K> KeyspaceWindow<K>
     658              : where
     659              :     K: CompactionKey,
     660              : {
     661            0 :     fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
     662            0 :         assert!(keyspace.first().unwrap().start >= key_range.start);
     663              : 
     664            0 :         let start_key = key_range.start;
     665            0 :         let start_pos = KeyspaceWindowPos::<K> {
     666            0 :             end_key: start_key,
     667            0 :             keyspace_idx: 0,
     668            0 :             accum_keysize: 0,
     669            0 :         };
     670            0 :         Self {
     671            0 :             head: KeyspaceWindowHead::<K> {
     672            0 :                 key_range,
     673            0 :                 keyspace,
     674            0 :                 target_keysize,
     675            0 :             },
     676            0 :             start_pos,
     677            0 :         }
     678            0 :     }
     679              : 
     680            0 :     fn choose_next_image(&mut self) -> Option<Range<K>> {
     681            0 :         if self.start_pos.keyspace_idx == self.head.keyspace.len() {
     682              :             // we've reached the end
     683            0 :             return None;
     684            0 :         }
     685            0 : 
     686            0 :         let mut next_pos = self.start_pos.clone();
     687            0 :         next_pos.advance_until_size(
     688            0 :             &self.head,
     689            0 :             self.start_pos.accum_keysize + self.head.target_keysize,
     690            0 :         );
     691            0 : 
     692            0 :         // See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
     693            0 :         // 1.25x target size
     694            0 :         let mut end_pos = next_pos.clone();
     695            0 :         end_pos.advance_until_size(
     696            0 :             &self.head,
     697            0 :             self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
     698            0 :         );
     699            0 :         if end_pos.reached_end(&self.head) {
     700              :             // gobble up any unused keyspace between the last used key and end of the range
     701            0 :             assert!(end_pos.end_key <= self.head.key_range.end);
     702            0 :             end_pos.end_key = self.head.key_range.end;
     703            0 :             next_pos = end_pos;
     704            0 :         }
     705              : 
     706            0 :         let start_key = self.start_pos.end_key;
     707            0 :         self.start_pos = next_pos;
     708            0 :         Some(start_key..self.start_pos.end_key)
     709            0 :     }
     710              : }
     711              : 
     712              : // Sliding window through keyspace and values
     713              : //
     714              : // This is used to decide what layer to write next, from the beginning of the window.
     715              : //
     716              : // Candidates:
     717              : //
     718              : // 1. Create an image layer, snapping to previous images
     719              : // 2. Create a delta layer, snapping to previous images
     720              : // 3. Create an image layer, snapping to
     721              : //
     722              : //
     723              : 
     724              : // Take previous partitioning, based on the image layers below.
     725              : //
     726              : // Candidate is at the front:
     727              : //
     728              : // Consider stretching an image layer to next divider? If it's close enough,
     729              : // that's the image candidate
     730              : //
     731              : // If it's too far, consider splitting at a reasonable point
     732              : //
     733              : // Is the image candidate smaller than the equivalent delta? If so,
     734              : // split off the image. Otherwise, split off one delta.
     735              : // Try to snap off the delta at a reasonable point
     736              : 
     737              : struct WindowElement<K> {
     738              :     start_key: K, // inclusive
     739              :     last_key: K,  // inclusive
     740              :     accum_size: u64,
     741              : }
     742              : struct Window<K> {
     743              :     elems: VecDeque<WindowElement<K>>,
     744              : 
     745              :     // last key that was split off, inclusive
     746              :     splitoff_key: Option<K>,
     747              :     splitoff_size: u64,
     748              : }
     749              : 
     750              : impl<K> Window<K>
     751              : where
     752              :     K: CompactionKey,
     753              : {
     754            0 :     fn new() -> Self {
     755            0 :         Self {
     756            0 :             elems: VecDeque::new(),
     757            0 :             splitoff_key: None,
     758            0 :             splitoff_size: 0,
     759            0 :         }
     760            0 :     }
     761              : 
     762            0 :     fn feed(&mut self, key: K, size: u64) {
     763              :         let last_size;
     764            0 :         if let Some(last) = self.elems.back_mut() {
     765            0 :             assert!(last.last_key <= key);
     766            0 :             if key == last.last_key {
     767            0 :                 last.accum_size += size;
     768            0 :                 return;
     769            0 :             }
     770            0 :             last_size = last.accum_size;
     771            0 :         } else {
     772            0 :             last_size = 0;
     773            0 :         }
     774              :         // This is a new key.
     775            0 :         let elem = WindowElement {
     776            0 :             start_key: key,
     777            0 :             last_key: key,
     778            0 :             accum_size: last_size + size,
     779            0 :         };
     780            0 :         self.elems.push_back(elem);
     781            0 :     }
     782              : 
     783            0 :     fn remain_size(&self) -> u64 {
     784            0 :         self.elems.back().unwrap().accum_size - self.splitoff_size
     785            0 :     }
     786              : 
     787            0 :     fn peek_size(&self) -> u64 {
     788            0 :         self.elems.front().unwrap().accum_size - self.splitoff_size
     789            0 :     }
     790              : 
     791            0 :     fn commit_upto(&mut self, mut upto: usize) {
     792            0 :         while upto > 1 {
     793            0 :             let popped = self.elems.pop_front().unwrap();
     794            0 :             self.elems.front_mut().unwrap().start_key = popped.start_key;
     795            0 :             upto -= 1;
     796            0 :         }
     797            0 :     }
     798              : 
     799            0 :     fn find_size_split(&self, target_size: u64) -> usize {
     800            0 :         self.elems
     801            0 :             .partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
     802            0 :     }
     803              : 
     804            0 :     fn pop(&mut self) {
     805            0 :         let first = self.elems.pop_front().unwrap();
     806            0 :         self.splitoff_size = first.accum_size;
     807            0 : 
     808            0 :         self.splitoff_key = Some(first.last_key);
     809            0 :     }
     810              : 
     811              :     // the difference between delta and image is that an image covers
     812              :     // any unused keyspace before and after, while a delta tries to
     813              :     // minimize that. TODO: difference not implemented
     814            0 :     fn pop_delta(&mut self) -> Range<K> {
     815            0 :         let first = self.elems.front().unwrap();
     816            0 :         let key_range = first.start_key..first.last_key.next();
     817            0 : 
     818            0 :         self.pop();
     819            0 :         key_range
     820            0 :     }
     821              : 
     822              :     // Prerequisite: we have enough input in the window
     823              :     //
     824              :     // On return None, the caller should feed more data and call again
     825            0 :     fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
     826            0 :         if has_more && self.elems.is_empty() {
     827              :             // Starting up
     828            0 :             return None;
     829            0 :         }
     830              : 
     831              :         // If we still have an undersized candidate, just keep going
     832            0 :         while self.peek_size() < target_size {
     833            0 :             if self.elems.len() > 1 {
     834            0 :                 self.commit_upto(2);
     835            0 :             } else if has_more {
     836            0 :                 return None;
     837              :             } else {
     838            0 :                 break;
     839              :             }
     840              :         }
     841              : 
     842              :         // Ensure we have enough input in the window to make a good decision
     843            0 :         if has_more && self.remain_size() < target_size * 5 / 4 {
     844            0 :             return None;
     845            0 :         }
     846            0 : 
     847            0 :         // The candidate on the front is now large enough, for a delta.
     848            0 :         // And we have enough data in the window to decide.
     849            0 : 
     850            0 :         // If we're willing to stretch it up to 1.25 target size, could we
     851            0 :         // gobble up the rest of the work? This avoids creating very small
     852            0 :         // "tail" layers at the end of the keyspace
     853            0 :         if !has_more && self.remain_size() < target_size * 5 / 3 {
     854            0 :             self.commit_upto(self.elems.len());
     855            0 :         } else {
     856            0 :             let delta_split_at = self.find_size_split(target_size);
     857            0 :             self.commit_upto(delta_split_at);
     858            0 : 
     859            0 :             // If it's still not large enough, request the caller to fill the window
     860            0 :             if self.elems.len() == 1 && has_more {
     861            0 :                 return None;
     862            0 :             }
     863              :         }
     864            0 :         Some(self.pop_delta())
     865            0 :     }
     866              : }
        

Generated by: LCOV version 2.1-beta