LCOV - code coverage report
Current view: top level - pageserver/compaction/src - helpers.rs (source / functions) Coverage Total Hit
Test: a2f0f8a80fbf1089336086fa360ce27fa555cb1a.info Lines: 93.8 % 195 183
Test Date: 2024-11-20 17:59:39 Functions: 45.2 % 42 19

            Line data    Source code
       1              : //! This file contains generic utility functions over the interface types,
       2              : //! which could be handy for any compaction implementation.
       3              : use crate::interface::*;
       4              : 
       5              : use futures::future::BoxFuture;
       6              : use futures::{Stream, StreamExt};
       7              : use itertools::Itertools;
       8              : use pageserver_api::shard::ShardIdentity;
       9              : use pin_project_lite::pin_project;
      10              : use std::collections::BinaryHeap;
      11              : use std::collections::VecDeque;
      12              : use std::fmt::Display;
      13              : use std::future::Future;
      14              : use std::ops::{DerefMut, Range};
      15              : use std::pin::Pin;
      16              : use std::task::{ready, Poll};
      17              : use utils::lsn::Lsn;
      18              : 
/// NOTE(review): presumably the size of one page in bytes (8 KiB); the constant is not
/// referenced in this part of the file, so confirm against its users.
pub const PAGE_SZ: u64 = 8192;
      20              : 
      21            3 : pub fn keyspace_total_size<K>(
      22            3 :     keyspace: &CompactionKeySpace<K>,
      23            3 :     shard_identity: &ShardIdentity,
      24            3 : ) -> u64
      25            3 : where
      26            3 :     K: CompactionKey,
      27            3 : {
      28            3 :     keyspace
      29            3 :         .iter()
      30            3 :         .map(|r| K::key_range_size(r, shard_identity) as u64)
      31            3 :         .sum()
      32            3 : }
      33              : 
      34        13821 : pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
      35        13821 :     !(a.end <= b.start || b.end <= a.start)
      36        13821 : }
      37              : 
      38              : /// Whether a fully contains b, example as below
      39              : /// ```plain
      40              : /// |      a       |
      41              : ///       |  b  |
      42              : /// ```
      43          120 : pub fn fully_contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
      44          120 :     a.start <= b.start && a.end >= b.end
      45          120 : }
      46              : 
      47         2397 : pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
      48         2397 :     let x = std::mem::take(a);
      49         2397 :     let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
      50         2397 :         .into_iter()
      51         2397 :         .kmerge_by(|a, b| a.start < b.start);
      52         2397 :     let mut ranges = Vec::new();
      53         2397 :     if let Some(first) = all_ranges_iter.next() {
      54         2397 :         let (mut start, mut end) = (first.start, first.end);
      55              : 
      56         4792 :         for r in all_ranges_iter {
      57         2395 :             assert!(r.start >= start);
      58         2395 :             if r.start > end {
      59            0 :                 ranges.push(start..end);
      60            0 :                 start = r.start;
      61            0 :                 end = r.end;
      62         2395 :             } else if r.end > end {
      63            0 :                 end = r.end;
      64         2395 :             }
      65              :         }
      66         2397 :         ranges.push(start..end);
      67            0 :     }
      68         2397 :     *a = ranges
      69         2397 : }
      70              : 
      71            3 : pub fn intersect_keyspace<K: Ord + Clone + Copy>(
      72            3 :     a: &CompactionKeySpace<K>,
      73            3 :     r: &Range<K>,
      74            3 : ) -> CompactionKeySpace<K> {
      75            3 :     let mut ranges: Vec<Range<K>> = Vec::new();
      76              : 
      77            3 :     for x in a.iter() {
      78            3 :         if x.end <= r.start {
      79            0 :             continue;
      80            3 :         }
      81            3 :         if x.start >= r.end {
      82            0 :             break;
      83            3 :         }
      84            3 :         ranges.push(x.clone())
      85              :     }
      86              : 
      87              :     // trim the ends
      88            3 :     if let Some(first) = ranges.first_mut() {
      89            3 :         first.start = std::cmp::max(first.start, r.start);
      90            3 :     }
      91            3 :     if let Some(last) = ranges.last_mut() {
      92            3 :         last.end = std::cmp::min(last.end, r.end);
      93            3 :     }
      94            3 :     ranges
      95            3 : }
      96              : 
      97              : /// Create a stream that iterates through all DeltaEntrys among all input
      98              : /// layers, in key-lsn order.
      99              : ///
     100              : /// This is public because the create_delta() implementation likely wants to use this too
     101              : /// TODO: move to a more shared place
     102           49 : pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
     103           49 :     layers: &'a [E::DeltaLayer],
     104           49 :     ctx: &'a E::RequestContext,
     105           49 : ) -> MergeDeltaKeys<'a, E> {
     106           49 :     // Use a binary heap to merge the layers. Each input layer is initially
     107           49 :     // represented by a LazyLoadLayer::Unloaded element, which uses the start of
     108           49 :     // the layer's key range as the key. The first time a layer reaches the top
     109           49 :     // of the heap, all the keys of the layer are loaded into a sorted vector.
     110           49 :     //
     111           49 :     // This helps to keep the memory usage reasonable: we only need to hold in
     112           49 :     // memory the DeltaEntrys of the layers that overlap with the "current" key.
     113           49 :     let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
     114         1610 :     for l in layers {
     115         1561 :         heap.push(LazyLoadLayer::Unloaded(l));
     116         1561 :     }
     117           49 :     MergeDeltaKeys {
     118           49 :         heap,
     119           49 :         ctx,
     120           49 :         load_future: None,
     121           49 :     }
     122           49 : }
     123              : 
     124            3 : pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
     125            3 :     layers: &'a [E::DeltaLayer],
     126            3 :     ctx: &'a E::RequestContext,
     127            3 : ) -> anyhow::Result<impl Stream<Item = <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>
     128            3 : {
     129            3 :     let mut keys = Vec::new();
     130           50 :     for l in layers {
     131              :         // Boxing and casting to LoadFuture is required to obtain the right Sync bound.
     132              :         // If we do l.load_keys(ctx).await? directly, there is a compilation error.
     133           47 :         let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx));
     134           47 :         keys.extend(load_future.await?.into_iter());
     135              :     }
     136      5296342 :     keys.sort_by_key(|k| (k.key(), k.lsn()));
     137            3 :     let stream = futures::stream::iter(keys.into_iter());
     138            3 :     Ok(stream)
     139            3 : }
     140              : 
