LCOV - code coverage report
Current view: top level - pageserver/compaction/src - compact_tiered.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 68.5 % 463 317
Test Date: 2024-05-10 13:18:37 Functions: 29.5 % 105 31

            Line data    Source code
       1              : //! # Tiered compaction algorithm.
       2              : //!
       3              : //! Read all the input delta files, and write a new set of delta files that
       4              : //! include all the input WAL records. See retile_deltas().
       5              : //!
       6              : //! In a "normal" LSM tree, you get to remove any values that are overwritten by
       7              : //! later values, but in our system, we keep all the history. So the reshuffling
       8              : //! doesn't remove any garbage, it just reshuffles the records to reduce read
       9              : //! amplification, i.e. the number of files that you need to access to find the
      10              : //! WAL records for a given key.
      11              : //!
      12              : //! If the new delta files would be very "narrow", i.e. each file would cover
      13              : //! only a narrow key range, then we create a new set of image files
      14              : //! instead. The current threshold is that if the estimated total size of the
      15              : //! image layers is smaller than the size of the deltas, then we create image
      16              : //! layers. That amounts to 2x storage amplification, and it means that the
       17              : //! distance between image layers in the LSN dimension is roughly equal to the logical
      18              : //! database size. For example, if the logical database size is 10 GB, we would
      19              : //! generate new image layers every 10 GB of WAL.
      20              : use futures::StreamExt;
      21              : use pageserver_api::shard::ShardIdentity;
      22              : use tracing::{debug, info};
      23              : 
      24              : use std::collections::{HashSet, VecDeque};
      25              : use std::ops::Range;
      26              : 
      27              : use crate::helpers::{
      28              :     accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with,
      29              : };
      30              : use crate::interface::*;
      31              : use utils::lsn::Lsn;
      32              : 
      33              : use crate::identify_levels::identify_level;
      34              : 
      35              : /// Main entry point to compaction.
      36              : ///
      37              : /// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
      38              : /// everything below that point, that needs compaction. The cutoff LSN must
      39              : /// partition the layers so that there are no layers that span across that
      40              : /// LSN. To start compaction at the top of the tree, pass the end LSN of the
       41              : /// last written L0 layer.
      42            2 : pub async fn compact_tiered<E: CompactionJobExecutor>(
      43            2 :     executor: &mut E,
      44            2 :     end_lsn: Lsn,
      45            2 :     target_file_size: u64,
      46            2 :     fanout: u64,
      47            2 :     ctx: &E::RequestContext,
      48            2 : ) -> anyhow::Result<()> {
      49            2 :     assert!(fanout >= 1, "fanout needs to be at least 1 but is {fanout}");
      50            2 :     let exp_base = fanout.max(2);
      51            2 :     // Start at L0
      52            2 :     let mut current_level_no = 0;
      53            2 :     let mut current_level_target_height = target_file_size;
      54              :     loop {
      55              :         // end LSN +1 to include possible image layers exactly at 'end_lsn'.
      56            4 :         let all_layers = executor
      57            4 :             .get_layers(
      58            4 :                 &(E::Key::MIN..E::Key::MAX),
      59            4 :                 &(Lsn(u64::MIN)..end_lsn + 1),
      60            4 :                 ctx,
      61            4 :             )
      62            0 :             .await?;
      63            4 :         info!(
      64            0 :             "Compacting L{}, total # of layers: {}",
      65            0 :             current_level_no,
      66            0 :             all_layers.len()
      67              :         );
      68              : 
      69              :         // Identify the range of LSNs that belong to this level. We assume that
      70              :         // each file in this level spans an LSN range up to 1.75x target file
      71              :         // size. That should give us enough slop that if we created a slightly
      72              :         // oversized L0 layer, e.g. because flushing the in-memory layer was
      73              :         // delayed for some reason, we don't consider the oversized layer to
       74              :         // belong to L1. But not so much slop that we accidentally
       75              :         // "skip" levels.
      76            4 :         let max_height = (current_level_target_height as f64 * 1.75) as u64;
      77            4 :         let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
      78            2 :             break;
      79              :         };
      80              : 
       81              :         // Calculate the depth of this level. If the # of tiers reaches the
       82              :         // fanout parameter, it's time to compact it.
      83            2 :         let depth = level.depth();
      84            2 :         info!(
      85            0 :             "Level {} identified as LSN range {}-{}: depth {}",
      86              :             current_level_no, level.lsn_range.start, level.lsn_range.end, depth
      87              :         );
      88           80 :         for l in &level.layers {
      89           78 :             debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
      90              :         }
      91            2 :         if depth < fanout {
      92            0 :             debug!(
      93              :                 level = current_level_no,
      94              :                 depth = depth,
      95              :                 fanout,
      96            0 :                 "too few deltas to compact"
      97              :             );
      98            0 :             break;
      99            2 :         }
     100            2 : 
     101            2 :         compact_level(
     102            2 :             &level.lsn_range,
     103            2 :             &level.layers,
     104            2 :             executor,
     105            2 :             target_file_size,
     106            2 :             ctx,
     107            2 :         )
     108            0 :         .await?;
     109            2 :         if target_file_size == u64::MAX {
     110            0 :             break;
     111            2 :         }
     112            2 :         current_level_no += 1;
     113            2 :         current_level_target_height = current_level_target_height.saturating_mul(exp_base);
     114              :     }
     115            2 :     Ok(())
     116            2 : }
     117              : 
     118            2 : async fn compact_level<E: CompactionJobExecutor>(
     119            2 :     lsn_range: &Range<Lsn>,
     120            2 :     layers: &[E::Layer],
     121            2 :     executor: &mut E,
     122            2 :     target_file_size: u64,
     123            2 :     ctx: &E::RequestContext,
     124            2 : ) -> anyhow::Result<bool> {
     125            2 :     let mut layer_fragments = Vec::new();
     126           80 :     for l in layers {
     127           78 :         layer_fragments.push(LayerFragment::new(l.clone()));
     128           78 :     }
     129              : 
     130            2 :     let mut state = LevelCompactionState {
     131            2 :         shard_identity: *executor.get_shard_identity(),
     132            2 :         target_file_size,
     133            2 :         _lsn_range: lsn_range.clone(),
     134            2 :         layers: layer_fragments,
     135            2 :         jobs: Vec::new(),
     136            2 :         job_queue: Vec::new(),
     137            2 :         next_level: false,
     138            2 :         executor,
     139            2 :     };
     140            2 : 
     141            2 :     let first_job = CompactionJob {
     142            2 :         key_range: E::Key::MIN..E::Key::MAX,
     143            2 :         lsn_range: lsn_range.clone(),
     144            2 :         strategy: CompactionStrategy::Divide,
     145            2 :         input_layers: state
     146            2 :             .layers
     147            2 :             .iter()
     148            2 :             .enumerate()
     149           78 :             .map(|i| LayerId(i.0))
     150            2 :             .collect(),
     151            2 :         completed: false,
     152            2 :     };
     153            2 : 
     154            2 :     state.jobs.push(first_job);
     155            2 :     state.job_queue.push(JobId(0));
     156            2 :     state.execute(ctx).await?;
     157              : 
     158            2 :     info!(
     159            0 :         "compaction completed! Need to process next level: {}",
     160              :         state.next_level
     161              :     );
     162              : 
     163            2 :     Ok(state.next_level)
     164            2 : }
     165              : 
     166              : /// Blackboard that keeps track of the state of all the jobs and work remaining
     167              : struct LevelCompactionState<'a, E>
     168              : where
     169              :     E: CompactionJobExecutor,
     170              : {
     171              :     shard_identity: ShardIdentity,
     172              : 
     173              :     // parameters
     174              :     target_file_size: u64,
     175              : 
     176              :     _lsn_range: Range<Lsn>,
     177              :     layers: Vec<LayerFragment<E>>,
     178              : 
     179              :     // job queue
     180              :     jobs: Vec<CompactionJob<E>>,
     181              :     job_queue: Vec<JobId>,
     182              : 
     183              :     /// If false, no need to compact levels below this
     184              :     next_level: bool,
     185              : 
     186              :     /// Interface to the outside world
     187              :     executor: &'a mut E,
     188              : }
     189              : 
     190              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
     191              : struct LayerId(usize);
     192              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
     193              : struct JobId(usize);
     194              : 
     195              : struct PendingJobSet {
     196              :     pending: HashSet<JobId>,
     197              :     completed: HashSet<JobId>,
     198              : }
     199              : 
     200              : impl PendingJobSet {
     201           78 :     fn new() -> Self {
     202           78 :         PendingJobSet {
     203           78 :             pending: HashSet::new(),
     204           78 :             completed: HashSet::new(),
     205           78 :         }
     206           78 :     }
     207              : 
     208         3042 :     fn complete_job(&mut self, job_id: JobId) {
     209         3042 :         self.pending.remove(&job_id);
     210         3042 :         self.completed.insert(job_id);
     211         3042 :     }
     212              : 
     213         3042 :     fn all_completed(&self) -> bool {
     214         3042 :         self.pending.is_empty()
     215         3042 :     }
     216              : }
     217              : 
     218              : // When we decide to rewrite a set of layers, LayerFragment is used to keep
      219              : // track of which new layers supersede an old layer. When all the stakeholder jobs
     220              : // have completed, this layer can be deleted.
     221              : struct LayerFragment<E>
     222              : where
     223              :     E: CompactionJobExecutor,
     224              : {
     225              :     layer: E::Layer,
     226              : 
     227              :     // If we will write new layers to replace this one, this keeps track of the
     228              :     // jobs that need to complete before this layer can be deleted. As the jobs
      229              :     // complete, they are moved from the 'pending' to the 'completed' set. Once the
     230              :     // 'pending' set becomes empty, the layer can be deleted.
     231              :     //
     232              :     // If None, this layer is not rewritten and must not be deleted.
     233              :     deletable_after: Option<PendingJobSet>,
     234              : 
     235              :     deleted: bool,
     236              : }
     237              : 
     238              : impl<E> LayerFragment<E>
     239              : where
     240              :     E: CompactionJobExecutor,
     241              : {
     242           78 :     fn new(layer: E::Layer) -> Self {
     243           78 :         LayerFragment {
     244           78 :             layer,
     245           78 :             deletable_after: None,
     246           78 :             deleted: false,
     247           78 :         }
     248           78 :     }
     249              : }
     250              : 
     251              : #[derive(PartialEq)]
     252              : enum CompactionStrategy {
     253              :     Divide,
     254              :     CreateDelta,
     255              :     CreateImage,
     256              : }
     257              : 
     258              : struct CompactionJob<E: CompactionJobExecutor> {
     259              :     key_range: Range<E::Key>,
     260              :     lsn_range: Range<Lsn>,
     261              : 
     262              :     strategy: CompactionStrategy,
     263              : 
     264              :     input_layers: Vec<LayerId>,
     265              : 
     266              :     completed: bool,
     267              : }
     268              : 
     269              : impl<'a, E> LevelCompactionState<'a, E>
     270              : where
     271              :     E: CompactionJobExecutor,
     272              : {
     273              :     /// Main loop of the executor.
     274              :     ///
     275              :     /// In each iteration, we take the next job from the queue, and execute it.
     276              :     /// The execution might add new jobs to the queue. Keep going until the
     277              :     /// queue is empty.
     278              :     ///
     279              :     /// Initially, the job queue consists of one Divide job over the whole
     280              :     /// level. On first call, it is divided into smaller jobs.
     281            2 :     async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
     282              :         // TODO: this would be pretty straightforward to parallelize with FuturesUnordered
     283           82 :         while let Some(next_job_id) = self.job_queue.pop() {
     284           80 :             info!("executing job {}", next_job_id.0);
     285           80 :             self.execute_job(next_job_id, ctx).await?;
     286              :         }
     287              : 
     288              :         // all done!
     289            2 :         Ok(())
     290            2 :     }
     291              : 
     292           80 :     async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
     293           80 :         let job = &self.jobs[job_id.0];
     294           80 :         match job.strategy {
     295              :             CompactionStrategy::Divide => {
     296            2 :                 self.divide_job(job_id, ctx).await?;
     297            2 :                 Ok(())
     298              :             }
     299              :             CompactionStrategy::CreateDelta => {
     300           78 :                 let mut deltas: Vec<E::DeltaLayer> = Vec::new();
     301           78 :                 let mut layer_ids: Vec<LayerId> = Vec::new();
     302         3120 :                 for layer_id in &job.input_layers {
     303         3042 :                     let layer = &self.layers[layer_id.0].layer;
     304         3042 :                     if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
     305         3042 :                         deltas.push(dl.clone());
     306         3042 :                         layer_ids.push(*layer_id);
     307         3042 :                     }
     308              :                 }
     309              : 
     310           78 :                 self.executor
     311           78 :                     .create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
     312            0 :                     .await?;
     313           78 :                 self.jobs[job_id.0].completed = true;
     314              : 
     315              :                 // did we complete any fragments?
     316         3120 :                 for layer_id in layer_ids {
     317         3042 :                     let l = &mut self.layers[layer_id.0];
     318         3042 :                     if let Some(deletable_after) = l.deletable_after.as_mut() {
     319         3042 :                         deletable_after.complete_job(job_id);
     320         3042 :                         if deletable_after.all_completed() {
     321           78 :                             self.executor.delete_layer(&l.layer, ctx).await?;
     322           78 :                             l.deleted = true;
     323         2964 :                         }
     324            0 :                     }
     325              :                 }
     326              : 
     327           78 :                 self.next_level = true;
     328           78 : 
     329           78 :                 Ok(())
     330              :             }
     331              :             CompactionStrategy::CreateImage => {
     332            0 :                 self.executor
     333            0 :                     .create_image(job.lsn_range.end, &job.key_range, ctx)
     334            0 :                     .await?;
     335            0 :                 self.jobs[job_id.0].completed = true;
     336            0 : 
     337            0 :                 // TODO: we could check if any layers < PITR horizon became deletable
     338            0 :                 Ok(())
     339              :             }
     340              :         }
     341           80 :     }
     342              : 
     343           78 :     fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
     344           78 :         let job_id = JobId(self.jobs.len());
     345           78 :         self.jobs.push(job);
     346           78 :         self.job_queue.push(job_id);
     347           78 :         job_id
     348           78 :     }
     349              : 
     350              :     /// Take a partition of the key space, and decide how to compact it.
     351              :     ///
     352              :     /// TODO: Currently, this is called exactly once for the level, and we
     353              :     /// decide whether to create new image layers to cover the whole level, or
     354              :     /// write a new set of deltas. In the future, this should try to partition
     355              :     /// the key space, and make the decision separately for each partition.
     356            2 :     async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
     357            2 :         let job = &self.jobs[job_id.0];
     358            2 :         assert!(job.strategy == CompactionStrategy::Divide);
     359              : 
     360              :         // Check for dummy cases
     361            2 :         if job.input_layers.is_empty() {
     362            0 :             return Ok(());
     363            2 :         }
     364            2 : 
     365            2 :         let job = &self.jobs[job_id.0];
     366            2 :         assert!(job.strategy == CompactionStrategy::Divide);
     367              : 
     368              :         // Would it be better to create images for this partition?
     369              :         // Decide based on the average density of the level
     370            2 :         let keyspace_size = keyspace_total_size(
     371            2 :             &self
     372            2 :                 .executor
     373            2 :                 .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
     374            0 :                 .await?,
     375            2 :             &self.shard_identity,
     376            2 :         ) * 8192;
     377            2 : 
     378            2 :         let wal_size = job
     379            2 :             .input_layers
     380            2 :             .iter()
     381           78 :             .filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
     382           78 :             .map(|layer_id| self.layers[layer_id.0].layer.file_size())
     383            2 :             .sum::<u64>();
     384            2 :         if keyspace_size < wal_size {
     385              :             // seems worth it
     386            0 :             info!(
     387            0 :                 "covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
     388              :                 keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
     389              :             );
     390            0 :             self.cover_with_images(job_id, ctx).await
     391              :         } else {
     392              :             // do deltas
     393            2 :             info!(
     394            0 :                 "coverage not worth it, keyspace_size {}, wal_size {}",
     395              :                 keyspace_size, wal_size
     396              :             );
     397            2 :             self.retile_deltas(job_id, ctx).await
     398              :         }
     399            2 :     }
     400              : 
     401              :     // LSN
     402              :     //  ^
     403              :     //  |
     404              :     //  |                          ###|###|#####
     405              :     //  | +--+-----+--+            +--+-----+--+
     406              :     //  | |  |     |  |            |  |     |  |
     407              :     //  | +--+--+--+--+            +--+--+--+--+
     408              :     //  | |     |     |            |     |     |
     409              :     //  | +---+-+-+---+     ==>    +---+-+-+---+
     410              :     //  | |   |   |   |            |   |   |   |
     411              :     //  | +---+-+-++--+            +---+-+-++--+
     412              :     //  | |     |  |  |            |     |  |  |
     413              :     //  | +-----+--+--+            +-----+--+--+
     414              :     //  |
     415              :     //  +--------------> key
     416              :     //
     417            0 :     async fn cover_with_images(
     418            0 :         &mut self,
     419            0 :         job_id: JobId,
     420            0 :         ctx: &E::RequestContext,
     421            0 :     ) -> anyhow::Result<()> {
     422            0 :         let job = &self.jobs[job_id.0];
     423            0 :         assert!(job.strategy == CompactionStrategy::Divide);
     424              : 
     425              :         // XXX: do we still need the "holes" stuff?
     426              : 
     427            0 :         let mut new_jobs = Vec::new();
     428              : 
     429              :         // Slide a window through the keyspace
     430            0 :         let keyspace = self
     431            0 :             .executor
     432            0 :             .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
     433            0 :             .await?;
     434              : 
     435            0 :         let mut window = KeyspaceWindow::new(
     436            0 :             E::Key::MIN..E::Key::MAX,
     437            0 :             keyspace,
     438            0 :             self.target_file_size / 8192,
     439            0 :         );
     440            0 :         while let Some(key_range) = window.choose_next_image(&self.shard_identity) {
     441            0 :             new_jobs.push(CompactionJob::<E> {
     442            0 :                 key_range,
     443            0 :                 lsn_range: job.lsn_range.clone(),
     444            0 :                 strategy: CompactionStrategy::CreateImage,
     445            0 :                 input_layers: Vec::new(), // XXX: Is it OK for  this to be empty for image layer?
     446            0 :                 completed: false,
     447            0 :             });
     448            0 :         }
     449              : 
     450            0 :         for j in new_jobs.into_iter().rev() {
     451            0 :             let _job_id = self.push_job(j);
     452            0 : 
      453            0 :             // TODO: image layers don't let us delete anything, unless < PITR horizon
     454            0 :             //let j = &self.jobs[job_id.0];
     455            0 :             // for layer_id in j.input_layers.iter() {
     456            0 :             //    self.layers[layer_id.0].pending_stakeholders.insert(job_id);
     457            0 :             //}
     458            0 :         }
     459              : 
     460            0 :         Ok(())
     461            0 :     }
     462              : 
     463              :     // Merge the contents of all the input delta layers into a new set
     464              :     // of delta layers, based on the current partitioning.
     465              :     //
     466              :     // We split the new delta layers on the key dimension. We iterate through
      467              :     // the key space, and for each key, check if including the next key in the
      468              :     // current output layer we're building would cause the layer to become too
      469              :     // large. If so, dump the current output layer and start a new one. It's
     470              :     // possible that there is a single key with so many page versions that
     471              :     // storing all of them in a single layer file would be too large. In that
     472              :     // case, we also split on the LSN dimension.
     473              :     //
     474              :     // LSN
     475              :     //  ^
     476              :     //  |
     477              :     //  | +-----------+            +--+--+--+--+
     478              :     //  | |           |            |  |  |  |  |
     479              :     //  | +-----------+            |  |  |  |  |
     480              :     //  | |           |            |  |  |  |  |
     481              :     //  | +-----------+     ==>    |  |  |  |  |
     482              :     //  | |           |            |  |  |  |  |
     483              :     //  | +-----------+            |  |  |  |  |
     484              :     //  | |           |            |  |  |  |  |
     485              :     //  | +-----------+            +--+--+--+--+
     486              :     //  |
     487              :     //  +--------------> key
     488              :     //
     489              :     //
     490              :     // If one key (X) has a lot of page versions:
     491              :     //
     492              :     // LSN
     493              :     //  ^
     494              :     //  |                                 (X)
     495              :     //  | +-----------+            +--+--+--+--+
     496              :     //  | |           |            |  |  |  |  |
     497              :     //  | +-----------+            |  |  +--+  |
     498              :     //  | |           |            |  |  |  |  |
     499              :     //  | +-----------+     ==>    |  |  |  |  |
     500              :     //  | |           |            |  |  +--+  |
     501              :     //  | +-----------+            |  |  |  |  |
     502              :     //  | |           |            |  |  |  |  |
     503              :     //  | +-----------+            +--+--+--+--+
     504              :     //  |
     505              :     //  +--------------> key
     506              :     //
     507              :     // TODO: this actually divides the layers into fixed-size chunks, not
     508              :     // based on the partitioning.
     509              :     //
     510              :     // TODO: we should also opportunistically materialize and
     511              :     // garbage collect what we can.
     512            2 :     async fn retile_deltas(
     513            2 :         &mut self,
     514            2 :         job_id: JobId,
     515            2 :         ctx: &E::RequestContext,
     516            2 :     ) -> anyhow::Result<()> {
     517            2 :         let job = &self.jobs[job_id.0];
     518            2 :         assert!(job.strategy == CompactionStrategy::Divide);
     519              : 
      520              :         // Sweep the key space left to right, keeping a running estimate of how
      521              :         // much disk size and keyspace we have accumulated.
     522              :         //
     523              :         // Once the disk size reaches the target threshold, stop and think.
     524              :         // If we have accumulated only a narrow band of keyspace, create an
     525              :         // image layer. Otherwise write a delta layer.
     526              : 
     527              :         // FIXME: deal with the case of lots of values for same key
     528              : 
     529              :         // FIXME: we are ignoring images here. Did we already divide the work
     530              :         // so that we won't encounter them here?
     531              : 
     532            2 :         let mut deltas: Vec<E::DeltaLayer> = Vec::new();
     533           80 :         for layer_id in &job.input_layers {
     534           78 :             let l = &self.layers[layer_id.0];
     535           78 :             if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
     536           78 :                 deltas.push(dl.clone());
     537           78 :             }
     538              :         }
     539              :         // Open stream
     540            2 :         let key_value_stream =
     541            2 :             std::pin::pin!(merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
     542            0 :                 .await?
     543            2 :                 .map(Result::<_, anyhow::Error>::Ok));
     544            2 :         let mut new_jobs = Vec::new();
     545            2 : 
     546            2 :         // Slide a window through the keyspace
     547            2 :         let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
     548            2 :         let mut all_in_window: bool = false;
     549            2 :         let mut window = Window::new();
     550        64628 :         loop {
     551        64628 :             if all_in_window && window.elems.is_empty() {
     552              :                 // All done!
     553            2 :                 break;
     554        64626 :             }
     555        64626 :             if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
     556              :             {
     557           78 :                 let batch_layers: Vec<LayerId> = job
     558           78 :                     .input_layers
     559           78 :                     .iter()
     560         3042 :                     .filter(|layer_id| {
     561         3042 :                         overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
     562         3042 :                     })
     563           78 :                     .cloned()
     564           78 :                     .collect();
     565           78 :                 assert!(!batch_layers.is_empty());
     566           78 :                 new_jobs.push(CompactionJob {
     567           78 :                     key_range,
     568           78 :                     lsn_range: job.lsn_range.clone(),
     569           78 :                     strategy: CompactionStrategy::CreateDelta,
     570           78 :                     input_layers: batch_layers,
     571           78 :                     completed: false,
     572           78 :                 });
     573              :             } else {
     574        64548 :                 assert!(!all_in_window);
     575        64548 :                 if let Some(next_key) = key_accum.next().await.transpose()? {
     576        64546 :                     window.feed(next_key.key, next_key.size);
     577        64546 :                 } else {
     578            2 :                     all_in_window = true;
     579            2 :                 }
     580              :             }
     581              :         }
     582              : 
     583              :         // All the input files are rewritten. Set up the tracking for when they can
     584              :         // be deleted.
     585           78 :         for layer_id in job.input_layers.iter() {
     586           78 :             let l = &mut self.layers[layer_id.0];
     587           78 :             assert!(l.deletable_after.is_none());
     588           78 :             l.deletable_after = Some(PendingJobSet::new());
     589              :         }
     590           78 :         for j in new_jobs.into_iter().rev() {
     591           78 :             let job_id = self.push_job(j);
     592           78 :             let j = &self.jobs[job_id.0];
     593         3042 :             for layer_id in j.input_layers.iter() {
     594         3042 :                 self.layers[layer_id.0]
     595         3042 :                     .deletable_after
     596         3042 :                     .as_mut()
     597         3042 :                     .unwrap()
     598         3042 :                     .pending
     599         3042 :                     .insert(job_id);
     600         3042 :             }
     601              :         }
     602              : 
     603            2 :         Ok(())
     604            2 :     }
     605              : }
     606              : 
     607              : // Sliding window through keyspace and values
     608              : // This is used by cover_with_images to decide on good split points
     609              : struct KeyspaceWindow<K> {
     610              :     head: KeyspaceWindowHead<K>,
     611              : 
     612              :     start_pos: KeyspaceWindowPos<K>,
     613              : }
     614              : struct KeyspaceWindowHead<K> {
     615              :     // overall key range to cover
     616              :     key_range: Range<K>,
     617              : 
     618              :     keyspace: Vec<Range<K>>,
     619              :     target_keysize: u64,
     620              : }
     621              : 
     622              : #[derive(Clone)]
     623              : struct KeyspaceWindowPos<K> {
     624              :     end_key: K,
     625              : 
     626              :     keyspace_idx: usize,
     627              : 
     628              :     accum_keysize: u64,
     629              : }
     630              : impl<K: CompactionKey> KeyspaceWindowPos<K> {
     631            0 :     fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
     632            0 :         self.keyspace_idx == w.keyspace.len()
     633            0 :     }
     634              : 
     635              :     // Advance the cursor until the accumulated key size reaches 'max_size' or the keyspace ends.
     636            0 :     fn advance_until_size(
     637            0 :         &mut self,
     638            0 :         w: &KeyspaceWindowHead<K>,
     639            0 :         max_size: u64,
     640            0 :         shard_identity: &ShardIdentity,
     641            0 :     ) {
     642            0 :         while self.accum_keysize < max_size && !self.reached_end(w) {
     643            0 :             let curr_range = &w.keyspace[self.keyspace_idx];
     644            0 :             if self.end_key < curr_range.start {
     645            0 :                 // skip over any unused space
     646            0 :                 self.end_key = curr_range.start;
     647            0 :             }
     648              : 
     649              :             // We're now within 'curr_range'. Can we advance past it completely?
     650            0 :             let distance = K::key_range_size(&(self.end_key..curr_range.end), shard_identity);
     651            0 :             if (self.accum_keysize + distance as u64) < max_size {
     652            0 :                 // oh yeah, it fits
     653            0 :                 self.end_key = curr_range.end;
     654            0 :                 self.keyspace_idx += 1;
     655            0 :                 self.accum_keysize += distance as u64;
     656            0 :             } else {
     657              :                 // advance within the range
     658            0 :                 let skip_key = self.end_key.skip_some();
     659            0 :                 let distance = K::key_range_size(&(self.end_key..skip_key), shard_identity);
     660            0 :                 if (self.accum_keysize + distance as u64) < max_size {
     661            0 :                     self.end_key = skip_key;
     662            0 :                     self.accum_keysize += distance as u64;
     663            0 :                 } else {
     664            0 :                     self.end_key = self.end_key.next();
     665            0 :                     self.accum_keysize += 1;
     666            0 :                 }
     667              :             }
     668              :         }
     669            0 :     }
     670              : }
     671              : 
     672              : impl<K> KeyspaceWindow<K>
     673              : where
     674              :     K: CompactionKey,
     675              : {
     676            0 :     fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
     677            0 :         assert!(keyspace.first().unwrap().start >= key_range.start);
     678              : 
     679            0 :         let start_key = key_range.start;
     680            0 :         let start_pos = KeyspaceWindowPos::<K> {
     681            0 :             end_key: start_key,
     682            0 :             keyspace_idx: 0,
     683            0 :             accum_keysize: 0,
     684            0 :         };
     685            0 :         Self {
     686            0 :             head: KeyspaceWindowHead::<K> {
     687            0 :                 key_range,
     688            0 :                 keyspace,
     689            0 :                 target_keysize,
     690            0 :             },
     691            0 :             start_pos,
     692            0 :         }
     693            0 :     }
     694              : 
     695            0 :     fn choose_next_image(&mut self, shard_identity: &ShardIdentity) -> Option<Range<K>> {
     696            0 :         if self.start_pos.keyspace_idx == self.head.keyspace.len() {
     697              :             // we've reached the end
     698            0 :             return None;
     699            0 :         }
     700            0 : 
     701            0 :         let mut next_pos = self.start_pos.clone();
     702            0 :         next_pos.advance_until_size(
     703            0 :             &self.head,
     704            0 :             self.start_pos.accum_keysize + self.head.target_keysize,
     705            0 :             shard_identity,
     706            0 :         );
     707            0 : 
     708            0 :         // See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
     709            0 :         // 1.25x target size
     710            0 :         let mut end_pos = next_pos.clone();
     711            0 :         end_pos.advance_until_size(
     712            0 :             &self.head,
     713            0 :             self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
     714            0 :             shard_identity,
     715            0 :         );
     716            0 :         if end_pos.reached_end(&self.head) {
     717              :             // gobble up any unused keyspace between the last used key and end of the range
     718            0 :             assert!(end_pos.end_key <= self.head.key_range.end);
     719            0 :             end_pos.end_key = self.head.key_range.end;
     720            0 :             next_pos = end_pos;
     721            0 :         }
     722              : 
     723            0 :         let start_key = self.start_pos.end_key;
     724            0 :         self.start_pos = next_pos;
     725            0 :         Some(start_key..self.start_pos.end_key)
     726            0 :     }
     727              : }
     728              : 
     729              : // Take previous partitioning, based on the image layers below.
     730              : //
     731              : // Candidate is at the front:
     732              : //
     733              : // Consider stretching an image layer to next divider? If it's close enough,
     734              : // that's the image candidate
     735              : //
     736              : // If it's too far, consider splitting at a reasonable point
     737              : //
     738              : // Is the image candidate smaller than the equivalent delta? If so,
     739              : // split off the image. Otherwise, split off one delta.
     740              : // Try to snap off the delta at a reasonable point
     741              : 
     742              : struct WindowElement<K> {
     743              :     start_key: K, // inclusive
     744              :     last_key: K,  // inclusive
     745              :     accum_size: u64,
     746              : }
     747              : 
     748              : // Sliding window through keyspace and values
     749              : //
     750              : // This is used to decide what layer to write next, from the beginning of the window.
     751              : struct Window<K> {
     752              :     elems: VecDeque<WindowElement<K>>,
     753              : 
     754              :     // last key that was split off, inclusive
     755              :     splitoff_key: Option<K>,
     756              :     splitoff_size: u64,
     757              : }
     758              : 
     759              : impl<K> Window<K>
     760              : where
     761              :     K: CompactionKey,
     762              : {
     763            2 :     fn new() -> Self {
     764            2 :         Self {
     765            2 :             elems: VecDeque::new(),
     766            2 :             splitoff_key: None,
     767            2 :             splitoff_size: 0,
     768            2 :         }
     769            2 :     }
     770              : 
     771        64546 :     fn feed(&mut self, key: K, size: u64) {
     772              :         let last_size;
     773        64546 :         if let Some(last) = self.elems.back_mut() {
     774        64544 :             assert!(last.last_key <= key);
     775        64544 :             if key == last.last_key {
     776            0 :                 last.accum_size += size;
     777            0 :                 return;
     778        64544 :             }
     779        64544 :             last_size = last.accum_size;
     780            2 :         } else {
     781            2 :             last_size = 0;
     782            2 :         }
     783              :         // This is a new key.
     784        64546 :         let elem = WindowElement {
     785        64546 :             start_key: key,
     786        64546 :             last_key: key,
     787        64546 :             accum_size: last_size + size,
     788        64546 :         };
     789        64546 :         self.elems.push_back(elem);
     790        64546 :     }
     791              : 
     792        15853 :     fn remain_size(&self) -> u64 {
     793        15853 :         self.elems.back().unwrap().accum_size - self.splitoff_size
     794        15853 :     }
     795              : 
     796       129040 :     fn peek_size(&self) -> u64 {
     797       129040 :         self.elems.front().unwrap().accum_size - self.splitoff_size
     798       129040 :     }
     799              : 
     800        64494 :     fn commit_upto(&mut self, mut upto: usize) {
     801       128962 :         while upto > 1 {
     802        64468 :             let popped = self.elems.pop_front().unwrap();
     803        64468 :             self.elems.front_mut().unwrap().start_key = popped.start_key;
     804        64468 :             upto -= 1;
     805        64468 :         }
     806        64494 :     }
     807              : 
     808           76 :     fn find_size_split(&self, target_size: u64) -> usize {
     809           76 :         self.elems
     810          593 :             .partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
     811           76 :     }
     812              : 
     813           78 :     fn pop(&mut self) {
     814           78 :         let first = self.elems.pop_front().unwrap();
     815           78 :         self.splitoff_size = first.accum_size;
     816           78 : 
     817           78 :         self.splitoff_key = Some(first.last_key);
     818           78 :     }
     819              : 
     820              :     // the difference between delta and image is that an image covers
     821              :     // any unused keyspace before and after, while a delta tries to
     822              :     // minimize that. TODO: difference not implemented
     823           78 :     fn pop_delta(&mut self) -> Range<K> {
     824           78 :         let first = self.elems.front().unwrap();
     825           78 :         let key_range = first.start_key..first.last_key.next();
     826           78 : 
     827           78 :         self.pop();
     828           78 :         key_range
     829           78 :     }
     830              : 
     831              :     // Prerequisite: we have enough input in the window
     832              :     //
     833              :     // When this returns None, the caller should feed more data and call again
     834        64626 :     fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
     835        64626 :         if has_more && self.elems.is_empty() {
     836              :             // Starting up
     837            2 :             return None;
     838        64624 :         }
     839              : 
     840              :         // If we still have an undersized candidate, just keep going
     841       129040 :         while self.peek_size() < target_size {
     842       113187 :             if self.elems.len() > 1 {
     843        64416 :                 self.commit_upto(2);
     844        64416 :             } else if has_more {
     845        48771 :                 return None;
     846              :             } else {
     847            0 :                 break;
     848              :             }
     849              :         }
     850              : 
     851              :         // Ensure we have enough input in the window to make a good decision
     852        15853 :         if has_more && self.remain_size() < target_size * 5 / 4 {
     853        15775 :             return None;
     854           78 :         }
     855           78 : 
     856           78 :         // The candidate on the front is now large enough, for a delta.
     857           78 :         // And we have enough data in the window to decide.
     858           78 : 
     859           78 :         // If we're willing to stretch it up to 5/3 (~1.67x) of target size, could we
     860           78 :         // gobble up the rest of the work? This avoids creating very small
     861           78 :         // "tail" layers at the end of the keyspace
     862           78 :         if !has_more && self.remain_size() < target_size * 5 / 3 {
     863            2 :             self.commit_upto(self.elems.len());
     864            2 :         } else {
     865           76 :             let delta_split_at = self.find_size_split(target_size);
     866           76 :             self.commit_upto(delta_split_at);
     867           76 : 
     868           76 :             // If it's still not large enough, request the caller to fill the window
     869           76 :             if self.elems.len() == 1 && has_more {
     870            0 :                 return None;
     871           76 :             }
     872              :         }
     873           78 :         Some(self.pop_delta())
     874        64626 :     }
     875              : }
        

Generated by: LCOV version 2.1-beta