Line data Source code
1 : //! This file contains generic utility functions over the interface types,
2 : //! which could be handy for any compaction implementation.
3 : use crate::interface::*;
4 :
5 : use futures::future::BoxFuture;
6 : use futures::{Stream, StreamExt};
7 : use itertools::Itertools;
8 : use pin_project_lite::pin_project;
9 : use std::cmp::Ord;
10 : use std::collections::BinaryHeap;
11 : use std::collections::VecDeque;
12 : use std::future::Future;
13 : use std::ops::{DerefMut, Range};
14 : use std::pin::Pin;
15 : use std::task::{ready, Poll};
16 :
17 0 : pub fn keyspace_total_size<K>(keyspace: &CompactionKeySpace<K>) -> u64
18 0 : where
19 0 : K: CompactionKey,
20 0 : {
21 0 : keyspace.iter().map(|r| K::key_range_size(r) as u64).sum()
22 0 : }
23 :
/// Returns `true` when each half-open range starts strictly before the other
/// one ends.
///
/// For non-empty ranges this means they share at least one element. Ranges
/// that merely touch (`0..3` and `3..5`) do not overlap.
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    b.start < a.end && a.start < b.end
}
27 :
28 0 : pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
29 0 : let x = std::mem::take(a);
30 0 : let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
31 0 : .into_iter()
32 0 : .kmerge_by(|a, b| a.start < b.start);
33 0 : let mut ranges = Vec::new();
34 0 : if let Some(first) = all_ranges_iter.next() {
35 0 : let (mut start, mut end) = (first.start, first.end);
36 :
37 0 : for r in all_ranges_iter {
38 0 : assert!(r.start >= start);
39 0 : if r.start > end {
40 0 : ranges.push(start..end);
41 0 : start = r.start;
42 0 : end = r.end;
43 0 : } else if r.end > end {
44 0 : end = r.end;
45 0 : }
46 : }
47 0 : ranges.push(start..end);
48 0 : }
49 0 : *a = ranges
50 0 : }
51 :
52 0 : pub fn intersect_keyspace<K: Ord + Clone + Copy>(
53 0 : a: &CompactionKeySpace<K>,
54 0 : r: &Range<K>,
55 0 : ) -> CompactionKeySpace<K> {
56 0 : let mut ranges: Vec<Range<K>> = Vec::new();
57 :
58 0 : for x in a.iter() {
59 0 : if x.end <= r.start {
60 0 : continue;
61 0 : }
62 0 : if x.start >= r.end {
63 0 : break;
64 0 : }
65 0 : ranges.push(x.clone())
66 : }
67 :
68 : // trim the ends
69 0 : if let Some(first) = ranges.first_mut() {
70 0 : first.start = std::cmp::max(first.start, r.start);
71 0 : }
72 0 : if let Some(last) = ranges.last_mut() {
73 0 : last.end = std::cmp::min(last.end, r.end);
74 0 : }
75 0 : ranges
76 0 : }
77 :
78 : /// Create a stream that iterates through all DeltaEntrys among all input
79 : /// layers, in key-lsn order.
80 : ///
81 : /// This is public because the create_delta() implementation likely wants to use this too
82 : /// TODO: move to a more shared place
83 0 : pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
84 0 : layers: &'a [E::DeltaLayer],
85 0 : ctx: &'a E::RequestContext,
86 0 : ) -> MergeDeltaKeys<'a, E> {
87 0 : // Use a binary heap to merge the layers. Each input layer is initially
88 0 : // represented by a LazyLoadLayer::Unloaded element, which uses the start of
89 0 : // the layer's key range as the key. The first time a layer reaches the top
90 0 : // of the heap, all the keys of the layer are loaded into a sorted vector.
91 0 : //
92 0 : // This helps to keep the memory usage reasonable: we only need to hold in
93 0 : // memory the DeltaEntrys of the layers that overlap with the "current" key.
94 0 : let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
95 0 : for l in layers {
96 0 : heap.push(LazyLoadLayer::Unloaded(l));
97 0 : }
98 0 : MergeDeltaKeys {
99 0 : heap,
100 0 : ctx,
101 0 : load_future: None,
102 0 : }
103 0 : }
104 :
105 : enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
106 : Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
107 : Unloaded(&'a E::DeltaLayer),
108 : }
109 : impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
110 0 : fn key(&self) -> E::Key {
111 0 : match self {
112 0 : Self::Loaded(entries) => entries.front().unwrap().key(),
113 0 : Self::Unloaded(dl) => dl.key_range().start,
114 : }
115 0 : }
116 : }
117 : impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
118 0 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
119 0 : Some(self.cmp(other))
120 0 : }
121 : }
122 : impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
123 0 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
124 0 : // reverse order so that we get a min-heap
125 0 : other.key().cmp(&self.key())
126 0 : }
127 : }
128 : impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
129 0 : fn eq(&self, other: &Self) -> bool {
130 0 : self.key().eq(&other.key())
131 0 : }
132 : }
133 : impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
134 :
/// Boxed future that resolves to all entries read from one layer, or to the
/// error that stopped the read.
///
/// NOTE: the type parameter `E` here is the *entry* type (see the
/// `load_future` field of `MergeDeltaKeys`), not the compaction executor.
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
136 :
// Stream returned by `merge_delta_keys`. The `Stream` impl for this type
// drives the lazy, per-layer loading of entries.
pin_project! {
    /// Stream of delta entries from several layers, merged by key. Created by
    /// `merge_delta_keys`.
    #[allow(clippy::type_complexity)]
    pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
        // Input layers. The `Ord` impl of `LazyLoadLayer` is reversed, so the
        // layer with the smallest current key sits at the top.
        heap: BinaryHeap<LazyLoadLayer<'a, E>>,

        // In-flight `load_keys` call for the layer at the top of `heap`, or
        // `None` when no load is pending.
        #[pin]
        load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,

        // Request context passed to every `load_keys` call.
        ctx: &'a E::RequestContext,
    }
}
149 :
150 : impl<'a, E> Stream for MergeDeltaKeys<'a, E>
151 : where
152 : E: CompactionJobExecutor + 'a,
153 : {
154 : type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
155 :
156 0 : fn poll_next(
157 0 : self: Pin<&mut Self>,
158 0 : cx: &mut std::task::Context<'_>,
159 0 : ) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
160 0 : let mut this = self.project();
161 : loop {
162 0 : if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
163 : // We are waiting for loading the keys to finish
164 0 : match ready!(load_future.as_mut().poll(cx)) {
165 0 : Ok(entries) => {
166 0 : this.load_future.set(None);
167 0 : *this.heap.peek_mut().unwrap() =
168 0 : LazyLoadLayer::Loaded(VecDeque::from(entries));
169 0 : }
170 0 : Err(e) => {
171 0 : return Poll::Ready(Some(Err(e)));
172 : }
173 : }
174 0 : }
175 :
176 : // If the topmost layer in the heap hasn't been loaded yet, start
177 : // loading it. Otherwise return the next entry from it and update
178 : // the layer's position in the heap (this decreaseKey operation is
179 : // performed implicitly when `top` is dropped).
180 0 : if let Some(mut top) = this.heap.peek_mut() {
181 0 : match top.deref_mut() {
182 0 : LazyLoadLayer::Unloaded(ref mut l) => {
183 0 : let fut = l.load_keys(this.ctx);
184 0 : this.load_future.set(Some(fut));
185 0 : continue;
186 : }
187 0 : LazyLoadLayer::Loaded(ref mut entries) => {
188 0 : let result = entries.pop_front().unwrap();
189 0 : if entries.is_empty() {
190 0 : std::collections::binary_heap::PeekMut::pop(top);
191 0 : }
192 0 : return Poll::Ready(Some(Ok(result)));
193 : }
194 : }
195 : } else {
196 0 : return Poll::Ready(None);
197 : }
198 : }
199 0 : }
200 : }
201 :
/// Totals for one run of consecutive entries sharing the same key, as
/// yielded by `accum_key_values`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeySize<K> {
    /// The key shared by every entry in the run.
    pub key: K,
    /// Number of entries in the run.
    pub num_values: u64,
    /// Sum of the entries' `size()` values.
    pub size: u64,
}
208 :
209 0 : pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
210 0 : where
211 0 : K: Eq,
212 0 : I: Stream<Item = Result<D, E>>,
213 0 : D: CompactionDeltaEntry<'a, K>,
214 0 : {
215 0 : async_stream::try_stream! {
216 0 : // Initialize the state from the first value
217 0 : let mut input = std::pin::pin!(input);
218 :
219 0 : if let Some(first) = input.next().await {
220 0 : let first = first?;
221 0 : let mut accum: KeySize<K> = KeySize {
222 0 : key: first.key(),
223 0 : num_values: 1,
224 0 : size: first.size(),
225 0 : };
226 0 : while let Some(this) = input.next().await {
227 0 : let this = this?;
228 0 : if this.key() == accum.key {
229 0 : accum.size += this.size();
230 0 : accum.num_values += 1;
231 0 : } else {
232 0 : yield accum;
233 0 : accum = KeySize {
234 0 : key: this.key(),
235 0 : num_values: 1,
236 0 : size: this.size(),
237 0 : };
238 : }
239 : }
240 0 : yield accum;
241 0 : }
242 : }
243 0 : }
|