Line data Source code
1 : //! # Tiered compaction algorithm.
2 : //!
3 : //! Read all the input delta files, and write a new set of delta files that
4 : //! include all the input WAL records. See retile_deltas().
5 : //!
6 : //! In a "normal" LSM tree, you get to remove any values that are overwritten by
7 : //! later values, but in our system, we keep all the history. So the reshuffling
8 : //! doesn't remove any garbage, it just reshuffles the records to reduce read
9 : //! amplification, i.e. the number of files that you need to access to find the
10 : //! WAL records for a given key.
11 : //!
12 : //! If the new delta files would be very "narrow", i.e. each file would cover
13 : //! only a narrow key range, then we create a new set of image files
14 : //! instead. The current threshold is that if the estimated total size of the
15 : //! image layers is smaller than the size of the deltas, then we create image
16 : //! layers. That amounts to 2x storage amplification, and it means that the
17 : //! distance of image layers in LSN dimension is roughly equal to the logical
18 : //! database size. For example, if the logical database size is 10 GB, we would
19 : //! generate new image layers every 10 GB of WAL.
20 : use futures::StreamExt;
21 : use tracing::{debug, info};
22 :
23 : use std::collections::{HashSet, VecDeque};
24 : use std::ops::Range;
25 :
26 : use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with};
27 : use crate::interface::*;
28 : use utils::lsn::Lsn;
29 :
30 : use crate::identify_levels::identify_level;
31 :
32 : /// Main entry point to compaction.
33 : ///
34 : /// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
35 : /// everything below that point, that needs compaction. The cutoff LSN must
36 : /// partition the layers so that there are no layers that span across that
37 : /// LSN. To start compaction at the top of the tree, pass the end LSN of the
38 : /// written last L0 layer.
39 0 : pub async fn compact_tiered<E: CompactionJobExecutor>(
40 0 : executor: &mut E,
41 0 : end_lsn: Lsn,
42 0 : target_file_size: u64,
43 0 : fanout: u64,
44 0 : ctx: &E::RequestContext,
45 0 : ) -> anyhow::Result<()> {
46 0 : assert!(fanout >= 2);
47 : // Start at L0
48 0 : let mut current_level_no = 0;
49 0 : let mut current_level_target_height = target_file_size;
50 : loop {
51 : // end LSN +1 to include possible image layers exactly at 'end_lsn'.
52 0 : let all_layers = executor
53 0 : .get_layers(
54 0 : &(E::Key::MIN..E::Key::MAX),
55 0 : &(Lsn(u64::MIN)..end_lsn + 1),
56 0 : ctx,
57 0 : )
58 0 : .await?;
59 0 : info!(
60 0 : "Compacting L{}, total # of layers: {}",
61 0 : current_level_no,
62 0 : all_layers.len()
63 0 : );
64 :
65 : // Identify the range of LSNs that belong to this level. We assume that
66 : // each file in this level span an LSN range up to 1.75x target file
67 : // size. That should give us enough slop that if we created a slightly
68 : // oversized L0 layer, e.g. because flushing the in-memory layer was
69 : // delayed for some reason, we don't consider the oversized layer to
70 : // belong to L1. But not too much slop, that we don't accidentally
71 : // "skip" levels.
72 0 : let max_height = (current_level_target_height as f64 * 1.75) as u64;
73 0 : let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
74 0 : break;
75 : };
76 :
77 : // Calculate the height of this level. If the # of tiers exceeds the
78 : // fanout parameter, it's time to compact it.
79 0 : let depth = level.depth();
80 0 : info!(
81 0 : "Level {} identified as LSN range {}-{}: depth {}",
82 0 : current_level_no, level.lsn_range.start, level.lsn_range.end, depth
83 0 : );
84 0 : for l in &level.layers {
85 0 : debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
86 : }
87 0 : if depth < fanout {
88 0 : debug!(
89 0 : level = current_level_no,
90 0 : depth = depth,
91 0 : fanout,
92 0 : "too few deltas to compact"
93 0 : );
94 0 : break;
95 0 : }
96 0 :
97 0 : compact_level(
98 0 : &level.lsn_range,
99 0 : &level.layers,
100 0 : executor,
101 0 : target_file_size,
102 0 : ctx,
103 0 : )
104 0 : .await?;
105 0 : if target_file_size == u64::MAX {
106 0 : break;
107 0 : }
108 0 : current_level_no += 1;
109 0 : current_level_target_height = current_level_target_height.saturating_mul(fanout);
110 : }
111 0 : Ok(())
112 0 : }
113 :
114 0 : async fn compact_level<E: CompactionJobExecutor>(
115 0 : lsn_range: &Range<Lsn>,
116 0 : layers: &[E::Layer],
117 0 : executor: &mut E,
118 0 : target_file_size: u64,
119 0 : ctx: &E::RequestContext,
120 0 : ) -> anyhow::Result<bool> {
121 0 : let mut layer_fragments = Vec::new();
122 0 : for l in layers {
123 0 : layer_fragments.push(LayerFragment::new(l.clone()));
124 0 : }
125 :
126 0 : let mut state = LevelCompactionState {
127 0 : target_file_size,
128 0 : _lsn_range: lsn_range.clone(),
129 0 : layers: layer_fragments,
130 0 : jobs: Vec::new(),
131 0 : job_queue: Vec::new(),
132 0 : next_level: false,
133 0 : executor,
134 0 : };
135 0 :
136 0 : let first_job = CompactionJob {
137 0 : key_range: E::Key::MIN..E::Key::MAX,
138 0 : lsn_range: lsn_range.clone(),
139 0 : strategy: CompactionStrategy::Divide,
140 0 : input_layers: state
141 0 : .layers
142 0 : .iter()
143 0 : .enumerate()
144 0 : .map(|i| LayerId(i.0))
145 0 : .collect(),
146 0 : completed: false,
147 0 : };
148 0 :
149 0 : state.jobs.push(first_job);
150 0 : state.job_queue.push(JobId(0));
151 0 : state.execute(ctx).await?;
152 :
153 0 : info!(
154 0 : "compaction completed! Need to process next level: {}",
155 0 : state.next_level
156 0 : );
157 :
158 0 : Ok(state.next_level)
159 0 : }
160 :
161 : /// Blackboard that keeps track of the state of all the jobs and work remaining
162 : struct LevelCompactionState<'a, E>
163 : where
164 : E: CompactionJobExecutor,
165 : {
166 : // parameters
167 : target_file_size: u64,
168 :
169 : _lsn_range: Range<Lsn>,
170 : layers: Vec<LayerFragment<E>>,
171 :
172 : // job queue
173 : jobs: Vec<CompactionJob<E>>,
174 : job_queue: Vec<JobId>,
175 :
176 : /// If false, no need to compact levels below this
177 : next_level: bool,
178 :
179 : /// Interface to the outside world
180 : executor: &'a mut E,
181 : }
182 :
/// Index of a layer in `LevelCompactionState::layers`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct LayerId(usize);

