Line data Source code
1 : //! # Tiered compaction algorithm.
2 : //!
3 : //! Read all the input delta files, and write a new set of delta files that
4 : //! include all the input WAL records. See retile_deltas().
5 : //!
6 : //! In a "normal" LSM tree, you get to remove any values that are overwritten by
7 : //! later values, but in our system, we keep all the history. So the reshuffling
8 : //! doesn't remove any garbage, it just reshuffles the records to reduce read
9 : //! amplification, i.e. the number of files that you need to access to find the
10 : //! WAL records for a given key.
11 : //!
12 : //! If the new delta files would be very "narrow", i.e. each file would cover
13 : //! only a narrow key range, then we create a new set of image files
14 : //! instead. The current threshold is that if the estimated total size of the
15 : //! image layers is smaller than the size of the deltas, then we create image
16 : //! layers. That amounts to 2x storage amplification, and it means that the
17 : //! distance of image layers in LSN dimension is roughly equal to the logical
18 : //! database size. For example, if the logical database size is 10 GB, we would
19 : //! generate new image layers every 10 GB of WAL.
20 : use futures::StreamExt;
21 : use pageserver_api::shard::ShardIdentity;
22 : use tracing::{debug, info};
23 :
24 : use std::collections::{HashSet, VecDeque};
25 : use std::ops::Range;
26 :
27 : use crate::helpers::{
28 : accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with, PAGE_SZ,
29 : };
30 : use crate::interface::*;
31 : use utils::lsn::Lsn;
32 :
33 : use crate::identify_levels::identify_level;
34 :
35 : /// Main entry point to compaction.
36 : ///
37 : /// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
38 : /// everything below that point, that needs compaction. The cutoff LSN must
39 : /// partition the layers so that there are no layers that span across that
40 : /// LSN. To start compaction at the top of the tree, pass the end LSN of the
41 : /// written last L0 layer.
42 1000 : pub async fn compact_tiered<E: CompactionJobExecutor>(
43 1000 : executor: &mut E,
44 1000 : end_lsn: Lsn,
45 1000 : target_file_size: u64,
46 1000 : fanout: u64,
47 1000 : ctx: &E::RequestContext,
48 1000 : ) -> anyhow::Result<()> {
49 1000 : assert!(fanout >= 1, "fanout needs to be at least 1 but is {fanout}");
50 1000 : let exp_base = fanout.max(2);
51 1000 : // Start at L0
52 1000 : let mut current_level_no = 0;
53 1000 : let mut current_level_target_height = target_file_size;
54 : loop {
55 : // end LSN +1 to include possible image layers exactly at 'end_lsn'.
56 1003 : let all_layers = executor
57 1003 : .get_layers(
58 1003 : &(E::Key::MIN..E::Key::MAX),
59 1003 : &(Lsn(u64::MIN)..end_lsn + 1),
60 1003 : ctx,
61 1003 : )
62 0 : .await?;
63 1003 : info!(
64 0 : "Compacting L{}, total # of layers: {}",
65 0 : current_level_no,
66 0 : all_layers.len()
67 : );
68 :
69 : // Identify the range of LSNs that belong to this level. We assume that
70 : // each file in this level spans an LSN range up to 1.75x target file
71 : // size. That should give us enough slop that if we created a slightly
72 : // oversized L0 layer, e.g. because flushing the in-memory layer was
73 : // delayed for some reason, we don't consider the oversized layer to
74 : // belong to L1. But not too much slop, that we don't accidentally
75 : // "skip" levels.
76 1003 : let max_height = (current_level_target_height as f64 * 1.75) as u64;
77 1003 : let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
78 271 : break;
79 : };
80 :
81 : // Calculate the height of this level. If the # of tiers exceeds the
82 : // fanout parameter, it's time to compact it.
83 732 : let depth = level.depth();
84 732 : info!(
85 0 : "Level {} identified as LSN range {}-{}: depth {}",
86 : current_level_no, level.lsn_range.start, level.lsn_range.end, depth
87 : );
88 2157 : for l in &level.layers {
89 1425 : debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
90 : }
91 732 : if depth < fanout {
92 729 : debug!(
93 : level = current_level_no,
94 : depth = depth,
95 : fanout,
96 0 : "too few deltas to compact"
97 : );
98 729 : break;
99 3 : }
100 3 :
101 3 : compact_level(
102 3 : &level.lsn_range,
103 3 : &level.layers,
104 3 : executor,
105 3 : target_file_size,
106 3 : ctx,
107 3 : )
108 0 : .await?;
109 3 : if current_level_target_height == u64::MAX {
110 : // our target height includes all possible lsns
111 0 : info!(
112 : level = current_level_no,
113 : depth = depth,
114 0 : "compaction loop reached max current_level_target_height"
115 : );
116 0 : break;
117 3 : }
118 3 : current_level_no += 1;
119 3 : current_level_target_height = current_level_target_height.saturating_mul(exp_base);
120 : }
121 1000 : Ok(())
122 1000 : }
123 :
124 3 : async fn compact_level<E: CompactionJobExecutor>(
125 3 : lsn_range: &Range<Lsn>,
126 3 : layers: &[E::Layer],
127 3 : executor: &mut E,
128 3 : target_file_size: u64,
129 3 : ctx: &E::RequestContext,
130 3 : ) -> anyhow::Result<bool> {
131 3 : let mut layer_fragments = Vec::new();
132 50 : for l in layers {
133 47 : layer_fragments.push(LayerFragment::new(l.clone()));
134 47 : }
135 :
136 3 : let mut state = LevelCompactionState {
137 3 : shard_identity: *executor.get_shard_identity(),
138 3 : target_file_size,
139 3 : _lsn_range: lsn_range.clone(),
140 3 : layers: layer_fragments,
141 3 : jobs: Vec::new(),
142 3 : job_queue: Vec::new(),
143 3 : next_level: false,
144 3 : executor,
145 3 : };
146 3 :
147 3 : let first_job = CompactionJob {
148 3 : key_range: E::Key::MIN..E::Key::MAX,
149 3 : lsn_range: lsn_range.clone(),
150 3 : strategy: CompactionStrategy::Divide,
151 3 : input_layers: state
152 3 : .layers
153 3 : .iter()
154 3 : .enumerate()
155 47 : .map(|i| LayerId(i.0))
156 3 : .collect(),
157 3 : completed: false,
158 3 : };
159 3 :
160 3 : state.jobs.push(first_job);
161 3 : state.job_queue.push(JobId(0));
162 3 : state.execute(ctx).await?;
163 :
164 3 : info!(
165 0 : "compaction completed! Need to process next level: {}",
166 : state.next_level
167 : );
168 :
169 3 : Ok(state.next_level)
170 3 : }
171 :
172 : /// Blackboard that keeps track of the state of all the jobs and work remaining
///
/// One instance is built per call to `compact_level` and driven to
/// completion by `execute`.
173 : struct LevelCompactionState<'a, E>
174 : where
175 : E: CompactionJobExecutor,
176 : {
// Copied from the executor (`get_shard_identity`) when the state is built;
// passed to the keyspace size helpers.
177 : shard_identity: ShardIdentity,
178 :
179 : // parameters
180 : target_file_size: u64,
181 :
// LSN range of the level. Stored at construction; not read by the code in
// this file.
182 : _lsn_range: Range<Lsn>,
// Input layers of the level. A `LayerId` is an index into this vector.
183 : layers: Vec<LayerFragment<E>>,
184 :
185 : // job queue
// All jobs ever created for this level. A `JobId` is an index into this
// vector.
186 : jobs: Vec<CompactionJob<E>>,
// Jobs still to run. `execute` pops from the back, so the most recently
// pushed job runs first.
187 : job_queue: Vec<JobId>,
188 :
189 : /// If false, no need to compact levels below this
///
/// Set to true whenever a `CreateDelta` job has been executed.
190 : next_level: bool,
191 :
192 : /// Interface to the outside world
193 : executor: &'a mut E,
194 : }
195 :
/// Identifies a layer by its position in the per-level layer list.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct LayerId(usize);

