//! # Tiered compaction algorithm.
//!
//! Read all the input delta files, and write a new set of delta files that
//! include all the input WAL records. See retile_deltas().
//!
//! In a "normal" LSM tree, you get to remove any values that are overwritten by
//! later values, but in our system, we keep all the history. So the reshuffling
//! doesn't remove any garbage, it just reshuffles the records to reduce read
//! amplification, i.e. the number of files that you need to access to find the
//! WAL records for a given key.
//!
//! If the new delta files would be very "narrow", i.e. each file would cover
//! only a narrow key range, then we create a new set of image files
//! instead. The current threshold is that if the estimated total size of the
//! image layers is smaller than the size of the deltas, then we create image
//! layers. That amounts to 2x storage amplification, and it means that the
//! distance between image layers in the LSN dimension is roughly equal to the
//! logical database size. For example, if the logical database size is 10 GB,
//! we would generate new image layers every 10 GB of WAL.
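//!
//! A worked example of that threshold (illustrative numbers): if the keyspace
//! below a level holds 1,000,000 pages of 8 KiB each, the estimated total size
//! of the image layers is 1,000,000 * 8192 bytes ≈ 8.2 GB. We create image
//! layers only if the delta layers in the level hold more than that much WAL;
//! otherwise we retile the deltas.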
use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use tracing::{debug, info};

use std::collections::{HashSet, VecDeque};
use std::ops::Range;

use crate::helpers::{
    accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with,
};
use crate::interface::*;
use utils::lsn::Lsn;

use crate::identify_levels::identify_level;

/// Main entry point to compaction.
///
/// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
/// everything below that point that needs compaction. The cutoff LSN must
/// partition the layers so that there are no layers that span across that
/// LSN. To start compaction at the top of the tree, pass the end LSN of the
/// last written L0 layer.
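///
/// A sketch of a single invocation (the executor, LSN, and parameter values
/// are illustrative, not defaults):
///
/// ```ignore
/// // Compact everything below `end_lsn`, aiming for 256 MiB files and a
/// // fanout of 4 between levels.
/// compact_tiered(&mut executor, end_lsn, 256 * 1024 * 1024, 4, &ctx).await?;
/// ```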
pub async fn compact_tiered<E: CompactionJobExecutor>(
    executor: &mut E,
    end_lsn: Lsn,
    target_file_size: u64,
    fanout: u64,
    ctx: &E::RequestContext,
) -> anyhow::Result<()> {
    assert!(fanout >= 1, "fanout needs to be at least 1 but is {fanout}");
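    // Even with fanout = 1, grow the per-level target height geometrically
    // (with base at least 2); otherwise every level would get the same target
    // height.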
    let exp_base = fanout.max(2);
    // Start at L0
    let mut current_level_no = 0;
    let mut current_level_target_height = target_file_size;
    loop {
        // end LSN +1 to include possible image layers exactly at 'end_lsn'.
        let all_layers = executor
            .get_layers(
                &(E::Key::MIN..E::Key::MAX),
                &(Lsn(u64::MIN)..end_lsn + 1),
                ctx,
            )
            .await?;
        info!(
            "Compacting L{}, total # of layers: {}",
            current_level_no,
            all_layers.len()
        );

        // Identify the range of LSNs that belong to this level. We assume that
        // each file in this level spans an LSN range up to 1.75x the target
        // file size. That should give us enough slop that if we created a
        // slightly oversized L0 layer, e.g. because flushing the in-memory
        // layer was delayed for some reason, we don't consider the oversized
        // layer to belong to L1. But not so much slop that we accidentally
        // "skip" levels.
        let max_height = (current_level_target_height as f64 * 1.75) as u64;
        let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
            break;
        };

        // Calculate the depth of this level. If the number of tiers exceeds
        // the fanout parameter, it's time to compact it.
        let depth = level.depth();
        info!(
            "Level {} identified as LSN range {}-{}: depth {}",
            current_level_no, level.lsn_range.start, level.lsn_range.end, depth
        );
        for l in &level.layers {
            debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
        }
        if depth < fanout {
            debug!(
                level = current_level_no,
                depth = depth,
                fanout,
                "too few deltas to compact"
            );
            break;
        }

        compact_level(
            &level.lsn_range,
            &level.layers,
            executor,
            target_file_size,
            ctx,
        )
        .await?;
        if target_file_size == u64::MAX {
            break;
        }
        current_level_no += 1;
        current_level_target_height = current_level_target_height.saturating_mul(exp_base);
    }
    Ok(())
}

async fn compact_level<E: CompactionJobExecutor>(
    lsn_range: &Range<Lsn>,
    layers: &[E::Layer],
    executor: &mut E,
    target_file_size: u64,
    ctx: &E::RequestContext,
) -> anyhow::Result<bool> {
    let mut layer_fragments = Vec::new();
    for l in layers {
        layer_fragments.push(LayerFragment::new(l.clone()));
    }

    let mut state = LevelCompactionState {
        shard_identity: *executor.get_shard_identity(),
        target_file_size,
        _lsn_range: lsn_range.clone(),
        layers: layer_fragments,
        jobs: Vec::new(),
        job_queue: Vec::new(),
        next_level: false,
        executor,
    };

    let first_job = CompactionJob {
        key_range: E::Key::MIN..E::Key::MAX,
        lsn_range: lsn_range.clone(),
        strategy: CompactionStrategy::Divide,
        input_layers: state
            .layers
            .iter()
            .enumerate()
            .map(|i| LayerId(i.0))
            .collect(),
        completed: false,
    };

    state.jobs.push(first_job);
    state.job_queue.push(JobId(0));
    state.execute(ctx).await?;

    info!(
        "compaction completed! Need to process next level: {}",
        state.next_level
    );

    Ok(state.next_level)
}

/// Blackboard that keeps track of the state of all the jobs and work remaining
struct LevelCompactionState<'a, E>
where
    E: CompactionJobExecutor,
{
    shard_identity: ShardIdentity,

    // parameters
    target_file_size: u64,

    _lsn_range: Range<Lsn>,
    layers: Vec<LayerFragment<E>>,

    // job queue
    jobs: Vec<CompactionJob<E>>,
    job_queue: Vec<JobId>,

    /// If false, no need to compact levels below this
    next_level: bool,

    /// Interface to the outside world
    executor: &'a mut E,
}

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct LayerId(usize);
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct JobId(usize);

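// Tracks the jobs that must finish before a superseded layer may be deleted;
// see 'deletable_after' in LayerFragment below.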
struct PendingJobSet {
    pending: HashSet<JobId>,
    completed: HashSet<JobId>,
}

impl PendingJobSet {
    fn new() -> Self {
        PendingJobSet {
            pending: HashSet::new(),
            completed: HashSet::new(),
        }
    }

    fn complete_job(&mut self, job_id: JobId) {
        self.pending.remove(&job_id);
        self.completed.insert(job_id);
    }

    fn all_completed(&self) -> bool {
        self.pending.is_empty()
    }
}

// When we decide to rewrite a set of layers, LayerFragment is used to keep
// track of which new layers supersede an old layer. When all the stakeholder
// jobs have completed, this layer can be deleted.
struct LayerFragment<E>
where
    E: CompactionJobExecutor,
{
    layer: E::Layer,

    // If we will write new layers to replace this one, this keeps track of the
    // jobs that need to complete before this layer can be deleted. As the jobs
    // complete, they are moved from the 'pending' to the 'completed' set. Once
    // the 'pending' set becomes empty, the layer can be deleted.
    //
    // If None, this layer is not rewritten and must not be deleted.
    deletable_after: Option<PendingJobSet>,

    deleted: bool,
}

impl<E> LayerFragment<E>
where
    E: CompactionJobExecutor,
{
    fn new(layer: E::Layer) -> Self {
        LayerFragment {
            layer,
            deletable_after: None,
            deleted: false,
        }
    }
}

#[derive(PartialEq)]
enum CompactionStrategy {
    Divide,
    CreateDelta,
    CreateImage,
}

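/// One unit of work for the executor: process 'input_layers' within the given
/// key/LSN rectangle, using 'strategy'.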
struct CompactionJob<E: CompactionJobExecutor> {
    key_range: Range<E::Key>,
    lsn_range: Range<Lsn>,

    strategy: CompactionStrategy,

    input_layers: Vec<LayerId>,

    completed: bool,
}

impl<'a, E> LevelCompactionState<'a, E>
where
    E: CompactionJobExecutor,
{
    /// Main loop of the executor.
    ///
    /// In each iteration, we take the next job from the queue, and execute it.
    /// The execution might add new jobs to the queue. Keep going until the
    /// queue is empty.
    ///
    /// Initially, the job queue consists of one Divide job over the whole
    /// level. On first call, it is divided into smaller jobs.
    async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
        // TODO: this would be pretty straightforward to parallelize with FuturesUnordered
        while let Some(next_job_id) = self.job_queue.pop() {
            info!("executing job {}", next_job_id.0);
            self.execute_job(next_job_id, ctx).await?;
        }

        // all done!
        Ok(())
    }

    async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        match job.strategy {
            CompactionStrategy::Divide => {
                self.divide_job(job_id, ctx).await?;
                Ok(())
            }
            CompactionStrategy::CreateDelta => {
                let mut deltas: Vec<E::DeltaLayer> = Vec::new();
                let mut layer_ids: Vec<LayerId> = Vec::new();
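                // Collect the delta layers to merge. Image layers among the
                // inputs are skipped: downcast_delta_layer returns None for
                // them, and they are left in place.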
                for layer_id in &job.input_layers {
                    let layer = &self.layers[layer_id.0].layer;
                    if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
                        deltas.push(dl.clone());
                        layer_ids.push(*layer_id);
                    }
                }

                self.executor
                    .create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
                    .await?;
                self.jobs[job_id.0].completed = true;

                // did we complete any fragments?
                for layer_id in layer_ids {
                    let l = &mut self.layers[layer_id.0];
                    if let Some(deletable_after) = l.deletable_after.as_mut() {
                        deletable_after.complete_job(job_id);
                        if deletable_after.all_completed() {
                            self.executor.delete_layer(&l.layer, ctx).await?;
                            l.deleted = true;
                        }
                    }
                }

                self.next_level = true;

                Ok(())
            }
            CompactionStrategy::CreateImage => {
                self.executor
                    .create_image(job.lsn_range.end, &job.key_range, ctx)
                    .await?;
                self.jobs[job_id.0].completed = true;

                // TODO: we could check if any layers < PITR horizon became deletable
                Ok(())
            }
        }
    }

    fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
        let job_id = JobId(self.jobs.len());
        self.jobs.push(job);
        self.job_queue.push(job_id);
        job_id
    }

    /// Take a partition of the key space, and decide how to compact it.
    ///
    /// TODO: Currently, this is called exactly once for the level, and we
    /// decide whether to create new image layers to cover the whole level, or
    /// write a new set of deltas. In the future, this should try to partition
    /// the key space, and make the decision separately for each partition.
    async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        assert!(job.strategy == CompactionStrategy::Divide);

        // Check for dummy cases
        if job.input_layers.is_empty() {
            return Ok(());
        }

        // Would it be better to create images for this partition?
        // Decide based on the average density of the level.
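        // keyspace_total_size returns a key count; multiplying by 8192 (the
        // 8 KiB page size) estimates the total image size in bytes.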
        let keyspace_size = keyspace_total_size(
            &self
                .executor
                .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
                .await?,
            &self.shard_identity,
        ) * 8192;

        let wal_size = job
            .input_layers
            .iter()
            .filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
            .map(|layer_id| self.layers[layer_id.0].layer.file_size())
            .sum::<u64>();
        if keyspace_size < wal_size {
            // seems worth it
            info!(
                "covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
                keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
            );
            self.cover_with_images(job_id, ctx).await
        } else {
            // do deltas
            info!(
                "coverage not worth it, keyspace_size {}, wal_size {}",
                keyspace_size, wal_size
            );
            self.retile_deltas(job_id, ctx).await
        }
    }

    // LSN
    //  ^
    //  |
    //  |                          ###|###|#####
    //  | +--+-----+--+            +--+-----+--+
    //  | |  |     |  |            |  |     |  |
    //  | +--+--+--+--+            +--+--+--+--+
    //  | |     |     |            |     |     |
    //  | +---+-+-+---+     ==>    +---+-+-+---+
    //  | |   |   |   |            |   |   |   |
    //  | +---+-+-++--+            +---+-+-++--+
    //  | |     |  |  |            |     |  |  |
    //  | +-----+--+--+            +-----+--+--+
    //  |
    //  +--------------> key
    //
    async fn cover_with_images(
        &mut self,
        job_id: JobId,
        ctx: &E::RequestContext,
    ) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        assert!(job.strategy == CompactionStrategy::Divide);

        // XXX: do we still need the "holes" stuff?

        let mut new_jobs = Vec::new();

        // Slide a window through the keyspace
        let keyspace = self
            .executor
            .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
            .await?;

        let mut window = KeyspaceWindow::new(
            E::Key::MIN..E::Key::MAX,
            keyspace,
            self.target_file_size / 8192,
        );
        while let Some(key_range) = window.choose_next_image(&self.shard_identity) {
            new_jobs.push(CompactionJob::<E> {
                key_range,
                lsn_range: job.lsn_range.clone(),
                strategy: CompactionStrategy::CreateImage,
                input_layers: Vec::new(), // XXX: Is it OK for this to be empty for image layer?
                completed: false,
            });
        }

        for j in new_jobs.into_iter().rev() {
            let _job_id = self.push_job(j);

            // TODO: image layers don't let us delete anything. unless < PITR horizon
            //let j = &self.jobs[job_id.0];
            // for layer_id in j.input_layers.iter() {
            //    self.layers[layer_id.0].pending_stakeholders.insert(job_id);
            //}
        }

        Ok(())
    }

    // Merge the contents of all the input delta layers into a new set
    // of delta layers, based on the current partitioning.
    //
    // We split the new delta layers on the key dimension. We iterate through
    // the key space, and for each key, check if including the next key in the
    // current output layer we're building would cause the layer to become too
    // large. If so, dump the current output layer and start a new one. It's
    // possible that there is a single key with so many page versions that
    // storing all of them in a single layer file would be too large. In that
    // case, we also split on the LSN dimension.
    //
    // LSN
    //  ^
    //  |
    //  | +-----------+            +--+--+--+--+
    //  | |           |            |  |  |  |  |
    //  | +-----------+            |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+     ==>    |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+            |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+            +--+--+--+--+
    //  |
    //  +--------------> key
    //
    //
    // If one key (X) has a lot of page versions:
    //
    // LSN
    //  ^
    //  |                             (X)
    //  | +-----------+            +--+--+--+--+
    //  | |           |            |  |  |  |  |
    //  | +-----------+            |  |  +--+  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+     ==>    |  |  |  |  |
    //  | |           |            |  |  +--+  |
    //  | +-----------+            |  |  |  |  |
    //  | |           |            |  |  |  |  |
    //  | +-----------+            +--+--+--+--+
    //  |
    //  +--------------> key
    //
    // TODO: this actually divides the layers into fixed-size chunks, not
    // based on the partitioning.
    //
    // TODO: we should also opportunistically materialize and
    // garbage collect what we can.
    async fn retile_deltas(
        &mut self,
        job_id: JobId,
        ctx: &E::RequestContext,
    ) -> anyhow::Result<()> {
        let job = &self.jobs[job_id.0];
        assert!(job.strategy == CompactionStrategy::Divide);

        // Sweep the key space left to right, running an estimate of how much
        // disk size and keyspace we have accumulated.
        //
        // Once the disk size reaches the target threshold, stop and think.
        // If we have accumulated only a narrow band of keyspace, create an
        // image layer. Otherwise write a delta layer.

        // FIXME: deal with the case of lots of values for the same key

        // FIXME: we are ignoring images here. Did we already divide the work
        // so that we won't encounter them here?

        let mut deltas: Vec<E::DeltaLayer> = Vec::new();
        for layer_id in &job.input_layers {
            let l = &self.layers[layer_id.0];
            if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
                deltas.push(dl.clone());
            }
        }
        // Open stream
        let key_value_stream =
            std::pin::pin!(merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
                .await?
                .map(Result::<_, anyhow::Error>::Ok));
        let mut new_jobs = Vec::new();

        // Slide a window through the keyspace
        let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
        let mut all_in_window: bool = false;
        let mut window = Window::new();
        loop {
            if all_in_window && window.elems.is_empty() {
                // All done!
                break;
            }
            if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
            {
                let batch_layers: Vec<LayerId> = job
                    .input_layers
                    .iter()
                    .filter(|layer_id| {
                        overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
                    })
                    .cloned()
                    .collect();
                assert!(!batch_layers.is_empty());
                new_jobs.push(CompactionJob {
                    key_range,
                    lsn_range: job.lsn_range.clone(),
                    strategy: CompactionStrategy::CreateDelta,
                    input_layers: batch_layers,
                    completed: false,
                });
            } else {
                assert!(!all_in_window);
                if let Some(next_key) = key_accum.next().await.transpose()? {
                    window.feed(next_key.key, next_key.size);
                } else {
                    all_in_window = true;
                }
            }
        }

        // All the input layers will be rewritten. Set up the tracking for when
        // they can be deleted.
        for layer_id in job.input_layers.iter() {
            let l = &mut self.layers[layer_id.0];
            assert!(l.deletable_after.is_none());
            l.deletable_after = Some(PendingJobSet::new());
        }
        for j in new_jobs.into_iter().rev() {
            let job_id = self.push_job(j);
            let j = &self.jobs[job_id.0];
            for layer_id in j.input_layers.iter() {
                self.layers[layer_id.0]
                    .deletable_after
                    .as_mut()
                    .unwrap()
                    .pending
                    .insert(job_id);
            }
        }

        Ok(())
    }
}

// Sliding window through keyspace and values
// This is used by cover_with_images to decide on good split points
struct KeyspaceWindow<K> {
    head: KeyspaceWindowHead<K>,

    start_pos: KeyspaceWindowPos<K>,
}
struct KeyspaceWindowHead<K> {
    // overall key range to cover
    key_range: Range<K>,

    keyspace: Vec<Range<K>>,
    target_keysize: u64,
}

#[derive(Clone)]
struct KeyspaceWindowPos<K> {
    end_key: K,

    keyspace_idx: usize,

    accum_keysize: u64,
}
impl<K: CompactionKey> KeyspaceWindowPos<K> {
    fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
        self.keyspace_idx == w.keyspace.len()
    }

    // Advance the cursor until the accumulated key size reaches 'max_size'.
    fn advance_until_size(
        &mut self,
        w: &KeyspaceWindowHead<K>,
        max_size: u64,
        shard_identity: &ShardIdentity,
    ) {
        while self.accum_keysize < max_size && !self.reached_end(w) {
            let curr_range = &w.keyspace[self.keyspace_idx];
            if self.end_key < curr_range.start {
                // skip over any unused space
                self.end_key = curr_range.start;
            }

            // We're now within 'curr_range'. Can we advance past it completely?
            let distance = K::key_range_size(&(self.end_key..curr_range.end), shard_identity);
            if (self.accum_keysize + distance as u64) < max_size {
                // oh yeah, it fits
                self.end_key = curr_range.end;
                self.keyspace_idx += 1;
                self.accum_keysize += distance as u64;
            } else {
                // advance within the range
                let skip_key = self.end_key.skip_some();
                let distance = K::key_range_size(&(self.end_key..skip_key), shard_identity);
                if (self.accum_keysize + distance as u64) < max_size {
                    self.end_key = skip_key;
                    self.accum_keysize += distance as u64;
                } else {
                    self.end_key = self.end_key.next();
                    self.accum_keysize += 1;
                }
            }
        }
    }
}

impl<K> KeyspaceWindow<K>
where
    K: CompactionKey,
{
    fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
        assert!(keyspace.first().unwrap().start >= key_range.start);

        let start_key = key_range.start;
        let start_pos = KeyspaceWindowPos::<K> {
            end_key: start_key,
            keyspace_idx: 0,
            accum_keysize: 0,
        };
        Self {
            head: KeyspaceWindowHead::<K> {
                key_range,
                keyspace,
                target_keysize,
            },
            start_pos,
        }
    }

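    // For example (illustrative numbers): with target_keysize = 1000 and 1100
    // keys of keyspace left, the first probe below stops near 1000 keys, and
    // the stretched probe (up to 1.25x, i.e. 1250 keys) reaches the end of the
    // range, so we emit a single 1100-key image instead of a 1000-key image
    // followed by a tiny 100-key tail.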
    fn choose_next_image(&mut self, shard_identity: &ShardIdentity) -> Option<Range<K>> {
        if self.start_pos.keyspace_idx == self.head.keyspace.len() {
            // we've reached the end
            return None;
        }

        let mut next_pos = self.start_pos.clone();
        next_pos.advance_until_size(
            &self.head,
            self.start_pos.accum_keysize + self.head.target_keysize,
            shard_identity,
        );

        // See if we can gobble up the rest of the keyspace if we stretch out the
        // layer, up to 1.25x target size
        let mut end_pos = next_pos.clone();
        end_pos.advance_until_size(
            &self.head,
            self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
            shard_identity,
        );
        if end_pos.reached_end(&self.head) {
            // gobble up any unused keyspace between the last used key and end of the range
            assert!(end_pos.end_key <= self.head.key_range.end);
            end_pos.end_key = self.head.key_range.end;
            next_pos = end_pos;
        }

        let start_key = self.start_pos.end_key;
        self.start_pos = next_pos;
        Some(start_key..self.start_pos.end_key)
    }
}

// Take the previous partitioning, based on the image layers below.
//
// Candidate is at the front:
//
// Consider stretching an image layer to the next divider? If it's close enough,
// that's the image candidate.
//
// If it's too far, consider splitting at a reasonable point.
//
// Is the image candidate smaller than the equivalent delta? If so,
// split off the image. Otherwise, split off one delta.
// Try to snap off the delta at a reasonable point.

struct WindowElement<K> {
    start_key: K, // inclusive
    last_key: K,  // inclusive
    accum_size: u64,
}

// Sliding window through keyspace and values
//
// This is used to decide what layer to write next, from the beginning of the window.
struct Window<K> {
    elems: VecDeque<WindowElement<K>>,

    // last key that was split off, inclusive
    splitoff_key: Option<K>,
    splitoff_size: u64,
}

impl<K> Window<K>
where
    K: CompactionKey,
{
    fn new() -> Self {
        Self {
            elems: VecDeque::new(),
            splitoff_key: None,
            splitoff_size: 0,
        }
    }

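    // Feed the next key into the window. 'accum_size' is cumulative: each
    // element records the total size of all keys up to and including itself,
    // so subtracting 'splitoff_size' from it gives the size of the window up
    // to that element.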
    fn feed(&mut self, key: K, size: u64) {
        let last_size;
        if let Some(last) = self.elems.back_mut() {
            assert!(last.last_key <= key);
            if key == last.last_key {
                last.accum_size += size;
                return;
            }
            last_size = last.accum_size;
        } else {
            last_size = 0;
        }
        // This is a new key.
        let elem = WindowElement {
            start_key: key,
            last_key: key,
            accum_size: last_size + size,
        };
        self.elems.push_back(elem);
    }

    fn remain_size(&self) -> u64 {
        self.elems.back().unwrap().accum_size - self.splitoff_size
    }

    fn peek_size(&self) -> u64 {
        self.elems.front().unwrap().accum_size - self.splitoff_size
    }

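    // Merge the first 'upto' elements into a single element covering their
    // combined key range. The accumulated sizes need no adjustment because
    // they are already cumulative.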
    fn commit_upto(&mut self, mut upto: usize) {
        while upto > 1 {
            let popped = self.elems.pop_front().unwrap();
            self.elems.front_mut().unwrap().start_key = popped.start_key;
            upto -= 1;
        }
    }

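    // The number of leading elements whose cumulative size stays below
    // 'target_size'; this is where we snap off the next delta.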
    fn find_size_split(&self, target_size: u64) -> usize {
        self.elems
            .partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
    }

    fn pop(&mut self) {
        let first = self.elems.pop_front().unwrap();
        self.splitoff_size = first.accum_size;

        self.splitoff_key = Some(first.last_key);
    }

    // The difference between a delta and an image is that an image covers
    // any unused keyspace before and after, while a delta tries to
    // minimize that. TODO: difference not implemented
    fn pop_delta(&mut self) -> Range<K> {
        let first = self.elems.front().unwrap();
        let key_range = first.start_key..first.last_key.next();

        self.pop();
        key_range
    }

    // Prerequisite: we have enough input in the window.
    //
    // If this returns None, the caller should feed more data and call again.
    fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
        if has_more && self.elems.is_empty() {
            // Starting up
            return None;
        }

        // If we still have an undersized candidate, just keep going
        while self.peek_size() < target_size {
            if self.elems.len() > 1 {
                self.commit_upto(2);
            } else if has_more {
                return None;
            } else {
                break;
            }
        }

        // Ensure we have enough input in the window to make a good decision
        if has_more && self.remain_size() < target_size * 5 / 4 {
            return None;
        }

        // The candidate on the front is now large enough for a delta.
        // And we have enough data in the window to decide.

        // If we're willing to stretch the delta up to 5/3 of the target size,
        // could we gobble up the rest of the work? This avoids creating very
        // small "tail" layers at the end of the keyspace.
        if !has_more && self.remain_size() < target_size * 5 / 3 {
            self.commit_upto(self.elems.len());
        } else {
            let delta_split_at = self.find_size_split(target_size);
            self.commit_upto(delta_split_at);

            // If it's still not large enough, request the caller to fill the window
            if self.elems.len() == 1 && has_more {
                return None;
            }
        }
        Some(self.pop_delta())
    }
}