LCOV - code coverage report
Current view: top level - pageserver/compaction/src - helpers.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 0.0 % 147 0
Test Date: 2024-02-29 11:57:12 Functions: 0.0 % 30 0

            Line data    Source code
       1              : //! This file contains generic utility functions over the interface types,
       2              : //! which could be handy for any compaction implementation.
       3              : use crate::interface::*;
       4              : 
       5              : use futures::future::BoxFuture;
       6              : use futures::{Stream, StreamExt};
       7              : use itertools::Itertools;
       8              : use pin_project_lite::pin_project;
       9              : use std::cmp::Ord;
      10              : use std::collections::BinaryHeap;
      11              : use std::collections::VecDeque;
      12              : use std::future::Future;
      13              : use std::ops::{DerefMut, Range};
      14              : use std::pin::Pin;
      15              : use std::task::{ready, Poll};
      16              : 
      17            0 : pub fn keyspace_total_size<K>(keyspace: &CompactionKeySpace<K>) -> u64
      18            0 : where
      19            0 :     K: CompactionKey,
      20            0 : {
      21            0 :     keyspace.iter().map(|r| K::key_range_size(r) as u64).sum()
      22            0 : }
      23              : 
      24            0 : pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
      25            0 :     !(a.end <= b.start || b.end <= a.start)
      26            0 : }
      27              : 
      28            0 : pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
      29            0 :     let x = std::mem::take(a);
      30            0 :     let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
      31            0 :         .into_iter()
      32            0 :         .kmerge_by(|a, b| a.start < b.start);
      33            0 :     let mut ranges = Vec::new();
      34            0 :     if let Some(first) = all_ranges_iter.next() {
      35            0 :         let (mut start, mut end) = (first.start, first.end);
      36              : 
      37            0 :         for r in all_ranges_iter {
      38            0 :             assert!(r.start >= start);
      39            0 :             if r.start > end {
      40            0 :                 ranges.push(start..end);
      41            0 :                 start = r.start;
      42            0 :                 end = r.end;
      43            0 :             } else if r.end > end {
      44            0 :                 end = r.end;
      45            0 :             }
      46              :         }
      47            0 :         ranges.push(start..end);
      48            0 :     }
      49            0 :     *a = ranges
      50            0 : }
      51              : 
      52            0 : pub fn intersect_keyspace<K: Ord + Clone + Copy>(
      53            0 :     a: &CompactionKeySpace<K>,
      54            0 :     r: &Range<K>,
      55            0 : ) -> CompactionKeySpace<K> {
      56            0 :     let mut ranges: Vec<Range<K>> = Vec::new();
      57              : 
      58            0 :     for x in a.iter() {
      59            0 :         if x.end <= r.start {
      60            0 :             continue;
      61            0 :         }
      62            0 :         if x.start >= r.end {
      63            0 :             break;
      64            0 :         }
      65            0 :         ranges.push(x.clone())
      66              :     }
      67              : 
      68              :     // trim the ends
      69            0 :     if let Some(first) = ranges.first_mut() {
      70            0 :         first.start = std::cmp::max(first.start, r.start);
      71            0 :     }
      72            0 :     if let Some(last) = ranges.last_mut() {
      73            0 :         last.end = std::cmp::min(last.end, r.end);
      74            0 :     }
      75            0 :     ranges
      76            0 : }
      77              : 
      78              : /// Create a stream that iterates through all DeltaEntrys among all input
      79              : /// layers, in key-lsn order.
      80              : ///
      81              : /// This is public because the create_delta() implementation likely wants to use this too
      82              : /// TODO: move to a more shared place
      83            0 : pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
      84            0 :     layers: &'a [E::DeltaLayer],
      85            0 :     ctx: &'a E::RequestContext,
      86            0 : ) -> MergeDeltaKeys<'a, E> {
      87            0 :     // Use a binary heap to merge the layers. Each input layer is initially
      88            0 :     // represented by a LazyLoadLayer::Unloaded element, which uses the start of
      89            0 :     // the layer's key range as the key. The first time a layer reaches the top
      90            0 :     // of the heap, all the keys of the layer are loaded into a sorted vector.
      91            0 :     //
      92            0 :     // This helps to keep the memory usage reasonable: we only need to hold in
      93            0 :     // memory the DeltaEntrys of the layers that overlap with the "current" key.
      94            0 :     let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
      95            0 :     for l in layers {
      96            0 :         heap.push(LazyLoadLayer::Unloaded(l));
      97            0 :     }
      98            0 :     MergeDeltaKeys {
      99            0 :         heap,
     100            0 :         ctx,
     101            0 :         load_future: None,
     102            0 :     }
     103            0 : }
     104              : 
     105              : enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
     106              :     Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
     107              :     Unloaded(&'a E::DeltaLayer),
     108              : }
     109              : impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
     110            0 :     fn key(&self) -> E::Key {
     111            0 :         match self {
     112            0 :             Self::Loaded(entries) => entries.front().unwrap().key(),
     113            0 :             Self::Unloaded(dl) => dl.key_range().start,
     114              :         }
     115            0 :     }
     116              : }
     117              : impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
     118            0 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     119            0 :         Some(self.cmp(other))
     120            0 :     }
     121              : }
     122              : impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
     123            0 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     124            0 :         // reverse order so that we get a min-heap
     125            0 :         other.key().cmp(&self.key())
     126            0 :     }
     127              : }
     128              : impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
     129            0 :     fn eq(&self, other: &Self) -> bool {
     130            0 :         self.key().eq(&other.key())
     131            0 :     }
     132              : }
     133              : impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
     134              : 
     135              : type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
     136              : 
     137              : // Stream returned by `merge_delta_keys`
     138              : pin_project! {
     139              : #[allow(clippy::type_complexity)]
     140              : pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
     141              :     heap: BinaryHeap<LazyLoadLayer<'a, E>>,
     142              : 
     143              :     #[pin]
     144              :     load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
     145              : 
     146              :     ctx: &'a E::RequestContext,
     147              : }
     148              : }
     149              : 
     150              : impl<'a, E> Stream for MergeDeltaKeys<'a, E>
     151              : where
     152              :     E: CompactionJobExecutor + 'a,
     153              : {
     154              :     type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
     155              : 
     156            0 :     fn poll_next(
     157            0 :         self: Pin<&mut Self>,
     158            0 :         cx: &mut std::task::Context<'_>,
     159            0 :     ) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
     160            0 :         let mut this = self.project();
     161              :         loop {
     162            0 :             if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
     163              :                 // We are waiting for loading the keys to finish
     164            0 :                 match ready!(load_future.as_mut().poll(cx)) {
     165            0 :                     Ok(entries) => {
     166            0 :                         this.load_future.set(None);
     167            0 :                         *this.heap.peek_mut().unwrap() =
     168            0 :                             LazyLoadLayer::Loaded(VecDeque::from(entries));
     169            0 :                     }
     170            0 :                     Err(e) => {
     171            0 :                         return Poll::Ready(Some(Err(e)));
     172              :                     }
     173              :                 }
     174            0 :             }
     175              : 
     176              :             // If the topmost layer in the heap hasn't been loaded yet, start
     177              :             // loading it. Otherwise return the next entry from it and update
     178              :             // the layer's position in the heap (this decreaseKey operation is
     179              :             // performed implicitly when `top` is dropped).
     180            0 :             if let Some(mut top) = this.heap.peek_mut() {
     181            0 :                 match top.deref_mut() {
     182            0 :                     LazyLoadLayer::Unloaded(ref mut l) => {
     183            0 :                         let fut = l.load_keys(this.ctx);
     184            0 :                         this.load_future.set(Some(fut));
     185            0 :                         continue;
     186              :                     }
     187            0 :                     LazyLoadLayer::Loaded(ref mut entries) => {
     188            0 :                         let result = entries.pop_front().unwrap();
     189            0 :                         if entries.is_empty() {
     190            0 :                             std::collections::binary_heap::PeekMut::pop(top);
     191            0 :                         }
     192            0 :                         return Poll::Ready(Some(Ok(result)));
     193              :                     }
     194              :                 }
     195              :             } else {
     196            0 :                 return Poll::Ready(None);
     197              :             }
     198              :         }
     199            0 :     }
     200              : }
     201              : 
     202              : // Accumulate values at key boundaries
     203              : pub struct KeySize<K> {
     204              :     pub key: K,
     205              :     pub num_values: u64,
     206              :     pub size: u64,
     207              : }
     208              : 
     209            0 : pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
     210            0 : where
     211            0 :     K: Eq,
     212            0 :     I: Stream<Item = Result<D, E>>,
     213            0 :     D: CompactionDeltaEntry<'a, K>,
     214            0 : {
     215            0 :     async_stream::try_stream! {
     216            0 :         // Initialize the state from the first value
     217            0 :         let mut input = std::pin::pin!(input);
     218              : 
     219            0 :         if let Some(first) = input.next().await {
     220            0 :             let first = first?;
     221            0 :             let mut accum: KeySize<K> = KeySize {
     222            0 :                 key: first.key(),
     223            0 :                 num_values: 1,
     224            0 :                 size: first.size(),
     225            0 :             };
     226            0 :             while let Some(this) = input.next().await {
     227            0 :                 let this = this?;
     228            0 :                 if this.key() == accum.key {
     229            0 :                     accum.size += this.size();
     230            0 :                     accum.num_values += 1;
     231            0 :                 } else {
     232            0 :                     yield accum;
     233            0 :                     accum = KeySize {
     234            0 :                         key: this.key(),
     235            0 :                         num_values: 1,
     236            0 :                         size: this.size(),
     237            0 :                     };
     238              :                 }
     239              :             }
     240            0 :             yield accum;
     241            0 :         }
     242              :     }
     243            0 : }
        

Generated by: LCOV version 2.1-beta