/// Identifies a job by its position in the per-level job list.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct JobId(usize);

/// The jobs a layer is waiting for, split into those that are still
/// outstanding and those that have already finished.
struct PendingJobSet {
    /// Jobs that have not completed yet.
    pending: HashSet<JobId>,
    /// Jobs that have been reported as completed.
    completed: HashSet<JobId>,
}

impl PendingJobSet {
    /// Create a set with no pending and no completed jobs.
    fn new() -> Self {
        Self {
            pending: HashSet::default(),
            completed: HashSet::default(),
        }
    }

    /// Record `job_id` as completed and drop it from the pending jobs.
    ///
    /// The job is recorded as completed even if it was never pending.
    fn complete_job(&mut self, job_id: JobId) {
        self.completed.insert(job_id);
        self.pending.remove(&job_id);
    }

    /// Returns true when no job is pending any more.
    fn all_completed(&self) -> bool {
        self.pending.is_empty()
    }
}
223 :
224 : // When we decide to rewrite a set of layers, LayerFragment is used to keep
225 : // track which new layers supersede an old layer. When all the stakeholder jobs
226 : // have completed, this layer can be deleted.
227 : struct LayerFragment<E>
228 : where
229 : E: CompactionJobExecutor,
230 : {
231 : layer: E::Layer,
232 :
233 : // If we will write new layers to replace this one, this keeps track of the
234 : // jobs that need to complete before this layer can be deleted. As the jobs
235 : // complete, they are moved from 'pending' to 'completed' set. Once the
236 : // 'pending' set becomes empty, the layer can be deleted.
237 : //
238 : // If None, this layer is not rewritten and must not be deleted.
239 : deletable_after: Option<PendingJobSet>,
240 :
241 : deleted: bool,
242 : }
243 :
244 : impl<E> LayerFragment<E>
245 : where
246 : E: CompactionJobExecutor,
247 : {
248 47 : fn new(layer: E::Layer) -> Self {
249 47 : LayerFragment {
250 47 : layer,
251 47 : deletable_after: None,
252 47 : deleted: false,
253 47 : }
254 47 : }
255 : }
256 :
// What a job does when it is executed (see `execute_job`).
257 : #[derive(PartialEq)]
258 : enum CompactionStrategy {
// Decide how to compact the job's key range and queue follow-up jobs.
259 : Divide,
// Write a new delta layer for the job's key/LSN rectangle from its input
// delta layers.
260 : CreateDelta,
// Write an image layer for the job's key range at the end of its LSN range.
261 : CreateImage,
262 : }
263 :
// One unit of compaction work over a key-range x LSN-range rectangle.
264 : struct CompactionJob<E: CompactionJobExecutor> {
265 : key_range: Range<E::Key>,
266 : lsn_range: Range<Lsn>,
267 :
268 : strategy: CompactionStrategy,
269 :
// Indexes into `LevelCompactionState::layers`. Left empty for the image
// jobs created by `cover_with_images`.
270 : input_layers: Vec<LayerId>,
271 :
// Set to true once a CreateDelta or CreateImage job has been executed.
272 : completed: bool,
273 : }
274 :
275 : impl<'a, E> LevelCompactionState<'a, E>
276 : where
277 : E: CompactionJobExecutor,
278 : {
279 : /// Main loop of the executor.
280 : ///
281 : /// In each iteration, we take the next job from the queue, and execute it.
282 : /// The execution might add new jobs to the queue. Keep going until the
283 : /// queue is empty.
284 : ///
285 : /// Initially, the job queue consists of one Divide job over the whole
286 : /// level. On first call, it is divided into smaller jobs.
287 3 : async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
288 : // TODO: this would be pretty straightforward to parallelize with FuturesUnordered
289 55 : while let Some(next_job_id) = self.job_queue.pop() {
290 52 : info!("executing job {}", next_job_id.0);
291 52 : self.execute_job(next_job_id, ctx).await?;
292 : }
293 :
294 : // all done!
295 3 : Ok(())
296 3 : }
297 :
298 52 : async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
299 52 : let job = &self.jobs[job_id.0];
300 52 : match job.strategy {
301 : CompactionStrategy::Divide => {
302 3 : self.divide_job(job_id, ctx).await?;
303 3 : Ok(())
304 : }
305 : CompactionStrategy::CreateDelta => {
306 49 : let mut deltas: Vec<E::DeltaLayer> = Vec::new();
307 49 : let mut layer_ids: Vec<LayerId> = Vec::new();
308 1610 : for layer_id in &job.input_layers {
309 1561 : let layer = &self.layers[layer_id.0].layer;
310 1561 : if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
311 1561 : deltas.push(dl.clone());
312 1561 : layer_ids.push(*layer_id);
313 1561 : }
314 : }
315 :
316 49 : self.executor
317 49 : .create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
318 0 : .await?;
319 49 : self.jobs[job_id.0].completed = true;
320 :
321 : // did we complete any fragments?
322 1610 : for layer_id in layer_ids {
323 1561 : let l = &mut self.layers[layer_id.0];
324 1561 : if let Some(deletable_after) = l.deletable_after.as_mut() {
325 1561 : deletable_after.complete_job(job_id);
326 1561 : if deletable_after.all_completed() {
327 47 : self.executor.delete_layer(&l.layer, ctx).await?;
328 47 : l.deleted = true;
329 1514 : }
330 0 : }
331 : }
332 :
333 49 : self.next_level = true;
334 49 :
335 49 : Ok(())
336 : }
337 : CompactionStrategy::CreateImage => {
338 0 : self.executor
339 0 : .create_image(job.lsn_range.end, &job.key_range, ctx)
340 0 : .await?;
341 0 : self.jobs[job_id.0].completed = true;
342 0 :
343 0 : // TODO: we could check if any layers < PITR horizon became deletable
344 0 : Ok(())
345 : }
346 : }
347 52 : }
348 :
349 49 : fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
350 49 : let job_id = JobId(self.jobs.len());
351 49 : self.jobs.push(job);
352 49 : self.job_queue.push(job_id);
353 49 : job_id
354 49 : }
355 :
356 : /// Take a partition of the key space, and decide how to compact it.
357 : ///
358 : /// TODO: Currently, this is called exactly once for the level, and we
359 : /// decide whether to create new image layers to cover the whole level, or
360 : /// write a new set of deltas. In the future, this should try to partition
361 : /// the key space, and make the decision separately for each partition.
362 3 : async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
363 3 : let job = &self.jobs[job_id.0];
364 3 : assert!(job.strategy == CompactionStrategy::Divide);
365 :
366 : // Check for dummy cases
367 3 : if job.input_layers.is_empty() {
368 0 : return Ok(());
369 3 : }
370 3 :
371 3 : let job = &self.jobs[job_id.0];
372 3 : assert!(job.strategy == CompactionStrategy::Divide);
373 :
374 : // Would it be better to create images for this partition?
375 : // Decide based on the average density of the level
376 3 : let keyspace_size = keyspace_total_size(
377 3 : &self
378 3 : .executor
379 3 : .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
380 0 : .await?,
381 3 : &self.shard_identity,
382 3 : ) * PAGE_SZ;
383 3 :
384 3 : let wal_size = job
385 3 : .input_layers
386 3 : .iter()
387 47 : .filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
388 47 : .map(|layer_id| self.layers[layer_id.0].layer.file_size())
389 3 : .sum::<u64>();
390 3 : if keyspace_size < wal_size {
391 : // seems worth it
392 0 : info!(
393 0 : "covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
394 : keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
395 : );
396 0 : self.cover_with_images(job_id, ctx).await
397 : } else {
398 : // do deltas
399 3 : info!(
400 0 : "coverage not worth it, keyspace_size {}, wal_size {}",
401 : keyspace_size, wal_size
402 : );
403 3 : self.retile_deltas(job_id, ctx).await
404 : }
405 3 : }
406 :
407 : // LSN
408 : // ^
409 : // |
410 : // | ###|###|#####
411 : // | +--+-----+--+ +--+-----+--+
412 : // | | | | | | | | |
413 : // | +--+--+--+--+ +--+--+--+--+
414 : // | | | | | | |
415 : // | +---+-+-+---+ ==> +---+-+-+---+
416 : // | | | | | | | | |
417 : // | +---+-+-++--+ +---+-+-++--+
418 : // | | | | | | | | |
419 : // | +-----+--+--+ +-----+--+--+
420 : // |
421 : // +--------------> key
422 : //
423 0 : async fn cover_with_images(
424 0 : &mut self,
425 0 : job_id: JobId,
426 0 : ctx: &E::RequestContext,
427 0 : ) -> anyhow::Result<()> {
428 0 : let job = &self.jobs[job_id.0];
429 0 : assert!(job.strategy == CompactionStrategy::Divide);
430 :
431 : // XXX: do we still need the "holes" stuff?
432 :
433 0 : let mut new_jobs = Vec::new();
434 :
435 : // Slide a window through the keyspace
436 0 : let keyspace = self
437 0 : .executor
438 0 : .get_keyspace(&job.key_range, job.lsn_range.end, ctx)
439 0 : .await?;
440 :
441 0 : let mut window = KeyspaceWindow::new(
442 0 : E::Key::MIN..E::Key::MAX,
443 0 : keyspace,
444 0 : self.target_file_size / PAGE_SZ,
445 0 : );
446 0 : while let Some(key_range) = window.choose_next_image(&self.shard_identity) {
447 0 : new_jobs.push(CompactionJob::<E> {
448 0 : key_range,
449 0 : lsn_range: job.lsn_range.clone(),
450 0 : strategy: CompactionStrategy::CreateImage,
451 0 : input_layers: Vec::new(), // XXX: Is it OK for this to be empty for image layer?
452 0 : completed: false,
453 0 : });
454 0 : }
455 :
456 0 : for j in new_jobs.into_iter().rev() {
457 0 : let _job_id = self.push_job(j);
458 0 :
459 0 : // TODO: image layers don't let us delete anything. unless < PITR horizon
460 0 : //let j = &self.jobs[job_id.0];
461 0 : // for layer_id in j.input_layers.iter() {
462 0 : // self.layers[layer_id.0].pending_stakeholders.insert(job_id);
463 0 : //}
464 0 : }
465 :
466 0 : Ok(())
467 0 : }
468 :
469 : // Merge the contents of all the input delta layers into a new set
470 : // of delta layers, based on the current partitioning.
471 : //
472 : // We split the new delta layers on the key dimension. We iterate through
473 : // the key space, and for each key, check if including the next key to the
474 : // current output layer we're building would cause the layer to become too
475 : // large. If so, dump the current output layer and start new one. It's
476 : // possible that there is a single key with so many page versions that
477 : // storing all of them in a single layer file would be too large. In that
478 : // case, we also split on the LSN dimension.
479 : //
480 : // LSN
481 : // ^
482 : // |
483 : // | +-----------+ +--+--+--+--+
484 : // | | | | | | | |
485 : // | +-----------+ | | | | |
486 : // | | | | | | | |
487 : // | +-----------+ ==> | | | | |
488 : // | | | | | | | |
489 : // | +-----------+ | | | | |
490 : // | | | | | | | |
491 : // | +-----------+ +--+--+--+--+
492 : // |
493 : // +--------------> key
494 : //
495 : //
496 : // If one key (X) has a lot of page versions:
497 : //
498 : // LSN
499 : // ^
500 : // | (X)
501 : // | +-----------+ +--+--+--+--+
502 : // | | | | | | | |
503 : // | +-----------+ | | +--+ |
504 : // | | | | | | | |
505 : // | +-----------+ ==> | | | | |
506 : // | | | | | +--+ |
507 : // | +-----------+ | | | | |
508 : // | | | | | | | |
509 : // | +-----------+ +--+--+--+--+
510 : // |
511 : // +--------------> key
512 : //
513 : // TODO: this actually divides the layers into fixed-size chunks, not
514 : // based on the partitioning.
515 : //
516 : // TODO: we should also opportunistically materialize and
517 : // garbage collect what we can.
// Queue CreateDelta jobs that rewrite the input delta layers of a Divide job.
//
// The keys of all input delta layers are streamed in merged order and fed
// into a `Window`. Whenever the window yields a key range, a CreateDelta job
// over that key range and the job's whole LSN range is recorded. A key that
// arrives with non-empty `partition_lsns` gets dedicated single-key jobs,
// one per LSN sub-range. Finally every input layer is set up for deletion
// tracking and the recorded jobs are pushed onto the job queue.
//
// Panics if the job's strategy is not Divide, if a new job would have no
// overlapping input layer, or if an input layer already has deletion
// tracking set up.
518 3 : async fn retile_deltas(
519 3 : &mut self,
520 3 : job_id: JobId,
521 3 : ctx: &E::RequestContext,
522 3 : ) -> anyhow::Result<()> {
523 3 : let job = &self.jobs[job_id.0];
524 3 : assert!(job.strategy == CompactionStrategy::Divide);
525 :
526 : // Sweep the key space left to right, running an estimate of how much
527 : // disk size and keyspace we have accumulated
528 : //
529 : // Once the disk size reaches the target threshold, stop and think.
530 : // If we have accumulated only a narrow band of keyspace, create an
531 : // image layer. Otherwise write a delta layer.
// NOTE(review): the code below only ever creates CreateDelta jobs; the
// image-layer alternative described above is not implemented here.
532 :
533 : // FIXME: we are ignoring images here. Did we already divide the work
534 : // so that we won't encounter them here?
535 :
// Collect the input layers that are delta layers.
536 3 : let mut deltas: Vec<E::DeltaLayer> = Vec::new();
537 50 : for layer_id in &job.input_layers {
538 47 : let l = &self.layers[layer_id.0];
539 47 : if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
540 47 : deltas.push(dl.clone());
541 47 : }
542 : }
543 : // Open stream
544 3 : let key_value_stream =
545 3 : std::pin::pin!(merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
546 0 : .await?
547 3 : .map(Result::<_, anyhow::Error>::Ok));
548 3 : let mut new_jobs = Vec::new();
549 3 :
550 3 : // Slide a window through the keyspace
551 3 : let mut key_accum =
552 3 : std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
// Becomes true once the key stream is exhausted, i.e. everything that is
// left to process is already in the window.
553 3 : let mut all_in_window: bool = false;
554 3 : let mut window = Window::new();
555 3 :
556 3 : // Helper function to create a job for a new delta layer with given key-lsn
557 3 : // rectangle.
558 49 : let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
559 49 : // The inputs for the job are all the input layers of the original job that
560 49 : // overlap with the rectangle.
// Note: the filter below only looks at the key ranges of the layers.
561 49 : let batch_layers: Vec<LayerId> = job
562 49 : .input_layers
563 49 : .iter()
564 1561 : .filter(|layer_id| {
565 1561 : overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
566 1561 : })
567 49 : .cloned()
568 49 : .collect();
569 49 : assert!(!batch_layers.is_empty());
570 49 : new_jobs.push(CompactionJob {
571 49 : key_range,
572 49 : lsn_range: lsn_range.clone(),
573 49 : strategy: CompactionStrategy::CreateDelta,
574 49 : input_layers: batch_layers,
575 49 : completed: false,
576 49 : });
577 49 : };
578 :
579 : loop {
580 93358 : if all_in_window && window.is_empty() {
581 : // All done!
582 3 : break;
583 93355 : }
584 :
585 : // If we now have enough keyspace for next delta layer in the window, create a
586 : // new delta layer
587 93355 : if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
588 : {
589 41 : create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
590 41 : continue;
591 93314 : }
// The window returned nothing, so it needs more input; the stream must not
// be exhausted yet.
592 93314 : assert!(!all_in_window);
593 :
594 : // Process next key in the key space
595 93314 : match key_accum.next().await.transpose()? {
596 3 : None => {
597 3 : all_in_window = true;
598 3 : }
599 93311 : Some(next_key) if next_key.partition_lsns.is_empty() => {
600 93309 : // Normal case: extend the window by the key
601 93309 : window.feed(next_key.key, next_key.size);
602 93309 : }
603 2 : Some(next_key) => {
604 2 : // A key with too large size impact for a single delta layer. This
605 2 : // case occurs if you make a huge number of updates for a single key.
606 2 : //
607 2 : // Drain the window with has_more = false to make a clean cut before
608 2 : // the key, and then make dedicated delta layers for the single key.
609 2 : //
610 2 : // We cannot cluster the key with the others, because we don't want
611 2 : // layer files to overlap with each other in the lsn,key space (no
612 2 : // overlaps for the rectangles).
613 2 : let key = next_key.key;
614 2 : debug!("key {key} with size impact larger than the layer size");
615 2 : while !window.is_empty() {
616 0 : let has_more = false;
617 0 : let key_range = window.choose_next_delta(self.target_file_size, has_more)
618 0 : .expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
619 0 : create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
620 0 : }
621 :
622 : // Not really required: but here for future resilience:
623 : // We make a "gap" here, so any structure the window holds should
624 : // probably be reset.
625 2 : window = Window::new();
626 2 :
// Cut the job's LSN range at every LSN listed in `partition_lsns`; the
// last sub-range ends at the end of the job's LSN range.
627 2 : let mut prior_lsn = job.lsn_range.start;
628 2 : let mut lsn_ranges = Vec::new();
629 6 : for (lsn, _size) in next_key.partition_lsns.iter() {
630 6 : lsn_ranges.push(prior_lsn..*lsn);
631 6 : prior_lsn = *lsn;
632 6 : }
633 2 : lsn_ranges.push(prior_lsn..job.lsn_range.end);
634 10 : for lsn_range in lsn_ranges {
635 8 : let key_range = key..key.next();
636 8 : create_delta_job(key_range, &lsn_range, &mut new_jobs);
637 8 : }
638 : }
639 : }
640 : }
641 :
642 : // All the input files are rewritten. Set up the tracking for when they can
643 : // be deleted.
644 47 : for layer_id in job.input_layers.iter() {
645 47 : let l = &mut self.layers[layer_id.0];
646 47 : assert!(l.deletable_after.is_none());
647 47 : l.deletable_after = Some(PendingJobSet::new());
648 : }
// Push in reverse: `execute` pops jobs from the back of the queue, so the
// jobs run in the order they were created. Each job is registered as
// pending on every one of its input layers.
649 49 : for j in new_jobs.into_iter().rev() {
650 49 : let job_id = self.push_job(j);
651 49 : let j = &self.jobs[job_id.0];
652 1561 : for layer_id in j.input_layers.iter() {
653 1561 : self.layers[layer_id.0]
654 1561 : .deletable_after
655 1561 : .as_mut()
656 1561 : .unwrap()
657 1561 : .pending
658 1561 : .insert(job_id);
659 1561 : }
660 : }
661 :
662 3 : Ok(())
663 3 : }
664 : }
665 :
666 : /// Sliding window through keyspace and values for image layer
667 : /// This is used by [`LevelCompactionState::cover_with_images`] to decide on good split points
668 : struct KeyspaceWindow<K> {
// Fixed description of what is being covered.
669 : head: KeyspaceWindowHead<K>,
670 :
// Position at which the next image key range starts; moved forward by
// every successful `choose_next_image` call.
671 : start_pos: KeyspaceWindowPos<K>,
672 : }
// The parts of a KeyspaceWindow that do not change while it slides.
673 : struct KeyspaceWindowHead<K> {
674 : // overall key range to cover
675 : key_range: Range<K>,
676 :
// The key ranges to walk through, indexed by `KeyspaceWindowPos::keyspace_idx`.
677 : keyspace: Vec<Range<K>>,
// Target size of one image key range, in the units accumulated in
// `KeyspaceWindowPos::accum_keysize`.
678 : target_keysize: u64,
679 : }
680 :
// A cursor into the keyspace of a KeyspaceWindowHead.
681 : #[derive(Clone)]
682 : struct KeyspaceWindowPos<K> {
// Key the cursor has advanced to; used as the exclusive end of the key
// range chosen so far.
683 : end_key: K,
684 :
// Index of the keyspace range the cursor is in; equals the number of
// ranges once the end has been reached.
685 : keyspace_idx: usize,
686 :
// Keyspace size accumulated since the cursor was created.
687 : accum_keysize: u64,
688 : }
689 : impl<K: CompactionKey> KeyspaceWindowPos<K> {
690 0 : fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
691 0 : self.keyspace_idx == w.keyspace.len()
692 0 : }
693 :
694 : // Advance the cursor until it reaches 'target_keysize'.
695 0 : fn advance_until_size(
696 0 : &mut self,
697 0 : w: &KeyspaceWindowHead<K>,
698 0 : max_size: u64,
699 0 : shard_identity: &ShardIdentity,
700 0 : ) {
701 0 : while self.accum_keysize < max_size && !self.reached_end(w) {
702 0 : let curr_range = &w.keyspace[self.keyspace_idx];
703 0 : if self.end_key < curr_range.start {
704 0 : // skip over any unused space
705 0 : self.end_key = curr_range.start;
706 0 : }
707 :
708 : // We're now within 'curr_range'. Can we advance past it completely?
709 0 : let distance = K::key_range_size(&(self.end_key..curr_range.end), shard_identity);
710 0 : if (self.accum_keysize + distance as u64) < max_size {
711 0 : // oh yeah, it fits
712 0 : self.end_key = curr_range.end;
713 0 : self.keyspace_idx += 1;
714 0 : self.accum_keysize += distance as u64;
715 0 : } else {
716 : // advance within the range
717 0 : let skip_key = self.end_key.skip_some();
718 0 : let distance = K::key_range_size(&(self.end_key..skip_key), shard_identity);
719 0 : if (self.accum_keysize + distance as u64) < max_size {
720 0 : self.end_key = skip_key;
721 0 : self.accum_keysize += distance as u64;
722 0 : } else {
723 0 : self.end_key = self.end_key.next();
724 0 : self.accum_keysize += 1;
725 0 : }
726 : }
727 : }
728 0 : }
729 : }
730 :
731 : impl<K> KeyspaceWindow<K>
732 : where
733 : K: CompactionKey,
734 : {
735 0 : fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
736 0 : assert!(keyspace.first().unwrap().start >= key_range.start);
737 :
738 0 : let start_key = key_range.start;
739 0 : let start_pos = KeyspaceWindowPos::<K> {
740 0 : end_key: start_key,
741 0 : keyspace_idx: 0,
742 0 : accum_keysize: 0,
743 0 : };
744 0 : Self {
745 0 : head: KeyspaceWindowHead::<K> {
746 0 : key_range,
747 0 : keyspace,
748 0 : target_keysize,
749 0 : },
750 0 : start_pos,
751 0 : }
752 0 : }
753 :
754 0 : fn choose_next_image(&mut self, shard_identity: &ShardIdentity) -> Option<Range<K>> {
755 0 : if self.start_pos.keyspace_idx == self.head.keyspace.len() {
756 : // we've reached the end
757 0 : return None;
758 0 : }
759 0 :
760 0 : let mut next_pos = self.start_pos.clone();
761 0 : next_pos.advance_until_size(
762 0 : &self.head,
763 0 : self.start_pos.accum_keysize + self.head.target_keysize,
764 0 : shard_identity,
765 0 : );
766 0 :
767 0 : // See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
768 0 : // 1.25x target size
769 0 : let mut end_pos = next_pos.clone();
770 0 : end_pos.advance_until_size(
771 0 : &self.head,
772 0 : self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
773 0 : shard_identity,
774 0 : );
775 0 : if end_pos.reached_end(&self.head) {
776 : // gobble up any unused keyspace between the last used key and end of the range
777 0 : assert!(end_pos.end_key <= self.head.key_range.end);
778 0 : end_pos.end_key = self.head.key_range.end;
779 0 : next_pos = end_pos;
780 0 : }
781 :
782 0 : let start_key = self.start_pos.end_key;
783 0 : self.start_pos = next_pos;
784 0 : Some(start_key..self.start_pos.end_key)
785 0 : }
786 : }
787 :
788 : // Take previous partitioning, based on the image layers below.
789 : //
790 : // Candidate is at the front:
791 : //
792 : // Consider stretching an image layer to next divider? If it's close enough,
793 : // that's the image candidate
794 : //
795 : // If it's too far, consider splitting at a reasonable point
796 : //
797 : // Is the image candidate smaller than the equivalent delta? If so,
798 : // split off the image. Otherwise, split off one delta.
799 : // Try to snap off the delta at a reasonable point
800 :
// One entry of a `Window`: a run of consecutive keys from the input.
// `Window::feed` creates it for a single key; `Window::commit_upto` merges
// entries by extending `start_key` backwards.
801 : struct WindowElement<K> {
802 : start_key: K, // inclusive
803 : last_key: K, // inclusive
// Running total of the sizes fed into the window, up to and including this
// entry. It is not reset when entries are split off; see
// `Window::splitoff_size`.
804 : accum_size: u64,
805 : }
806 :
807 : /// Sliding window through keyspace and values for delta layer tiling
808 : ///
809 : /// This is used to decide which delta layer to write next.
810 : struct Window<K> {
// Entries in the order their keys were fed; the front entry is the
// candidate for the next delta layer.
811 : elems: VecDeque<WindowElement<K>>,
812 :
813 : // last key that was split off, inclusive
814 : splitoff_key: Option<K>,
// `accum_size` of the entry that was split off last; subtracted from the
// running totals to get sizes relative to the current front.
815 : splitoff_size: u64,
816 : }
817 :
818 : impl<K> Window<K>
819 : where
820 : K: CompactionKey,
821 : {
822 5 : fn new() -> Self {
823 5 : Self {
824 5 : elems: VecDeque::new(),
825 5 : splitoff_key: None,
826 5 : splitoff_size: 0,
827 5 : }
828 5 : }
829 :
830 93309 : fn feed(&mut self, key: K, size: u64) {
831 : let last_size;
832 93309 : if let Some(last) = self.elems.back_mut() {
833 : // We require the keys to be strictly increasing for the window.
834 : // Keys should already have been deduplicated by `accum_key_values`
835 93306 : assert!(
836 93306 : last.last_key < key,
837 0 : "last_key(={}) >= key(={key})",
838 : last.last_key
839 : );
840 93306 : last_size = last.accum_size;
841 3 : } else {
842 3 : last_size = 0;
843 3 : }
844 : // This is a new key.
845 93309 : let elem = WindowElement {
846 93309 : start_key: key,
847 93309 : last_key: key,
848 93309 : accum_size: last_size + size,
849 93309 : };
850 93309 : self.elems.push_back(elem);
851 93309 : }
852 :
853 7930 : fn remain_size(&self) -> u64 {
854 7930 : self.elems.back().unwrap().accum_size - self.splitoff_size
855 7930 : }
856 :
857 186592 : fn peek_size(&self) -> u64 {
858 186592 : self.elems.front().unwrap().accum_size - self.splitoff_size
859 186592 : }
860 :
861 8 : fn is_empty(&self) -> bool {
862 8 : self.elems.is_empty()
863 8 : }
864 :
865 93283 : fn commit_upto(&mut self, mut upto: usize) {
866 186551 : while upto > 1 {
867 93268 : let popped = self.elems.pop_front().unwrap();
868 93268 : self.elems.front_mut().unwrap().start_key = popped.start_key;
869 93268 : upto -= 1;
870 93268 : }
871 93283 : }
872 :
873 38 : fn find_size_split(&self, target_size: u64) -> usize {
874 38 : self.elems
875 334 : .partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
876 38 : }
877 :
878 41 : fn pop(&mut self) {
879 41 : let first = self.elems.pop_front().unwrap();
880 41 : self.splitoff_size = first.accum_size;
881 41 :
882 41 : self.splitoff_key = Some(first.last_key);
883 41 : }
884 :
885 : // the difference between delta and image is that an image covers
886 : // any unused keyspace before and after, while a delta tries to
887 : // minimize that. TODO: difference not implemented
888 41 : fn pop_delta(&mut self) -> Range<K> {
889 41 : let first = self.elems.front().unwrap();
890 41 : let key_range = first.start_key..first.last_key.next();
891 41 :
892 41 : self.pop();
893 41 : key_range
894 41 : }
895 :
896 : // Prerequisite: we have enough input in the window
897 : //
898 : // On return None, the caller should feed more data and call again
899 93355 : fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
900 93355 : if has_more && self.elems.is_empty() {
901 : // Starting up
902 5 : return None;
903 93350 : }
904 :
905 : // If we still have an undersized candidate, just keep going
906 186592 : while self.peek_size() < target_size {
907 178664 : if self.elems.len() > 1 {
908 93242 : self.commit_upto(2);
909 93242 : } else if has_more {
910 85420 : return None;
911 : } else {
912 2 : break;
913 : }
914 : }
915 :
916 : // Ensure we have enough input in the window to make a good decision
917 7930 : if has_more && self.remain_size() < target_size * 5 / 4 {
918 7889 : return None;
919 41 : }
920 41 :
921 41 : // The candidate on the front is now large enough, for a delta.
922 41 : // And we have enough data in the window to decide.
923 41 :
924 41 : // If we're willing to stretch it up to 1.25 target size, could we
925 41 : // gobble up the rest of the work? This avoids creating very small
926 41 : // "tail" layers at the end of the keyspace
927 41 : if !has_more && self.remain_size() < target_size * 5 / 4 {
928 3 : self.commit_upto(self.elems.len());
929 3 : } else {
930 38 : let delta_split_at = self.find_size_split(target_size);
931 38 : self.commit_upto(delta_split_at);
932 38 :
933 38 : // If it's still not large enough, request the caller to fill the window
934 38 : if self.elems.len() == 1 && has_more {
935 0 : return None;
936 38 : }
937 : }
938 41 : Some(self.pop_delta())
939 93355 : }
940 : }
|