/// Index of a job in `LevelCompactionState::jobs`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct JobId(usize);
187 :
188 : struct PendingJobSet {
189 : pending: HashSet<JobId>,
190 : completed: HashSet<JobId>,
191 : }
192 :
193 : impl PendingJobSet {
194 0 : fn new() -> Self {
195 0 : PendingJobSet {
196 0 : pending: HashSet::new(),
197 0 : completed: HashSet::new(),
198 0 : }
199 0 : }
200 :
201 0 : fn complete_job(&mut self, job_id: JobId) {
202 0 : self.pending.remove(&job_id);
203 0 : self.completed.insert(job_id);
204 0 : }
205 :
206 0 : fn all_completed(&self) -> bool {
207 0 : self.pending.is_empty()
208 0 : }
209 : }
210 :
211 : // When we decide to rewrite a set of layers, LayerFragment is used to keep
212 : // track which new layers supersede an old layer. When all the stakeholder jobs
213 : // have completed, this layer can be deleted.
214 : struct LayerFragment<E>
215 : where
216 : E: CompactionJobExecutor,
217 : {
218 : layer: E::Layer,
219 :
220 : // If we will write new layers to replace this one, this keeps track of the
221 : // jobs that need to complete before this layer can be deleted. As the jobs
222 : // complete, they are moved from 'pending' to 'completed' set. Once the
223 : // 'pending' set becomes empty, the layer can be deleted.
224 : //
225 : // If None, this layer is not rewritten and must not be deleted.
226 : deletable_after: Option<PendingJobSet>,
227 :
228 : deleted: bool,
229 : }
230 :
231 : impl<E> LayerFragment<E>
232 : where
233 : E: CompactionJobExecutor,
234 : {
235 0 : fn new(layer: E::Layer) -> Self {
236 0 : LayerFragment {
237 0 : layer,
238 0 : deletable_after: None,
239 0 : deleted: false,
240 0 : }
241 0 : }
242 : }
243 :
/// How a `CompactionJob` is carried out.
#[derive(PartialEq)]
enum CompactionStrategy {
    /// Split the job into smaller `CreateDelta` or `CreateImage` jobs.
    Divide,
    /// Merge the job's input delta layers into a new delta layer.
    CreateDelta,
    /// Create an image layer at the end of the job's LSN range.
    CreateImage,
}
250 :
251 : #[allow(dead_code)] // Todo
252 : struct CompactionJob<E: CompactionJobExecutor> {
253 : key_range: Range<E::Key>,
254 : lsn_range: Range<Lsn>,
255 :
256 : strategy: CompactionStrategy,
257 :
258 : input_layers: Vec<LayerId>,
259 :
260 : completed: bool,
261 : }
262 :
263 : impl<'a, E> LevelCompactionState<'a, E>
264 : where
265 : E: CompactionJobExecutor,
266 : {
267 : /// Main loop of the executor.
268 : ///
269 : /// In each iteration, we take the next job from the queue, and execute it.
270 : /// The execution might add new jobs to the queue. Keep going until the
271 : /// queue is empty.
272 : ///
273 : /// Initially, the job queue consists of one Divide job over the whole
274 : /// level. On first call, it is divided into smaller jobs.
275 0 : async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
276 : // TODO: this would be pretty straightforward to parallelize with FuturesUnordered
277 0 : while let Some(next_job_id) = self.job_queue.pop() {
278 0 : info!("executing job {}", next_job_id.0);
279 0 : self.execute_job(next_job_id, ctx).await?;
280 : }
281 :
282 : // all done!
283 0 : Ok(())
284 0 : }
285 :
286 0 : async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
287 0 : let job = &self.jobs[job_id.0];
288 0 : match job.strategy {
289 : CompactionStrategy::Divide => {
290 0 : self.divide_job(job_id, ctx).await?;
291 0 : Ok(())
292 : }
293 : CompactionStrategy::CreateDelta => {
294 0 : let mut deltas: Vec<E::DeltaLayer> = Vec::new();
295 0 : let mut layer_ids: Vec<LayerId> = Vec::new();
296 0 : for layer_id in &job.input_layers {
297 0 : let layer = &self.layers[layer_id.0].layer;
298 0 : if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
299 0 : deltas.push(dl.clone());
300 0 : layer_ids.push(*layer_id);
301 0 : }
302 : }
303 :
304 0 : self.executor
305 0 : .create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
306 0 : .await?;
307 0 : self.jobs[job_id.0].completed = true;
308 :
309 : // did we complete any fragments?
310 0 : for layer_id in layer_ids {
311 0 : let l = &mut self.layers[layer_id.0];
312 0 : if let Some(deletable_after) = l.deletable_after.as_mut() {
313 0 : deletable_after.complete_job(job_id);
314 0 : if deletable_after.all_completed() {
315 0 : self.executor.delete_layer(&l.layer, ctx).await?;
316 0 : l.deleted = true;
317 0 : }
318 0 : }
319 : }
320 :
321 0 : self.next_level = true;
322 0 :
323 0 : Ok(())
324 : }
325 : CompactionStrategy::CreateImage => {
326 0 : self.executor
327 0 : .create_image(job.lsn_range.end, &job.key_range, ctx)
328 0 : .await?;
329 0 : self.jobs[job_id.0].completed = true;
330 0 :
331 0 : // TODO: we could check if any layers < PITR horizon became deletable
332 0 : Ok(())
333 : }
334 : }
335 0 : }
336 :
337 0 : fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
338 0 : let job_id = JobId(self.jobs.len());
339 0 : self.jobs.push(job);
340 0 : self.job_queue.push(job_id);
341 0 : job_id
342 0 : }
343 :
344 : /// Take a partition of the key space, and decide how to compact it.
345 : ///
346 : /// TODO: Currently, this is called exactly once for the level, and we
347 : /// decide whether to create new image layers to cover the whole level, or
348 : /// write a new set of delta. In the future, this should try to partition
349 : /// the key space, and make the decision separately for each partition.
350 0 : async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
351 0 : let job = &self.jobs[job_id.0];
352 0 : assert!(job.strategy == CompactionStrategy::Divide);
353 :
354 : // Check for dummy cases
355 0 : if job.input_layers.is_empty() {
356 0 : return Ok(());
357 0 : }
358 0 :
359 0 : let job = &self.jobs[job_id.0];
360 0 : assert!(job.strategy == CompactionStrategy::Divide);
361 :
362 : // Would it be better to create images for this partition?
363 : // Decide based on the average density of the level
364 0 : let keyspace_size = keyspace_total_size(
365 0 : &self
366 0 : .executor
367 0 : .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
368 0 : .await?,
369 : ) * 8192;
370 :
371 0 : let wal_size = job
372 0 : .input_layers
373 0 : .iter()
374 0 : .filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
375 0 : .map(|layer_id| self.layers[layer_id.0].layer.file_size())
376 0 : .sum::<u64>();
377 0 : if keyspace_size < wal_size {
378 : // seems worth it
379 0 : info!(
380 0 : "covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
381 0 : keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
382 0 : );
383 0 : self.cover_with_images(job_id, ctx).await
384 : } else {
385 : // do deltas
386 0 : info!(
387 0 : "coverage not worth it, keyspace_size {}, wal_size {}",
388 0 : keyspace_size, wal_size
389 0 : );
390 0 : self.retile_deltas(job_id, ctx).await
391 : }
392 0 : }
393 :
394 : // LSN
395 : // ^
396 : // |
397 : // | ###|###|#####
398 : // | +--+-----+--+ +--+-----+--+
399 : // | | | | | | | | |
400 : // | +--+--+--+--+ +--+--+--+--+
401 : // | | | | | | |
402 : // | +---+-+-+---+ ==> +---+-+-+---+
403 : // | | | | | | | | |
404 : // | +---+-+-++--+ +---+-+-++--+
405 : // | | | | | | | | |
406 : // | +-----+--+--+ +-----+--+--+
407 : // |
408 : // +--------------> key
409 : //
410 0 : async fn cover_with_images(
411 0 : &mut self,
412 0 : job_id: JobId,
413 0 : ctx: &E::RequestContext,
414 0 : ) -> anyhow::Result<()> {
415 0 : let job = &self.jobs[job_id.0];
416 0 : assert!(job.strategy == CompactionStrategy::Divide);
417 :
418 : // XXX: do we still need the "holes" stuff?
419 :
420 0 : let mut new_jobs = Vec::new();
421 :
422 : // Slide a window through the keyspace
423 0 : let keyspace = self
424 0 : .executor
425 0 : .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
426 0 : .await?;
427 :
428 0 : let mut window = KeyspaceWindow::new(
429 0 : E::Key::MIN..E::Key::MAX,
430 0 : keyspace,
431 0 : self.target_file_size / 8192,
432 0 : );
433 0 : while let Some(key_range) = window.choose_next_image() {
434 0 : new_jobs.push(CompactionJob::<E> {
435 0 : key_range,
436 0 : lsn_range: job.lsn_range.clone(),
437 0 : strategy: CompactionStrategy::CreateImage,
438 0 : input_layers: Vec::new(), // XXX: Is it OK for this to be empty for image layer?
439 0 : completed: false,
440 0 : });
441 0 : }
442 :
443 0 : for j in new_jobs.into_iter().rev() {
444 0 : let _job_id = self.push_job(j);
445 0 :
446 0 : // TODO: image layers don't let us delete anything. unless < PITR horizon
447 0 : //let j = &self.jobs[job_id.0];
448 0 : // for layer_id in j.input_layers.iter() {
449 0 : // self.layers[layer_id.0].pending_stakeholders.insert(job_id);
450 0 : //}
451 0 : }
452 :
453 0 : Ok(())
454 0 : }
455 :
456 : // Merge the contents of all the input delta layers into a new set
457 : // of delta layers, based on the current partitioning.
458 : //
459 : // We split the new delta layers on the key dimension. We iterate through
460 : // the key space, and for each key, check if including the next key to the
461 : // current output layer we're building would cause the layer to become too
462 : // large. If so, dump the current output layer and start new one. It's
463 : // possible that there is a single key with so many page versions that
464 : // storing all of them in a single layer file would be too large. In that
465 : // case, we also split on the LSN dimension.
466 : //
467 : // LSN
468 : // ^
469 : // |
470 : // | +-----------+ +--+--+--+--+
471 : // | | | | | | | |
472 : // | +-----------+ | | | | |
473 : // | | | | | | | |
474 : // | +-----------+ ==> | | | | |
475 : // | | | | | | | |
476 : // | +-----------+ | | | | |
477 : // | | | | | | | |
478 : // | +-----------+ +--+--+--+--+
479 : // |
480 : // +--------------> key
481 : //
482 : //
483 : // If one key (X) has a lot of page versions:
484 : //
485 : // LSN
486 : // ^
487 : // | (X)
488 : // | +-----------+ +--+--+--+--+
489 : // | | | | | | | |
490 : // | +-----------+ | | +--+ |
491 : // | | | | | | | |
492 : // | +-----------+ ==> | | | | |
493 : // | | | | | +--+ |
494 : // | +-----------+ | | | | |
495 : // | | | | | | | |
496 : // | +-----------+ +--+--+--+--+
497 : // |
498 : // +--------------> key
499 : //
500 : // TODO: this actually divides the layers into fixed-size chunks, not
501 : // based on the partitioning.
502 : //
503 : // TODO: we should also opportunistically materialize and
504 : // garbage collect what we can.
505 0 : async fn retile_deltas(
506 0 : &mut self,
507 0 : job_id: JobId,
508 0 : ctx: &E::RequestContext,
509 0 : ) -> anyhow::Result<()> {
510 0 : let job = &self.jobs[job_id.0];
511 0 : assert!(job.strategy == CompactionStrategy::Divide);
512 :
513 : // Sweep the key space left to right, running an estimate of how much
514 : // disk size and keyspace we have accumulated
515 : //
516 : // Once the disk size reaches the target threshold, stop and think.
517 : // If we have accumulated only a narrow band of keyspace, create an
518 : // image layer. Otherwise write a delta layer.
519 :
520 : // FIXME: deal with the case of lots of values for same key
521 :
522 : // FIXME: we are ignoring images here. Did we already divide the work
523 : // so that we won't encounter them here?
524 :
525 0 : let mut deltas: Vec<E::DeltaLayer> = Vec::new();
526 0 : for layer_id in &job.input_layers {
527 0 : let l = &self.layers[layer_id.0];
528 0 : if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
529 0 : deltas.push(dl.clone());
530 0 : }
531 : }
532 : // Open stream
533 0 : let key_value_stream = std::pin::pin!(merge_delta_keys::<E>(deltas.as_slice(), ctx));
534 0 : let mut new_jobs = Vec::new();
535 0 :
536 0 : // Slide a window through the keyspace
537 0 : let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
538 0 : let mut all_in_window: bool = false;
539 0 : let mut window = Window::new();
540 0 : loop {
541 0 : if all_in_window && window.elems.is_empty() {
542 : // All done!
543 0 : break;
544 0 : }
545 0 : if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
546 : {
547 0 : let batch_layers: Vec<LayerId> = job
548 0 : .input_layers
549 0 : .iter()
550 0 : .filter(|layer_id| {
551 0 : overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
552 0 : })
553 0 : .cloned()
554 0 : .collect();
555 0 : assert!(!batch_layers.is_empty());
556 0 : new_jobs.push(CompactionJob {
557 0 : key_range,
558 0 : lsn_range: job.lsn_range.clone(),
559 0 : strategy: CompactionStrategy::CreateDelta,
560 0 : input_layers: batch_layers,
561 0 : completed: false,
562 0 : });
563 : } else {
564 0 : assert!(!all_in_window);
565 0 : if let Some(next_key) = key_accum.next().await.transpose()? {
566 0 : window.feed(next_key.key, next_key.size);
567 0 : } else {
568 0 : all_in_window = true;
569 0 : }
570 : }
571 : }
572 :
573 : // All the input files are rewritten. Set up the tracking for when they can
574 : // be deleted.
575 0 : for layer_id in job.input_layers.iter() {
576 0 : let l = &mut self.layers[layer_id.0];
577 0 : assert!(l.deletable_after.is_none());
578 0 : l.deletable_after = Some(PendingJobSet::new());
579 : }
580 0 : for j in new_jobs.into_iter().rev() {
581 0 : let job_id = self.push_job(j);
582 0 : let j = &self.jobs[job_id.0];
583 0 : for layer_id in j.input_layers.iter() {
584 0 : self.layers[layer_id.0]
585 0 : .deletable_after
586 0 : .as_mut()
587 0 : .unwrap()
588 0 : .pending
589 0 : .insert(job_id);
590 0 : }
591 : }
592 :
593 0 : Ok(())
594 0 : }
595 : }
596 :
// Sliding window through the keyspace. `cover_with_images` uses it to pick
// split points for new image layers.
struct KeyspaceWindow<K> {
    head: KeyspaceWindowHead<K>,

