LCOV - code coverage report
Current view: top level - pageserver/compaction/src - helpers.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 91.8 % 147 135
Test Date: 2024-05-10 13:18:37 Functions: 42.5 % 40 17

            Line data    Source code
       1              : //! This file contains generic utility functions over the interface types,
       2              : //! which could be handy for any compaction implementation.
       3              : use crate::interface::*;
       4              : 
       5              : use futures::future::BoxFuture;
       6              : use futures::{Stream, StreamExt};
       7              : use itertools::Itertools;
       8              : use pageserver_api::shard::ShardIdentity;
       9              : use pin_project_lite::pin_project;
      10              : use std::collections::BinaryHeap;
      11              : use std::collections::VecDeque;
      12              : use std::fmt::Display;
      13              : use std::future::Future;
      14              : use std::ops::{DerefMut, Range};
      15              : use std::pin::Pin;
      16              : use std::task::{ready, Poll};
      17              : use utils::lsn::Lsn;
      18              : 
      19            2 : pub fn keyspace_total_size<K>(
      20            2 :     keyspace: &CompactionKeySpace<K>,
      21            2 :     shard_identity: &ShardIdentity,
      22            2 : ) -> u64
      23            2 : where
      24            2 :     K: CompactionKey,
      25            2 : {
      26            2 :     keyspace
      27            2 :         .iter()
      28            2 :         .map(|r| K::key_range_size(r, shard_identity) as u64)
      29            2 :         .sum()
      30            2 : }
      31              : 
      32         3354 : pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
      33         3354 :     !(a.end <= b.start || b.end <= a.start)
      34         3354 : }
      35              : 
      36          798 : pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
      37          798 :     let x = std::mem::take(a);
      38          798 :     let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
      39          798 :         .into_iter()
      40          798 :         .kmerge_by(|a, b| a.start < b.start);
      41          798 :     let mut ranges = Vec::new();
      42          798 :     if let Some(first) = all_ranges_iter.next() {
      43          798 :         let (mut start, mut end) = (first.start, first.end);
      44              : 
      45         1594 :         for r in all_ranges_iter {
      46          796 :             assert!(r.start >= start);
      47          796 :             if r.start > end {
      48            0 :                 ranges.push(start..end);
      49            0 :                 start = r.start;
      50            0 :                 end = r.end;
      51          796 :             } else if r.end > end {
      52            0 :                 end = r.end;
      53          796 :             }
      54              :         }
      55          798 :         ranges.push(start..end);
      56            0 :     }
      57          798 :     *a = ranges
      58          798 : }
      59              : 
      60            2 : pub fn intersect_keyspace<K: Ord + Clone + Copy>(
      61            2 :     a: &CompactionKeySpace<K>,
      62            2 :     r: &Range<K>,
      63            2 : ) -> CompactionKeySpace<K> {
      64            2 :     let mut ranges: Vec<Range<K>> = Vec::new();
      65              : 
      66            2 :     for x in a.iter() {
      67            2 :         if x.end <= r.start {
      68            0 :             continue;
      69            2 :         }
      70            2 :         if x.start >= r.end {
      71            0 :             break;
      72            2 :         }
      73            2 :         ranges.push(x.clone())
      74              :     }
      75              : 
      76              :     // trim the ends
      77            2 :     if let Some(first) = ranges.first_mut() {
      78            2 :         first.start = std::cmp::max(first.start, r.start);
      79            2 :     }
      80            2 :     if let Some(last) = ranges.last_mut() {
      81            2 :         last.end = std::cmp::min(last.end, r.end);
      82            2 :     }
      83            2 :     ranges
      84            2 : }
      85              : 
      86              : /// Create a stream that iterates through all DeltaEntrys among all input
      87              : /// layers, in key-lsn order.
      88              : ///
      89              : /// This is public because the create_delta() implementation likely wants to use this too
      90              : /// TODO: move to a more shared place
      91           78 : pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
      92           78 :     layers: &'a [E::DeltaLayer],
      93           78 :     ctx: &'a E::RequestContext,
      94           78 : ) -> MergeDeltaKeys<'a, E> {
      95           78 :     // Use a binary heap to merge the layers. Each input layer is initially
      96           78 :     // represented by a LazyLoadLayer::Unloaded element, which uses the start of
      97           78 :     // the layer's key range as the key. The first time a layer reaches the top
      98           78 :     // of the heap, all the keys of the layer are loaded into a sorted vector.
      99           78 :     //
     100           78 :     // This helps to keep the memory usage reasonable: we only need to hold in
     101           78 :     // memory the DeltaEntrys of the layers that overlap with the "current" key.
     102           78 :     let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
     103         3120 :     for l in layers {
     104         3042 :         heap.push(LazyLoadLayer::Unloaded(l));
     105         3042 :     }
     106           78 :     MergeDeltaKeys {
     107           78 :         heap,
     108           78 :         ctx,
     109           78 :         load_future: None,
     110           78 :     }
     111           78 : }
     112              : 
     113            2 : pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
     114            2 :     layers: &'a [E::DeltaLayer],
     115            2 :     ctx: &'a E::RequestContext,
     116            2 : ) -> anyhow::Result<impl Stream<Item = <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>
     117            2 : {
     118            2 :     let mut keys = Vec::new();
     119           80 :     for l in layers {
     120              :         // Boxing and casting to LoadFuture is required to obtain the right Sync bound.
     121              :         // If we do l.load_keys(ctx).await? directly, there is a compilation error.
     122           78 :         let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx));
     123           78 :         keys.extend(load_future.await?.into_iter());
     124              :     }
     125      1032710 :     keys.sort_by_key(|k| (k.key(), k.lsn()));
     126            2 :     let stream = futures::stream::iter(keys.into_iter());
     127            2 :     Ok(stream)
     128            2 : }
     129              : 
     130              : enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
     131              :     Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
     132              :     Unloaded(&'a E::DeltaLayer),
     133              : }
     134              : impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
     135     49040550 :     fn min_key(&self) -> E::Key {
     136     49040550 :         match self {
     137     48976239 :             Self::Loaded(entries) => entries.front().unwrap().key(),
     138        64311 :             Self::Unloaded(dl) => dl.key_range().start,
     139              :         }
     140     49040550 :     }
     141     49040550 :     fn min_lsn(&self) -> Lsn {
     142     49040550 :         match self {
     143     48976239 :             Self::Loaded(entries) => entries.front().unwrap().lsn(),
     144        64311 :             Self::Unloaded(dl) => dl.lsn_range().start,
     145              :         }
     146     49040550 :     }
     147              : }
     148              : impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
     149     24520275 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     150     24520275 :         Some(self.cmp(other))
     151     24520275 :     }
     152              : }
     153              : impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
     154     24520275 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     155     24520275 :         // reverse order so that we get a min-heap
     156     24520275 :         (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
     157     24520275 :     }
     158              : }
     159              : impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
     160            0 :     fn eq(&self, other: &Self) -> bool {
     161            0 :         self.cmp(other) == std::cmp::Ordering::Equal
     162            0 :     }
     163              : }
     164              : impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
     165              : 
     166              : type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
     167              : 
     168              : // Stream returned by `merge_delta_keys`
     169              : pin_project! {
     170              : #[allow(clippy::type_complexity)]
     171              : pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
     172              :     heap: BinaryHeap<LazyLoadLayer<'a, E>>,
     173              : 
     174              :     #[pin]
     175              :     load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
     176              : 
     177              :     ctx: &'a E::RequestContext,
     178              : }
     179              : }
     180              : 
     181              : impl<'a, E> Stream for MergeDeltaKeys<'a, E>
     182              : where
     183              :     E: CompactionJobExecutor + 'a,
     184              : {
     185              :     type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
     186              : 
     187      3045120 :     fn poll_next(
     188      3045120 :         self: Pin<&mut Self>,
     189      3045120 :         cx: &mut std::task::Context<'_>,
     190      3045120 :     ) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
     191      3045120 :         let mut this = self.project();
     192              :         loop {
     193      3048162 :             if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
     194              :                 // We are waiting for loading the keys to finish
     195         3042 :                 match ready!(load_future.as_mut().poll(cx)) {
     196         3042 :                     Ok(entries) => {
     197         3042 :                         this.load_future.set(None);
     198         3042 :                         *this.heap.peek_mut().unwrap() =
     199         3042 :                             LazyLoadLayer::Loaded(VecDeque::from(entries));
     200         3042 :                     }
     201            0 :                     Err(e) => {
     202            0 :                         return Poll::Ready(Some(Err(e)));
     203              :                     }
     204              :                 }
     205      3045120 :             }
     206              : 
     207              :             // If the topmost layer in the heap hasn't been loaded yet, start
     208              :             // loading it. Otherwise return the next entry from it and update
     209              :             // the layer's position in the heap (this decreaseKey operation is
     210              :             // performed implicitly when `top` is dropped).
     211      3048162 :             if let Some(mut top) = this.heap.peek_mut() {
     212      3048084 :                 match top.deref_mut() {
     213         3042 :                     LazyLoadLayer::Unloaded(ref mut l) => {
     214         3042 :                         let fut = l.load_keys(this.ctx);
     215         3042 :                         this.load_future.set(Some(Box::pin(fut)));
     216         3042 :                         continue;
     217              :                     }
     218      3045042 :                     LazyLoadLayer::Loaded(ref mut entries) => {
     219      3045042 :                         let result = entries.pop_front().unwrap();
     220      3045042 :                         if entries.is_empty() {
     221         3042 :                             std::collections::binary_heap::PeekMut::pop(top);
     222      3042000 :                         }
     223      3045042 :                         return Poll::Ready(Some(Ok(result)));
     224              :                     }
     225              :                 }
     226              :             } else {
     227           78 :                 return Poll::Ready(None);
     228              :             }
     229              :         }
     230      3045120 :     }
     231              : }
     232              : 
     233              : // Accumulate values at key boundaries
     234              : pub struct KeySize<K> {
     235              :     pub key: K,
     236              :     pub num_values: u64,
     237              :     pub size: u64,
     238              : }
     239              : 
     240            2 : pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
     241            2 : where
     242            2 :     K: Eq + PartialOrd + Display + Copy,
     243            2 :     I: Stream<Item = Result<D, E>>,
     244            2 :     D: CompactionDeltaEntry<'a, K>,
     245            2 : {
     246              :     async_stream::try_stream! {
     247              :         // Initialize the state from the first value
     248              :         let mut input = std::pin::pin!(input);
     249              : 
     250              :         if let Some(first) = input.next().await {
     251              :             let first = first?;
     252              :             let mut accum: KeySize<K> = KeySize {
     253              :                 key: first.key(),
     254              :                 num_values: 1,
     255              :                 size: first.size(),
     256              :             };
     257              :             let mut last_key = accum.key;
     258              :             while let Some(this) = input.next().await {
     259              :                 let this = this?;
     260              :                 if this.key() == accum.key {
     261              :                     accum.size += this.size();
     262              :                     accum.num_values += 1;
     263              :                 } else {
     264              :                     assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
     265              :                     last_key = accum.key;
     266              :                     yield accum;
     267              :                     accum = KeySize {
     268              :                         key: this.key(),
     269              :                         num_values: 1,
     270              :                         size: this.size(),
     271              :                     };
     272              :                 }
     273              :             }
     274              :             assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
     275              :             yield accum;
     276              :         }
     277              :     }
     278            2 : }
        

Generated by: LCOV version 2.1-beta