/// One input layer of the heap-based merge driven by `MergeDeltaKeys`.
///
/// A layer starts out as `Unloaded` and is replaced by `Loaded` once its keys
/// have been fetched.
enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
    /// Entries of a layer whose keys have been loaded. The merge consumes them
    /// from the front, and `min_key`/`min_lsn` read the front entry.
    Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
    /// A layer whose keys have not been loaded yet; until then it is ordered by
    /// the start of its key range and LSN range.
    Unloaded(&'a E::DeltaLayer),
}
     145              : impl<E: CompactionJobExecutor> LazyLoadLayer<'_, E> {
     146     40914026 :     fn min_key(&self) -> E::Key {
     147     40914026 :         match self {
     148     33609559 :             Self::Loaded(entries) => entries.front().unwrap().key(),
     149      7304467 :             Self::Unloaded(dl) => dl.key_range().start,
     150              :         }
     151     40914026 :     }
     152     40914026 :     fn min_lsn(&self) -> Lsn {
     153     40914026 :         match self {
     154     33609559 :             Self::Loaded(entries) => entries.front().unwrap().lsn(),
     155      7304467 :             Self::Unloaded(dl) => dl.lsn_range().start,
     156              :         }
     157     40914026 :     }
     158              : }
     159              : impl<E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'_, E> {
     160     20457013 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     161     20457013 :         Some(self.cmp(other))
     162     20457013 :     }
     163              : }
     164              : impl<E: CompactionJobExecutor> Ord for LazyLoadLayer<'_, E> {
     165     20457013 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     166     20457013 :         // reverse order so that we get a min-heap
     167     20457013 :         (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
     168     20457013 :     }
     169              : }
     170              : impl<E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'_, E> {
     171            0 :     fn eq(&self, other: &Self) -> bool {
     172            0 :         self.cmp(other) == std::cmp::Ordering::Equal
     173            0 :     }
     174              : }
     175              : impl<E: CompactionJobExecutor> Eq for LazyLoadLayer<'_, E> {}
     176              : 
/// Boxed future that resolves to a `Vec` of entries or an error.
/// Note that `E` here is the entry type, not the executor.
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
     178              : 