    start_pos: KeyspaceWindowPos<K>,
}

/// Fixed parameters of a `KeyspaceWindow`.
struct KeyspaceWindowHead<K> {
    /// Overall key range to cover.
    key_range: Range<K>,

    /// Key ranges in use, walked in order by index.
    keyspace: Vec<Range<K>>,
    /// Target number of keys per chosen range.
    target_keysize: u64,
}

/// Cursor into a `KeyspaceWindow`.
#[derive(Clone)]
struct KeyspaceWindowPos<K> {
    /// Key reached so far; used as the exclusive end of a chosen range.
    end_key: K,

    /// Index into `KeyspaceWindowHead::keyspace` of the range being walked.
    keyspace_idx: usize,

    /// Number of keys accumulated so far.
    accum_keysize: u64,
}
620 : impl<K: CompactionKey> KeyspaceWindowPos<K> {
621 0 : fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
622 0 : self.keyspace_idx == w.keyspace.len()
623 0 : }
624 :
625 : // Advance the cursor until it reaches 'target_keysize'.
626 0 : fn advance_until_size(&mut self, w: &KeyspaceWindowHead<K>, max_size: u64) {
627 0 : while self.accum_keysize < max_size && !self.reached_end(w) {
628 0 : let curr_range = &w.keyspace[self.keyspace_idx];
629 0 : if self.end_key < curr_range.start {
630 0 : // skip over any unused space
631 0 : self.end_key = curr_range.start;
632 0 : }
633 :
634 : // We're now within 'curr_range'. Can we advance past it completely?
635 0 : let distance = K::key_range_size(&(self.end_key..curr_range.end));
636 0 : if (self.accum_keysize + distance as u64) < max_size {
637 0 : // oh yeah, it fits
638 0 : self.end_key = curr_range.end;
639 0 : self.keyspace_idx += 1;
640 0 : self.accum_keysize += distance as u64;
641 0 : } else {
642 : // advance within the range
643 0 : let skip_key = self.end_key.skip_some();
644 0 : let distance = K::key_range_size(&(self.end_key..skip_key));
645 0 : if (self.accum_keysize + distance as u64) < max_size {
646 0 : self.end_key = skip_key;
647 0 : self.accum_keysize += distance as u64;
648 0 : } else {
649 0 : self.end_key = self.end_key.next();
650 0 : self.accum_keysize += 1;
651 0 : }
652 : }
653 : }
654 0 : }
655 : }
656 :
657 : impl<K> KeyspaceWindow<K>
658 : where
659 : K: CompactionKey,
660 : {
661 0 : fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
662 0 : assert!(keyspace.first().unwrap().start >= key_range.start);
663 :
664 0 : let start_key = key_range.start;
665 0 : let start_pos = KeyspaceWindowPos::<K> {
666 0 : end_key: start_key,
667 0 : keyspace_idx: 0,
668 0 : accum_keysize: 0,
669 0 : };
670 0 : Self {
671 0 : head: KeyspaceWindowHead::<K> {
672 0 : key_range,
673 0 : keyspace,
674 0 : target_keysize,
675 0 : },
676 0 : start_pos,
677 0 : }
678 0 : }
679 :
680 0 : fn choose_next_image(&mut self) -> Option<Range<K>> {
681 0 : if self.start_pos.keyspace_idx == self.head.keyspace.len() {
682 : // we've reached the end
683 0 : return None;
684 0 : }
685 0 :
686 0 : let mut next_pos = self.start_pos.clone();
687 0 : next_pos.advance_until_size(
688 0 : &self.head,
689 0 : self.start_pos.accum_keysize + self.head.target_keysize,
690 0 : );
691 0 :
692 0 : // See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
693 0 : // 1.25x target size
694 0 : let mut end_pos = next_pos.clone();
695 0 : end_pos.advance_until_size(
696 0 : &self.head,
697 0 : self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
698 0 : );
699 0 : if end_pos.reached_end(&self.head) {
700 : // gobble up any unused keyspace between the last used key and end of the range
701 0 : assert!(end_pos.end_key <= self.head.key_range.end);
702 0 : end_pos.end_key = self.head.key_range.end;
703 0 : next_pos = end_pos;
704 0 : }
705 :
706 0 : let start_key = self.start_pos.end_key;
707 0 : self.start_pos = next_pos;
708 0 : Some(start_key..self.start_pos.end_key)
709 0 : }
710 : }
711 :
712 : // Sliding window through keyspace and values
713 : //
714 : // This is used to decide what layer to write next, from the beginning of the window.
715 : //
716 : // Candidates:
717 : //
718 : // 1. Create an image layer, snapping to previous images
719 : // 2. Create a delta layer, snapping to previous images
720 : // 3. Create an image layer, snapping to
721 : //
722 : //
723 :
724 : // Take previous partitioning, based on the image layers below.
725 : //
726 : // Candidate is at the front:
727 : //
728 : // Consider stretching an image layer to next divider? If it's close enough,
729 : // that's the image candidate
730 : //
731 : // If it's too far, consider splitting at a reasonable point
732 : //
733 : // Is the image candidate smaller than the equivalent delta? If so,
734 : // split off the image. Otherwise, split off one delta.
735 : // Try to snap off the delta at a reasonable point
736 :
/// One key, or a merged run of consecutive keys, held in a `Window`.
struct WindowElement<K> {
    /// First key covered (inclusive).
    start_key: K,
    /// Last key covered (inclusive).
    last_key: K,
    /// Running total of value sizes up to and including this element.
    /// `Window::splitoff_size` is subtracted from it to get the size that is
    /// still in the window.
    accum_size: u64,
}

