Line data Source code
1 : //! This file contains generic utility functions over the interface types,
2 : //! which could be handy for any compaction implementation.
3 : use crate::interface::*;
4 :
5 : use futures::future::BoxFuture;
6 : use futures::{Stream, StreamExt};
7 : use itertools::Itertools;
8 : use pageserver_api::shard::ShardIdentity;
9 : use pin_project_lite::pin_project;
10 : use std::collections::BinaryHeap;
11 : use std::collections::VecDeque;
12 : use std::fmt::Display;
13 : use std::future::Future;
14 : use std::ops::{DerefMut, Range};
15 : use std::pin::Pin;
16 : use std::task::{ready, Poll};
17 : use utils::lsn::Lsn;
18 :
19 2 : pub fn keyspace_total_size<K>(
20 2 : keyspace: &CompactionKeySpace<K>,
21 2 : shard_identity: &ShardIdentity,
22 2 : ) -> u64
23 2 : where
24 2 : K: CompactionKey,
25 2 : {
26 2 : keyspace
27 2 : .iter()
28 2 : .map(|r| K::key_range_size(r, shard_identity) as u64)
29 2 : .sum()
30 2 : }
31 :
/// Returns `true` when the half-open ranges `a` and `b` share at least one
/// point.
///
/// Ranges that merely touch (one ends exactly where the other starts) do not
/// overlap.
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    // Each range has to start strictly before the other one ends.
    b.start < a.end && a.start < b.end
}
35 :
36 798 : pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
37 798 : let x = std::mem::take(a);
38 798 : let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
39 798 : .into_iter()
40 798 : .kmerge_by(|a, b| a.start < b.start);
41 798 : let mut ranges = Vec::new();
42 798 : if let Some(first) = all_ranges_iter.next() {
43 798 : let (mut start, mut end) = (first.start, first.end);
44 :
45 1594 : for r in all_ranges_iter {
46 796 : assert!(r.start >= start);
47 796 : if r.start > end {
48 0 : ranges.push(start..end);
49 0 : start = r.start;
50 0 : end = r.end;
51 796 : } else if r.end > end {
52 0 : end = r.end;
53 796 : }
54 : }
55 798 : ranges.push(start..end);
56 0 : }
57 798 : *a = ranges
58 798 : }
59 :
60 2 : pub fn intersect_keyspace<K: Ord + Clone + Copy>(
61 2 : a: &CompactionKeySpace<K>,
62 2 : r: &Range<K>,
63 2 : ) -> CompactionKeySpace<K> {
64 2 : let mut ranges: Vec<Range<K>> = Vec::new();
65 :
66 2 : for x in a.iter() {
67 2 : if x.end <= r.start {
68 0 : continue;
69 2 : }
70 2 : if x.start >= r.end {
71 0 : break;
72 2 : }
73 2 : ranges.push(x.clone())
74 : }
75 :
76 : // trim the ends
77 2 : if let Some(first) = ranges.first_mut() {
78 2 : first.start = std::cmp::max(first.start, r.start);
79 2 : }
80 2 : if let Some(last) = ranges.last_mut() {
81 2 : last.end = std::cmp::min(last.end, r.end);
82 2 : }
83 2 : ranges
84 2 : }
85 :
86 : /// Create a stream that iterates through all DeltaEntrys among all input
87 : /// layers, in key-lsn order.
88 : ///
89 : /// This is public because the create_delta() implementation likely wants to use this too
90 : /// TODO: move to a more shared place
91 78 : pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
92 78 : layers: &'a [E::DeltaLayer],
93 78 : ctx: &'a E::RequestContext,
94 78 : ) -> MergeDeltaKeys<'a, E> {
95 78 : // Use a binary heap to merge the layers. Each input layer is initially
96 78 : // represented by a LazyLoadLayer::Unloaded element, which uses the start of
97 78 : // the layer's key range as the key. The first time a layer reaches the top
98 78 : // of the heap, all the keys of the layer are loaded into a sorted vector.
99 78 : //
100 78 : // This helps to keep the memory usage reasonable: we only need to hold in
101 78 : // memory the DeltaEntrys of the layers that overlap with the "current" key.
102 78 : let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
103 3120 : for l in layers {
104 3042 : heap.push(LazyLoadLayer::Unloaded(l));
105 3042 : }
106 78 : MergeDeltaKeys {
107 78 : heap,
108 78 : ctx,
109 78 : load_future: None,
110 78 : }
111 78 : }
112 :
113 2 : pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
114 2 : layers: &'a [E::DeltaLayer],
115 2 : ctx: &'a E::RequestContext,
116 2 : ) -> anyhow::Result<impl Stream<Item = <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>
117 2 : {
118 2 : let mut keys = Vec::new();
119 80 : for l in layers {
120 : // Boxing and casting to LoadFuture is required to obtain the right Sync bound.
121 : // If we do l.load_keys(ctx).await? directly, there is a compilation error.
122 78 : let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx));
123 78 : keys.extend(load_future.await?.into_iter());
124 : }
125 1032710 : keys.sort_by_key(|k| (k.key(), k.lsn()));
126 2 : let stream = futures::stream::iter(keys.into_iter());
127 2 : Ok(stream)
128 2 : }
129 :
130 : enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
131 : Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
132 : Unloaded(&'a E::DeltaLayer),
133 : }
134 : impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
135 49040550 : fn min_key(&self) -> E::Key {
136 49040550 : match self {
137 48976239 : Self::Loaded(entries) => entries.front().unwrap().key(),
138 64311 : Self::Unloaded(dl) => dl.key_range().start,
139 : }
140 49040550 : }
141 49040550 : fn min_lsn(&self) -> Lsn {
142 49040550 : match self {
143 48976239 : Self::Loaded(entries) => entries.front().unwrap().lsn(),
144 64311 : Self::Unloaded(dl) => dl.lsn_range().start,
145 : }
146 49040550 : }
147 : }
148 : impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
149 24520275 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150 24520275 : Some(self.cmp(other))
151 24520275 : }
152 : }
153 : impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
154 24520275 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
155 24520275 : // reverse order so that we get a min-heap
156 24520275 : (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
157 24520275 : }
158 : }
159 : impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
160 0 : fn eq(&self, other: &Self) -> bool {
161 0 : self.cmp(other) == std::cmp::Ordering::Equal
162 0 : }
163 : }
164 : impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
165 :
166 : type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
167 :
168 : // Stream returned by `merge_delta_keys`
169 : pin_project! {
170 : #[allow(clippy::type_complexity)]
171 : pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
172 : heap: BinaryHeap<LazyLoadLayer<'a, E>>,
173 :
174 : #[pin]
175 : load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
176 :
177 : ctx: &'a E::RequestContext,
178 : }
179 : }
180 :
181 : impl<'a, E> Stream for MergeDeltaKeys<'a, E>
182 : where
183 : E: CompactionJobExecutor + 'a,
184 : {
185 : type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
186 :
187 3045120 : fn poll_next(
188 3045120 : self: Pin<&mut Self>,
189 3045120 : cx: &mut std::task::Context<'_>,
190 3045120 : ) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
191 3045120 : let mut this = self.project();
192 : loop {
193 3048162 : if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
194 : // We are waiting for loading the keys to finish
195 3042 : match ready!(load_future.as_mut().poll(cx)) {
196 3042 : Ok(entries) => {
197 3042 : this.load_future.set(None);
198 3042 : *this.heap.peek_mut().unwrap() =
199 3042 : LazyLoadLayer::Loaded(VecDeque::from(entries));
200 3042 : }
201 0 : Err(e) => {
202 0 : return Poll::Ready(Some(Err(e)));
203 : }
204 : }
205 3045120 : }
206 :
207 : // If the topmost layer in the heap hasn't been loaded yet, start
208 : // loading it. Otherwise return the next entry from it and update
209 : // the layer's position in the heap (this decreaseKey operation is
210 : // performed implicitly when `top` is dropped).
211 3048162 : if let Some(mut top) = this.heap.peek_mut() {
212 3048084 : match top.deref_mut() {
213 3042 : LazyLoadLayer::Unloaded(ref mut l) => {
214 3042 : let fut = l.load_keys(this.ctx);
215 3042 : this.load_future.set(Some(Box::pin(fut)));
216 3042 : continue;
217 : }
218 3045042 : LazyLoadLayer::Loaded(ref mut entries) => {
219 3045042 : let result = entries.pop_front().unwrap();
220 3045042 : if entries.is_empty() {
221 3042 : std::collections::binary_heap::PeekMut::pop(top);
222 3042000 : }
223 3045042 : return Poll::Ready(Some(Ok(result)));
224 : }
225 : }
226 : } else {
227 78 : return Poll::Ready(None);
228 : }
229 : }
230 3045120 : }
231 : }
232 :
/// Accumulated statistics for the values of a single key, as yielded by
/// `accum_key_values`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeySize<K> {
    /// The key that the accumulated values belong to.
    pub key: K,
    /// Number of values accumulated for `key`.
    pub num_values: u64,
    /// Sum of the sizes of the values accumulated for `key`.
    pub size: u64,
}
239 :
240 2 : pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
241 2 : where
242 2 : K: Eq + PartialOrd + Display + Copy,
243 2 : I: Stream<Item = Result<D, E>>,
244 2 : D: CompactionDeltaEntry<'a, K>,
245 2 : {
246 : async_stream::try_stream! {
247 : // Initialize the state from the first value
248 : let mut input = std::pin::pin!(input);
249 :
250 : if let Some(first) = input.next().await {
251 : let first = first?;
252 : let mut accum: KeySize<K> = KeySize {
253 : key: first.key(),
254 : num_values: 1,
255 : size: first.size(),
256 : };
257 : let mut last_key = accum.key;
258 : while let Some(this) = input.next().await {
259 : let this = this?;
260 : if this.key() == accum.key {
261 : accum.size += this.size();
262 : accum.num_values += 1;
263 : } else {
264 : assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
265 : last_key = accum.key;
266 : yield accum;
267 : accum = KeySize {
268 : key: this.key(),
269 : num_values: 1,
270 : size: this.size(),
271 : };
272 : }
273 : }
274 : assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
275 : yield accum;
276 : }
277 : }
278 2 : }
|