// Stream returned by `merge_delta_keys`
//
// Yields the entries of all input layers merged through a heap; each layer's
// keys are loaded only when that layer reaches the top of the heap.
pin_project! {
#[allow(clippy::type_complexity)]
pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
    // Input layers, loaded or not. `LazyLoadLayer`'s ordering is reversed, so
    // the layer with the smallest (key, lsn) sits at the top.
    heap: BinaryHeap<LazyLoadLayer<'a, E>>,

    // In-flight load of the keys of the layer currently at the top of the
    // heap; `None` while no load is in progress.
    #[pin]
    load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,

    // Request context passed to `load_keys` when a layer gets loaded.
    ctx: &'a E::RequestContext,
}
}
     191              : 
     192              : impl<'a, E> Stream for MergeDeltaKeys<'a, E>
     193              : where
     194              :     E: CompactionJobExecutor + 'a,
     195              : {
     196              :     type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
     197              : 
     198      5522610 :     fn poll_next(
     199      5522610 :         self: Pin<&mut Self>,
     200      5522610 :         cx: &mut std::task::Context<'_>,
     201      5522610 :     ) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
     202      5522610 :         let mut this = self.project();
     203              :         loop {
     204      5524171 :             if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
     205              :                 // We are waiting for loading the keys to finish
     206         1561 :                 match ready!(load_future.as_mut().poll(cx)) {
     207         1561 :                     Ok(entries) => {
     208         1561 :                         this.load_future.set(None);
     209         1561 :                         *this.heap.peek_mut().unwrap() =
     210         1561 :                             LazyLoadLayer::Loaded(VecDeque::from(entries));
     211         1561 :                     }
     212            0 :                     Err(e) => {
     213            0 :                         return Poll::Ready(Some(Err(e)));
     214              :                     }
     215              :                 }
     216      5522610 :             }
     217              : 
     218              :             // If the topmost layer in the heap hasn't been loaded yet, start
     219              :             // loading it. Otherwise return the next entry from it and update
     220              :             // the layer's position in the heap (this decreaseKey operation is
     221              :             // performed implicitly when `top` is dropped).
     222      5524171 :             if let Some(mut top) = this.heap.peek_mut() {
     223      5524122 :                 match top.deref_mut() {
     224         1561 :                     LazyLoadLayer::Unloaded(ref mut l) => {
     225         1561 :                         let fut = l.load_keys(this.ctx);
     226         1561 :                         this.load_future.set(Some(Box::pin(fut)));
     227         1561 :                         continue;
     228              :                     }
     229      5522561 :                     LazyLoadLayer::Loaded(ref mut entries) => {
     230      5522561 :                         let result = entries.pop_front().unwrap();
     231      5522561 :                         if entries.is_empty() {
     232         1561 :                             std::collections::binary_heap::PeekMut::pop(top);
     233      5521000 :                         }
     234      5522561 :                         return Poll::Ready(Some(Ok(result)));
     235              :                     }
     236              :                 }
     237              :             } else {
     238           49 :                 return Poll::Ready(None);
     239              :             }
     240              :         }
     241      5522610 :     }
     242              : }
     243              : 
// Accumulate values at key boundaries
/// Totals for all values of one key, as produced by `accum_key_values`.
pub struct KeySize<K> {
    /// The key these totals belong to.
    pub key: K,
    /// Number of values seen for `key`.
    pub num_values: u64,
    /// Sum of the sizes of all values seen for `key`.
    pub size: u64,
    /// The lsns to partition at (if empty then no per-lsn partitioning)
    ///
    /// Each element pairs the LSN of the value that opens a new partition with
    /// the size accumulated in the partition that precedes it.
    pub partition_lsns: Vec<(Lsn, u64)>,
}
     252              : 
     253            3 : pub fn accum_key_values<'a, I, K, D, E>(
     254            3 :     input: I,
     255            3 :     target_size: u64,
     256            3 : ) -> impl Stream<Item = Result<KeySize<K>, E>>
     257            3 : where
     258            3 :     K: Eq + PartialOrd + Display + Copy,
     259            3 :     I: Stream<Item = Result<D, E>>,
     260            3 :     D: CompactionDeltaEntry<'a, K>,
     261            3 : {
     262            3 :     async_stream::try_stream! {
     263            3 :         // Initialize the state from the first value
     264            3 :         let mut input = std::pin::pin!(input);
     265            3 : 
     266            3 :         if let Some(first) = input.next().await {
     267            3 :             let first = first?;
     268            3 :             let mut part_size = first.size();
     269            3 :             let mut accum: KeySize<K> = KeySize {
     270            3 :                 key: first.key(),
     271            3 :                 num_values: 1,
     272            3 :                 size: part_size,
     273            3 :                 partition_lsns: Vec::new(),
     274            3 :             };
     275            3 :             let mut last_key = accum.key;
     276            3 :             while let Some(this) = input.next().await {
     277            3 :                 let this = this?;
     278            3 :                 if this.key() == accum.key {
     279            3 :                     let add_size = this.size();
     280            3 :                     if part_size + add_size > target_size {
     281            3 :                         accum.partition_lsns.push((this.lsn(), part_size));
     282            3 :                         part_size = 0;
     283            3 :                     }
     284            3 :                     part_size += add_size;
     285            3 :                     accum.size += add_size;
     286            3 :                     accum.num_values += 1;
     287            3 :                 } else {
     288            3 :                     assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
     289            3 :                     last_key = accum.key;
     290            3 :                     yield accum;
     291            3 :                     part_size = this.size();
     292            3 :                     accum = KeySize {
     293            3 :                         key: this.key(),
     294            3 :                         num_values: 1,
     295            3 :                         size: part_size,
     296            3 :                         partition_lsns: Vec::new(),
     297            3 :                     };
     298            3 :                 }
     299            3 :             }
     300            3 :             assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
     301            3 :             yield accum;
     302            3 :         }
     303            3 :     }
     304            3 : }
        

Generated by: LCOV version 2.1-beta