/// Sliding window over keys and value sizes. `retile_deltas` uses it to
/// decide where the next delta layer ends.
struct Window<K> {
    /// Last key that was split off (inclusive), if any.
    splitoff_key: Option<K>,
    /// Running total at the last split-off point.
    splitoff_size: u64,

    elems: VecDeque<WindowElement<K>>,
}
749 :
750 : impl<K> Window<K>
751 : where
752 : K: CompactionKey,
753 : {
754 0 : fn new() -> Self {
755 0 : Self {
756 0 : elems: VecDeque::new(),
757 0 : splitoff_key: None,
758 0 : splitoff_size: 0,
759 0 : }
760 0 : }
761 :
762 0 : fn feed(&mut self, key: K, size: u64) {
763 : let last_size;
764 0 : if let Some(last) = self.elems.back_mut() {
765 0 : assert!(last.last_key <= key);
766 0 : if key == last.last_key {
767 0 : last.accum_size += size;
768 0 : return;
769 0 : }
770 0 : last_size = last.accum_size;
771 0 : } else {
772 0 : last_size = 0;
773 0 : }
774 : // This is a new key.
775 0 : let elem = WindowElement {
776 0 : start_key: key,
777 0 : last_key: key,
778 0 : accum_size: last_size + size,
779 0 : };
780 0 : self.elems.push_back(elem);
781 0 : }
782 :
783 0 : fn remain_size(&self) -> u64 {
784 0 : self.elems.back().unwrap().accum_size - self.splitoff_size
785 0 : }
786 :
787 0 : fn peek_size(&self) -> u64 {
788 0 : self.elems.front().unwrap().accum_size - self.splitoff_size
789 0 : }
790 :
791 0 : fn commit_upto(&mut self, mut upto: usize) {
792 0 : while upto > 1 {
793 0 : let popped = self.elems.pop_front().unwrap();
794 0 : self.elems.front_mut().unwrap().start_key = popped.start_key;
795 0 : upto -= 1;
796 0 : }
797 0 : }
798 :
799 0 : fn find_size_split(&self, target_size: u64) -> usize {
800 0 : self.elems
801 0 : .partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
802 0 : }
803 :
804 0 : fn pop(&mut self) {
805 0 : let first = self.elems.pop_front().unwrap();
806 0 : self.splitoff_size = first.accum_size;
807 0 :
808 0 : self.splitoff_key = Some(first.last_key);
809 0 : }
810 :
811 : // the difference between delta and image is that an image covers
812 : // any unused keyspace before and after, while a delta tries to
813 : // minimize that. TODO: difference not implemented
814 0 : fn pop_delta(&mut self) -> Range<K> {
815 0 : let first = self.elems.front().unwrap();
816 0 : let key_range = first.start_key..first.last_key.next();
817 0 :
818 0 : self.pop();
819 0 : key_range
820 0 : }
821 :
822 : // Prerequisite: we have enough input in the window
823 : //
824 : // On return None, the caller should feed more data and call again
825 0 : fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
826 0 : if has_more && self.elems.is_empty() {
827 : // Starting up
828 0 : return None;
829 0 : }
830 :
831 : // If we still have an undersized candidate, just keep going
832 0 : while self.peek_size() < target_size {
833 0 : if self.elems.len() > 1 {
834 0 : self.commit_upto(2);
835 0 : } else if has_more {
836 0 : return None;
837 : } else {
838 0 : break;
839 : }
840 : }
841 :
842 : // Ensure we have enough input in the window to make a good decision
843 0 : if has_more && self.remain_size() < target_size * 5 / 4 {
844 0 : return None;
845 0 : }
846 0 :
847 0 : // The candidate on the front is now large enough, for a delta.
848 0 : // And we have enough data in the window to decide.
849 0 :
850 0 : // If we're willing to stretch it up to 1.25 target size, could we
851 0 : // gobble up the rest of the work? This avoids creating very small
852 0 : // "tail" layers at the end of the keyspace
853 0 : if !has_more && self.remain_size() < target_size * 5 / 3 {
854 0 : self.commit_upto(self.elems.len());
855 0 : } else {
856 0 : let delta_split_at = self.find_size_split(target_size);
857 0 : self.commit_upto(delta_split_at);
858 0 :
859 0 : // If it's still not large enough, request the caller to fill the window
860 0 : if self.elems.len() == 1 && has_more {
861 0 : return None;
862 0 : }
863 : }
864 0 : Some(self.pop_delta())
865 0 : }
866 : }
|