//! This file contains generic utility functions over the interface types,
//! which could be handy for any compaction implementation.
use crate::interface::*;

use futures::future::BoxFuture;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pageserver_api::shard::ShardIdentity;
use pin_project_lite::pin_project;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::fmt::Display;
use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
use utils::lsn::Lsn;

pub const PAGE_SZ: u64 = 8192;

pub fn keyspace_total_size<K>(
    keyspace: &CompactionKeySpace<K>,
    shard_identity: &ShardIdentity,
) -> u64
where
    K: CompactionKey,
{
    keyspace
        .iter()
        .map(|r| K::key_range_size(r, shard_identity) as u64)
        .sum()
}

pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    !(a.end <= b.start || b.end <= a.start)
}

/// Whether `a` fully contains `b`, as in the example below:
/// ```plain
/// |-----------a-----------|
///      |------b------|
/// ```
pub fn fully_contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    a.start <= b.start && a.end >= b.end
}
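
// Illustrative sketch (not part of the original file): how `overlaps_with` and
// `fully_contains` behave on half-open integer ranges. The test module name is
// made up for the example.
#[cfg(test)]
mod range_predicate_examples {
    use super::*;

    #[test]
    fn overlap_and_containment() {
        // Ranges are half-open, so ranges that merely touch do not overlap.
        assert!(overlaps_with(&(0..10), &(5..15)));
        assert!(!overlaps_with(&(0..10), &(10..20)));
        // Containment allows equal endpoints on either side.
        assert!(fully_contains(&(0..10), &(3..7)));
        assert!(fully_contains(&(0..10), &(0..10)));
        assert!(!fully_contains(&(3..7), &(0..10)));
    }
}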

/// Union two keyspaces into `a`, coalescing overlapping (or adjacent) ranges.
/// Both inputs must already be sorted by range start.
pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
    let x = std::mem::take(a);
    let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
        .into_iter()
        .kmerge_by(|a, b| a.start < b.start);
    let mut ranges = Vec::new();
    if let Some(first) = all_ranges_iter.next() {
        let (mut start, mut end) = (first.start, first.end);

        for r in all_ranges_iter {
            assert!(r.start >= start);
            if r.start > end {
                ranges.push(start..end);
                start = r.start;
                end = r.end;
            } else if r.end > end {
                end = r.end;
            }
        }
        ranges.push(start..end);
    }
    *a = ranges;
}
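
// A minimal sketch (not in the original file) of how `union_to_keyspace`
// coalesces two keyspaces; it assumes `CompactionKeySpace<K>` is a
// `Vec<Range<K>>`, which is how the function above treats it.
#[cfg(test)]
mod union_to_keyspace_example {
    use super::*;

    #[test]
    fn merges_overlapping_ranges() {
        let mut a: CompactionKeySpace<u32> = vec![0..10, 20..30];
        let b: CompactionKeySpace<u32> = vec![5..15, 25..40];
        union_to_keyspace(&mut a, b);
        // Overlapping ranges from either side are merged into one.
        assert_eq!(a, vec![0..15, 20..40]);
    }
}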

/// Intersect a keyspace with a single key range, trimming the first and last
/// retained ranges to the window. Assumes `a` is sorted by range start.
pub fn intersect_keyspace<K: Ord + Clone + Copy>(
    a: &CompactionKeySpace<K>,
    r: &Range<K>,
) -> CompactionKeySpace<K> {
    let mut ranges: Vec<Range<K>> = Vec::new();

    for x in a.iter() {
        if x.end <= r.start {
            continue;
        }
        if x.start >= r.end {
            break;
        }
        ranges.push(x.clone());
    }

    // Trim the ends to the window.
    if let Some(first) = ranges.first_mut() {
        first.start = std::cmp::max(first.start, r.start);
    }
    if let Some(last) = ranges.last_mut() {
        last.end = std::cmp::min(last.end, r.end);
    }
    ranges
}
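
// Sketch (not in the original file): restricting a keyspace to a window with
// `intersect_keyspace`, again assuming `CompactionKeySpace<K>` is `Vec<Range<K>>`.
#[cfg(test)]
mod intersect_keyspace_example {
    use super::*;

    #[test]
    fn trims_to_window() {
        let a: CompactionKeySpace<u32> = vec![0..10, 20..30, 40..50];
        // Only ranges overlapping 5..45 survive, and the ends are trimmed.
        assert_eq!(intersect_keyspace(&a, &(5..45)), vec![5..10, 20..30, 40..45]);
    }
}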

/// Create a stream that iterates through all `DeltaEntry`s among all input
/// layers, in key-lsn order.
///
/// This is public because the `create_delta()` implementation likely wants to
/// use this too.
/// TODO: move to a more shared place
pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
    layers: &'a [E::DeltaLayer],
    ctx: &'a E::RequestContext,
) -> MergeDeltaKeys<'a, E> {
    // Use a binary heap to merge the layers. Each input layer is initially
    // represented by a LazyLoadLayer::Unloaded element, which uses the start of
    // the layer's key range as the key. The first time a layer reaches the top
    // of the heap, all the keys of the layer are loaded into a sorted vector.
    //
    // This helps to keep the memory usage reasonable: we only need to hold in
    // memory the `DeltaEntry`s of the layers that overlap with the "current" key.
    let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
    for l in layers {
        heap.push(LazyLoadLayer::Unloaded(l));
    }
    MergeDeltaKeys {
        heap,
        ctx,
        load_future: None,
    }
}
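
// Usage sketch (hypothetical helper, not in the original file): the merged
// stream yields `DeltaEntry`s in (key, lsn) order, so a consumer only needs to
// pin it and poll it with `StreamExt::next`.
#[allow(dead_code)]
async fn count_merged_entries<'a, E: CompactionJobExecutor + 'a>(
    layers: &'a [E::DeltaLayer],
    ctx: &'a E::RequestContext,
) -> anyhow::Result<usize> {
    let mut stream = std::pin::pin!(merge_delta_keys::<E>(layers, ctx));
    let mut count = 0;
    while let Some(entry) = stream.next().await {
        // Each item is an anyhow::Result; lazily loading a layer may fail.
        let _entry = entry?;
        count += 1;
    }
    Ok(count)
}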

/// Like [`merge_delta_keys`], but eagerly loads the keys of all layers and
/// sorts them in memory before returning the stream.
pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
    layers: &'a [E::DeltaLayer],
    ctx: &'a E::RequestContext,
) -> anyhow::Result<impl Stream<Item = <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>
{
    let mut keys = Vec::new();
    for l in layers {
        // Boxing and casting to LoadFuture is required to obtain the right Sync bound.
        // If we do l.load_keys(ctx).await? directly, there is a compilation error.
        let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx));
        keys.extend(load_future.await?.into_iter());
    }
    keys.sort_by_key(|k| (k.key(), k.lsn()));
    let stream = futures::stream::iter(keys.into_iter());
    Ok(stream)
}

enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
    Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
    Unloaded(&'a E::DeltaLayer),
}
impl<E: CompactionJobExecutor> LazyLoadLayer<'_, E> {
    fn min_key(&self) -> E::Key {
        match self {
            Self::Loaded(entries) => entries.front().unwrap().key(),
            Self::Unloaded(dl) => dl.key_range().start,
        }
    }
    fn min_lsn(&self) -> Lsn {
        match self {
            Self::Loaded(entries) => entries.front().unwrap().lsn(),
            Self::Unloaded(dl) => dl.lsn_range().start,
        }
    }
}
impl<E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'_, E> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl<E: CompactionJobExecutor> Ord for LazyLoadLayer<'_, E> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse order so that we get a min-heap.
        (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
    }
}
impl<E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'_, E> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}
impl<E: CompactionJobExecutor> Eq for LazyLoadLayer<'_, E> {}
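
// Side note (illustrative only, not in the original file): `BinaryHeap` is a
// max-heap, so reversing the comparison, as `Ord for LazyLoadLayer` does above,
// makes the heap surface the entry with the smallest (key, lsn) first. The same
// effect on plain values:
#[cfg(test)]
mod reversed_ordering_example {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    #[test]
    fn reversed_ordering_yields_min_heap() {
        let mut heap = BinaryHeap::new();
        for v in [3, 1, 2] {
            heap.push(Reverse(v));
        }
        // The smallest value comes out first because the ordering is reversed.
        assert_eq!(heap.pop(), Some(Reverse(1)));
    }
}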

type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;

// Stream returned by `merge_delta_keys`.
pin_project! {
    #[allow(clippy::type_complexity)]
    pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
        heap: BinaryHeap<LazyLoadLayer<'a, E>>,

        #[pin]
        load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,

        ctx: &'a E::RequestContext,
    }
}

impl<'a, E> Stream for MergeDeltaKeys<'a, E>
where
    E: CompactionJobExecutor + 'a,
{
    type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
        let mut this = self.project();
        loop {
            if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
                // We are waiting for a layer's keys to finish loading.
                match ready!(load_future.as_mut().poll(cx)) {
                    Ok(entries) => {
                        this.load_future.set(None);
                        *this.heap.peek_mut().unwrap() =
                            LazyLoadLayer::Loaded(VecDeque::from(entries));
                    }
                    Err(e) => {
                        return Poll::Ready(Some(Err(e)));
                    }
                }
            }

            // If the topmost layer in the heap hasn't been loaded yet, start
            // loading it. Otherwise return the next entry from it and update
            // the layer's position in the heap (this decrease-key operation is
            // performed implicitly when `top` is dropped).
            if let Some(mut top) = this.heap.peek_mut() {
                match top.deref_mut() {
                    LazyLoadLayer::Unloaded(ref mut l) => {
                        let fut = l.load_keys(this.ctx);
                        this.load_future.set(Some(Box::pin(fut)));
                        continue;
                    }
                    LazyLoadLayer::Loaded(ref mut entries) => {
                        let result = entries.pop_front().unwrap();
                        if entries.is_empty() {
                            std::collections::binary_heap::PeekMut::pop(top);
                        }
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                return Poll::Ready(None);
            }
        }
    }
}
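
// Illustrative only (not in the original file): `PeekMut` re-sifts the modified
// element when the guard is dropped, which is the implicit "decrease-key" the
// comment above relies on.
#[cfg(test)]
mod peek_mut_example {
    use std::collections::BinaryHeap;

    #[test]
    fn peek_mut_resifts_on_drop() {
        let mut heap = BinaryHeap::from(vec![10, 5, 1]);
        // Shrink the current maximum; the heap reorders itself when the
        // `PeekMut` guard goes out of scope at the end of this statement.
        *heap.peek_mut().unwrap() = 0;
        assert_eq!(heap.peek(), Some(&5));
    }
}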

/// Accumulated values at a key boundary, as produced by [`accum_key_values`].
pub struct KeySize<K> {
    pub key: K,
    pub num_values: u64,
    pub size: u64,
    /// The LSNs to partition at (if empty, there is no per-LSN partitioning).
    pub partition_lsns: Vec<(Lsn, u64)>,
}

/// Group a key-ordered stream of delta entries into one [`KeySize`] per
/// distinct key. Whenever the accumulated size within a single key exceeds
/// `target_size`, the entry's LSN and the size accumulated so far are recorded
/// in `partition_lsns` as a point at which the key can be split.
pub fn accum_key_values<'a, I, K, D, E>(
    input: I,
    target_size: u64,
) -> impl Stream<Item = Result<KeySize<K>, E>>
where
    K: Eq + PartialOrd + Display + Copy,
    I: Stream<Item = Result<D, E>>,
    D: CompactionDeltaEntry<'a, K>,
{
    async_stream::try_stream! {
        // Initialize the state from the first value.
        let mut input = std::pin::pin!(input);

        if let Some(first) = input.next().await {
            let first = first?;
            let mut part_size = first.size();
            let mut accum: KeySize<K> = KeySize {
                key: first.key(),
                num_values: 1,
                size: part_size,
                partition_lsns: Vec::new(),
            };
            let mut last_key = accum.key;
            while let Some(this) = input.next().await {
                let this = this?;
                if this.key() == accum.key {
                    let add_size = this.size();
                    if part_size + add_size > target_size {
                        accum.partition_lsns.push((this.lsn(), part_size));
                        part_size = 0;
                    }
                    part_size += add_size;
                    accum.size += add_size;
                    accum.num_values += 1;
                } else {
                    assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
                    last_key = accum.key;
                    yield accum;
                    part_size = this.size();
                    accum = KeySize {
                        key: this.key(),
                        num_values: 1,
                        size: part_size,
                        partition_lsns: Vec::new(),
                    };
                }
            }
            assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
            yield accum;
        }
    }
}
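
// Usage sketch (hypothetical helper, not in the original file): summing the
// per-key sizes reported by `accum_key_values` reproduces the total size of
// the input entries.
#[allow(dead_code)]
async fn total_size_of_keys<'a, I, K, D, E>(input: I, target_size: u64) -> Result<u64, E>
where
    K: Eq + PartialOrd + Display + Copy,
    I: Stream<Item = Result<D, E>>,
    D: CompactionDeltaEntry<'a, K>,
{
    let mut sizes = std::pin::pin!(accum_key_values(input, target_size));
    let mut total = 0;
    while let Some(key_size) = sizes.next().await {
        total += key_size?.size;
    }
    Ok(total)
}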