LCOV - code coverage report
Current view:  top level - libs/pageserver_api/src - keyspace.rs (source / functions)
Test:          5fe7fa8d483b39476409aee736d6d5e32728bfac.info
Test Date:     2025-03-12 16:10:49
Coverage:      Lines: 97.1 % (939 of 967 hit)    Functions: 97.1 % (68 of 70 hit)

            Line data    Source code
       1              : use std::ops::Range;
       2              : 
       3              : use itertools::Itertools;
       4              : use postgres_ffi::BLCKSZ;
       5              : 
       6              : use crate::key::Key;
       7              : use crate::shard::{ShardCount, ShardIdentity};
       8              : 
       9              : ///
      10              : /// Represents a set of Keys, in a compact form.
      11              : ///
      12              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      13              : pub struct KeySpace {
      14              :     /// Contiguous ranges of keys that belong to the key space. In key order,
      15              :     /// and with no overlap.
      16              :     pub ranges: Vec<Range<Key>>,
      17              : }
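
A minimal sketch of how the invariant above plays out, written in the style of the unit tests at the end of this file (the test name is hypothetical; `Key::from_i128` is the helper those tests use):

    #[test]
    fn keyspace_invariant_sketch() {
        // Two disjoint ranges, already sorted by key: this satisfies the invariant.
        let ks = KeySpace {
            ranges: vec![
                Key::from_i128(0)..Key::from_i128(10),
                Key::from_i128(20)..Key::from_i128(30),
            ],
        };
        // 10 + 10 pages, before any sharding is taken into account.
        assert_eq!(ks.total_raw_size(), 20);
        assert!(ks.contains(&Key::from_i128(5)));
        assert!(!ks.contains(&Key::from_i128(15)));
    }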
      18              : 
      19              : impl std::fmt::Display for KeySpace {
      20            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      21            0 :         write!(f, "[")?;
      22            0 :         for range in &self.ranges {
      23            0 :             write!(f, "{}..{},", range.start, range.end)?;
      24              :         }
      25            0 :         write!(f, "]")
      26            0 :     }
      27              : }
      28              : 
      29              : /// A wrapper type for sparse keyspaces.
      30              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      31              : pub struct SparseKeySpace(pub KeySpace);
      32              : 
      33              : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
      34              : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
      35              : /// shard.
      36              : ///
      37              : /// When we iterate over keys within this object, we will skip any keys that don't belong
      38              : /// to this shard.
      39              : ///
      40              : /// The start + end keys may not belong to the shard: these specify where layer files should
       41              : /// start + end, but we will never actually read/write those keys.
      42              : #[derive(Clone, Debug, PartialEq, Eq)]
      43              : pub struct ShardedRange<'a> {
      44              :     pub shard_identity: &'a ShardIdentity,
      45              :     pub range: Range<Key>,
      46              : }
      47              : 
       48              : // Calculate the size of a range within the blocks of the same relation, or one that reaches back
       49              : // only to the topmost page (the 0xffffffff logical size slot) of the previous relation.
      50     13903516 : pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
      51     13903516 :     debug_assert!(is_contiguous_range(range));
      52     13903516 :     if range.start.field6 == 0xffffffff {
      53      1654212 :         range.end.field6 + 1
      54              :     } else {
      55     12249304 :         range.end.field6 - range.start.field6
      56              :     }
      57     13903516 : }
      58              : 
      59              : /// Return true if this key range includes only keys in the same relation's data blocks, or
       60              : /// spans just one relation plus the logical size (0xffffffff) block of the relation before it.
      61              : ///
      62              : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
      63              : /// be on our shard.  Later in ShardedRange we do the extra work to figure out how much
      64              : /// of a given contiguous range is present on one shard.
      65              : ///
      66              : /// This matters, because:
       67              : /// - Within such ranges, keys are used contiguously.  Outside such ranges, key usage is sparse.
      68              : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
      69     27810735 : pub fn is_contiguous_range(range: &Range<Key>) -> bool {
      70     27810735 :     range.start.field1 == range.end.field1
      71     27806887 :         && range.start.field2 == range.end.field2
      72     27806883 :         && range.start.field3 == range.end.field3
      73     27806875 :         && range.start.field4 == range.end.field4
      74     27806874 :         && (range.start.field5 == range.end.field5
      75      3308428 :             || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
      76     27810735 : }
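
A worked example of the wrap-around branch handled by the two functions above, as a hypothetical test in the same style as `contiguous_range_check` at the bottom of this file: a range that starts on a relation's logical size slot (field6 == 0xffffffff) and ends a few blocks into the next relation is still contiguous, and its length counts that slot plus those blocks.

    #[test]
    fn contiguous_wraparound_sketch() {
        let start = Key::from_hex("000000067f00000001000004df00ffffffff").unwrap();
        let end = Key::from_hex("000000067f00000001000004df0100000003").unwrap();
        // Same relation prefix, field5 increments by one, and the start sits on 0xffffffff.
        assert!(is_contiguous_range(&(start..end)));
        // One key for the logical size slot, plus blocks 0, 1 and 2 of the next relation.
        assert_eq!(contiguous_range_len(&(start..end)), 4);
    }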
      77              : 
      78              : impl<'a> ShardedRange<'a> {
      79         4895 :     pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
      80         4895 :         Self {
      81         4895 :             shard_identity,
      82         4895 :             range,
      83         4895 :         }
      84         4895 :     }
      85              : 
      86              :     /// Break up this range into chunks, each of which has at least one local key in it if the
      87              :     /// total range has at least one local key.
      88         4888 :     pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
      89         4888 :         // Optimization for single-key case (e.g. logical size keys)
      90         4888 :         if self.range.end == self.range.start.add(1) {
      91         3225 :             return vec![(
      92         3225 :                 if self.shard_identity.is_key_disposable(&self.range.start) {
      93            0 :                     0
      94              :                 } else {
      95         3225 :                     1
      96              :                 },
      97         3225 :                 self.range,
      98              :             )];
      99         1663 :         }
     100         1663 : 
     101         1663 :         if !is_contiguous_range(&self.range) {
     102              :             // Ranges that span relations are not fragmented.  We only get these ranges as a result
     103              :             // of operations that act on existing layers, so we trust that the existing range is
     104              :             // reasonably small.
     105            2 :             return vec![(u32::MAX, self.range)];
     106         1661 :         }
     107         1661 : 
     108         1661 :         let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
     109         1661 : 
     110         1661 :         let mut cursor = self.range.start;
     111         3474 :         while cursor < self.range.end {
     112         1813 :             let advance_by = self.distance_to_next_boundary(cursor);
     113         1813 :             let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
     114              : 
     115              :             // If the previous fragment is undersized, then we seek to consume enough
     116              :             // blocks to complete it.
     117         1813 :             let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
     118          152 :                 Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
     119          141 :                 Some(frag) => {
      120          141 :                     // Previous fragment is complete, want the full number of blocks.
     121          141 :                     (
     122          141 :                         target_nblocks,
     123          141 :                         if is_fragment_disposable {
      124              :                             // If the current fragment will be empty (no shard-local data), we will merge it into the previous one
     125            0 :                             Some(frag)
     126              :                         } else {
     127          141 :                             None
     128              :                         },
     129              :                     )
     130              :                 }
     131              :                 None => {
     132              :                     // First iteration, want the full number
     133         1661 :                     (target_nblocks, None)
     134              :                 }
     135              :             };
     136              : 
     137         1813 :             let advance_by = if is_fragment_disposable {
     138          468 :                 advance_by
     139              :             } else {
     140         1345 :                 std::cmp::min(advance_by, want_blocks)
     141              :             };
     142              : 
     143         1813 :             let next_cursor = cursor.add(advance_by);
     144              : 
     145         1813 :             let this_frag = (
     146         1813 :                 if is_fragment_disposable {
     147          468 :                     0
     148              :                 } else {
     149         1345 :                     advance_by
     150              :                 },
     151         1813 :                 cursor..next_cursor,
     152         1813 :             );
     153         1813 :             cursor = next_cursor;
     154              : 
     155         1813 :             if let Some(last_fragment) = merge_last_fragment {
     156           11 :                 // Previous fragment was short or this one is empty, merge into it
     157           11 :                 last_fragment.0 += this_frag.0;
     158           11 :                 last_fragment.1.end = this_frag.1.end;
     159         1802 :             } else {
     160         1802 :                 fragments.push(this_frag);
     161         1802 :             }
     162              :         }
     163              : 
     164         1661 :         fragments
     165         4888 :     }
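
A property-style sketch of what fragment() guarantees, as a hypothetical test relying on the same imports as the test module below (`ShardParameters`, `ShardCount`, `ShardNumber`): the fragments tile the input range exactly, each carries at most `target_nblocks` shard-local blocks, and the per-fragment counts sum to `page_count()` for the whole range.

    #[test]
    fn fragment_tiles_range_sketch() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        // 0x40000 blocks within a single relation: contiguous, spanning several stripes.
        let range = Key::from_hex("000000067f00000001000004df0000000000").unwrap()
            ..Key::from_hex("000000067f00000001000004df0000040000").unwrap();
        let target_nblocks = 8192;

        let sharded = ShardedRange::new(range.clone(), &shard_identity);
        let total = sharded.page_count();

        let mut cursor = range.start;
        let mut counted = 0;
        for (local_blocks, frag_range) in sharded.fragment(target_nblocks) {
            assert_eq!(frag_range.start, cursor);
            assert!(local_blocks <= target_nblocks);
            counted += local_blocks;
            cursor = frag_range.end;
        }
        assert_eq!(cursor, range.end);
        assert_eq!(counted, total);
    }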
     166              : 
     167              :     /// Estimate the physical pages that are within this range, on this shard.  This returns
     168              :     /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
     169         1019 :     pub fn page_count(&self) -> u32 {
     170         1019 :         // Special cases for single keys like logical sizes
     171         1019 :         if self.range.end == self.range.start.add(1) {
     172            6 :             return if self.shard_identity.is_key_disposable(&self.range.start) {
     173            3 :                 0
     174              :             } else {
     175            3 :                 1
     176              :             };
     177         1013 :         }
     178         1013 : 
      179         1013 :         // We can only do an exact calculation for contiguous key ranges
     180         1013 :         if !is_contiguous_range(&self.range) {
     181            4 :             return u32::MAX;
     182         1009 :         }
     183         1009 : 
      184         1009 :         // Special case for unsharded tenants (shard count < 2): logical and physical sizes are the same
     185         1009 :         if self.shard_identity.count < ShardCount::new(2) {
     186          524 :             return contiguous_range_len(&self.range);
     187          485 :         }
     188          485 : 
     189          485 :         // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
     190          485 :         // to Self, and add the stripe's block count to our total if so.
     191          485 :         let mut result: u64 = 0;
     192          485 :         let mut cursor = self.range.start;
     193          981 :         while cursor < self.range.end {
     194              :             // Count up to the next stripe_size boundary or end of range
     195          496 :             let advance_by = self.distance_to_next_boundary(cursor);
     196          496 : 
      197          496 :             // If the blocks in this stripe belong to us, add them to our count
     198          496 :             if !self.shard_identity.is_key_disposable(&cursor) {
     199           28 :                 result += advance_by as u64;
     200          468 :             }
     201              : 
     202          496 :             cursor = cursor.add(advance_by);
     203              :         }
     204              : 
     205          485 :         if result > u32::MAX as u64 {
     206            0 :             u32::MAX
     207              :         } else {
     208          485 :             result as u32
     209              :         }
     210         1019 :     }
     211              : 
      212              :     /// Return the distance from the cursor to the next potential fragment boundary: this is either
     213              :     /// a stripe boundary, or the end of the range.
     214         2309 :     fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
     215         2309 :         let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
     216         2309 : 
     217         2309 :         if self.shard_identity.count < ShardCount::new(2) {
     218              :             // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
     219         1310 :             return distance_to_range_end;
     220          999 :         }
     221          999 : 
     222          999 :         if cursor.field6 == 0xffffffff {
     223              :             // We are wrapping from one relation's logical size to the next relation's first data block
     224            4 :             return 1;
     225          995 :         }
     226          995 : 
     227          995 :         let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
     228          995 :         let stripe_remainder = self.shard_identity.stripe_size.0
     229          995 :             - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
     230          995 : 
     231          995 :         if cfg!(debug_assertions) {
     232              :             // We should never overflow field5 and field6 -- our callers check this earlier
     233              :             // and would have returned their u32::MAX cases if the input range violated this.
     234          995 :             let next_cursor = cursor.add(stripe_remainder);
     235          995 :             debug_assert!(
     236          995 :                 next_cursor.field1 == cursor.field1
     237          995 :                     && next_cursor.field2 == cursor.field2
     238          995 :                     && next_cursor.field3 == cursor.field3
     239          995 :                     && next_cursor.field4 == cursor.field4
     240          995 :                     && next_cursor.field5 == cursor.field5
     241              :             )
     242            0 :         }
     243              : 
     244          995 :         std::cmp::min(stripe_remainder, distance_to_range_end)
     245         2309 :     }
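
As a concrete illustration of the stripe arithmetic above, with a hypothetical stripe size of 8 blocks rather than the real default: for cursor.field6 = 13, stripe_index = 13 / 8 = 1 and stripe_remainder = 8 - (13 - 1 * 8) = 3, i.e. three blocks remain before the next stripe boundary at block 16, and the function returns min(3, distance_to_range_end).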
     246              : 
     247              :     /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
     248              :     /// this function simply calculates the number of pages in the space, without accounting for those
     249              :     /// pages that would not actually be stored on this node.
     250              :     ///
     251              :     /// Don't use this function in code that works with physical entities like layer files.
     252     13904539 :     pub fn raw_size(range: &Range<Key>) -> u32 {
     253     13904539 :         if is_contiguous_range(range) {
     254     13900683 :             contiguous_range_len(range)
     255              :         } else {
     256         3856 :             u32::MAX
     257              :         }
     258     13904539 :     }
     259              : }
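
To make the raw_size / page_count distinction concrete, a hypothetical test in the style of this module's test suite: for a contiguous range on a four-shard tenant, raw_size() counts every page in the range, while page_count() counts only the pages whose stripes land on this shard, so it can never be larger.

    #[test]
    fn raw_size_vs_page_count_sketch() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = Key::from_hex("000000067f00000001000004df0000000000").unwrap()
            ..Key::from_hex("000000067f00000001000004df0000100000").unwrap();

        // Every page in the range, regardless of sharding.
        assert_eq!(ShardedRange::raw_size(&range), 0x100000);
        // Only the pages striped onto shard 1.
        let local = ShardedRange::new(range, &shard_identity).page_count();
        assert!(local <= 0x100000);
    }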
     260              : 
     261              : impl KeySpace {
     262              :     /// Create a key space with a single range.
     263        45408 :     pub fn single(key_range: Range<Key>) -> Self {
     264        45408 :         Self {
     265        45408 :             ranges: vec![key_range],
     266        45408 :         }
     267        45408 :     }
     268              : 
      269              :     /// Partition a key space into chunks of roughly 'target_size' bytes
     270              :     /// in each partition.
     271              :     ///
     272          652 :     pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
     273          652 :         // Assume that each value is 8k in size.
     274          652 :         let target_nblocks = (target_size / BLCKSZ as u64) as u32;
     275          652 : 
     276          652 :         let mut parts = Vec::new();
     277          652 :         let mut current_part = Vec::new();
     278          652 :         let mut current_part_size: usize = 0;
     279         4528 :         for range in &self.ranges {
     280              :             // While doing partitioning, wrap the range in ShardedRange so that our size calculations
     281              :             // will respect shard striping rather than assuming all keys within a range are present.
     282         3876 :             let range = ShardedRange::new(range.clone(), shard_identity);
     283              : 
     284              :             // Chunk up the range into parts that each contain up to target_size local blocks
     285         3884 :             for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
     286              :                 // If appending the next contiguous range in the keyspace to the current
     287              :                 // partition would cause it to be too large, and our current partition
     288              :                 // covers at least one block that is physically present in this shard,
     289              :                 // then start a new partition
     290         3884 :                 if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
     291           48 :                     && current_part_size > 0
     292           48 :                 {
     293           48 :                     parts.push(KeySpace {
     294           48 :                         ranges: current_part,
     295           48 :                     });
     296           48 :                     current_part = Vec::new();
     297           48 :                     current_part_size = 0;
     298         3836 :                 }
     299         3884 :                 current_part.push(frag_range.start..frag_range.end);
     300         3884 :                 current_part_size += frag_on_shard_size as usize;
     301              :             }
     302              :         }
     303              : 
      304              :         // Add the last partition, which may not be full yet.
     305          652 :         if !current_part.is_empty() {
     306          652 :             parts.push(KeySpace {
     307          652 :                 ranges: current_part,
     308          652 :             });
     309          652 :         }
     310              : 
     311          652 :         KeyPartitioning { parts }
     312          652 :     }
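
A property-style sketch of partition(), again as a hypothetical test using the test module's imports: only shard-local blocks count towards the target size, but no keys are ever dropped, so re-accumulating every range from every part yields the original keyspace.

    #[test]
    fn partition_preserves_keyspace_sketch() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let ks = KeySpace::single(
            Key::from_hex("000000067f00000001000004df0000000000").unwrap()
                ..Key::from_hex("000000067f00000001000004df0000040000").unwrap(),
        );

        // Aim for partitions of at most 8192 blocks (8192 * BLCKSZ bytes each).
        let partitioning = ks.partition(&shard_identity, 8192 * BLCKSZ as u64);
        assert!(!partitioning.parts.is_empty());

        // Re-accumulating every range from every part must give the original keyspace back.
        let mut accum = KeySpaceRandomAccum::new();
        for part in &partitioning.parts {
            for range in &part.ranges {
                accum.add_range(range.clone());
            }
        }
        assert_eq!(accum.to_keyspace(), ks);
    }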
     313              : 
     314      5551992 :     pub fn is_empty(&self) -> bool {
     315      5551992 :         self.total_raw_size() == 0
     316      5551992 :     }
     317              : 
     318              :     /// Merge another keyspace into the current one.
     319              :     /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
     320      3397488 :     pub fn merge(&mut self, other: &KeySpace) {
     321      3397488 :         let all_ranges = self
     322      3397488 :             .ranges
     323      3397488 :             .iter()
     324      3397488 :             .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
     325      3397488 : 
     326      3397488 :         let mut accum = KeySpaceAccum::new();
     327      3397488 :         let mut prev: Option<&Range<Key>> = None;
     328      4604938 :         for range in all_ranges {
     329      1207450 :             if let Some(prev) = prev {
     330           68 :                 let overlap =
     331           68 :                     std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
     332           68 :                 assert!(
     333           68 :                     !overlap,
      334            0 :                     "Attempt to merge overlapping keyspaces: {:?} overlaps {:?}",
     335              :                     prev, range
     336              :                 );
     337      1207382 :             }
     338              : 
     339      1207450 :             accum.add_range(range.clone());
     340      1207450 :             prev = Some(range);
     341              :         }
     342              : 
     343      3397488 :         self.ranges = accum.to_keyspace().ranges;
     344      3397488 :     }
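
A hypothetical test illustrating merge(): ranges from the two keyspaces are interleaved in key order, and ranges that become adjacent (0..10, 10..20 and 20..30 here) are fused into one.

    #[test]
    fn merge_disjoint_sketch() {
        let mut a = KeySpace {
            ranges: vec![
                Key::from_i128(0)..Key::from_i128(10),
                Key::from_i128(20)..Key::from_i128(30),
            ],
        };
        let b = KeySpace {
            ranges: vec![
                Key::from_i128(10)..Key::from_i128(20),
                Key::from_i128(40)..Key::from_i128(50),
            ],
        };
        a.merge(&b);
        assert_eq!(
            a.ranges,
            vec![
                Key::from_i128(0)..Key::from_i128(30),
                Key::from_i128(40)..Key::from_i128(50),
            ]
        );
    }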
     345              : 
     346              :     /// Remove all keys in `other` from `self`.
     347              :     /// This can involve splitting or removing of existing ranges.
     348              :     /// Returns the removed keyspace
     349      7751112 :     pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
     350      7751112 :         let (self_start, self_end) = match (self.start(), self.end()) {
     351      6092757 :             (Some(start), Some(end)) => (start, end),
     352              :             _ => {
     353              :                 // self is empty
     354      1658355 :                 return KeySpace::default();
     355              :             }
     356              :         };
     357              : 
     358              :         // Key spaces are sorted by definition, so skip ahead to the first
     359              :         // potentially intersecting range. Similarly, ignore ranges that start
     360              :         // after the current keyspace ends.
     361      6092757 :         let other_ranges = other
     362      6092757 :             .ranges
     363      6092757 :             .iter()
     364      6092757 :             .skip_while(|range| self_start >= range.end)
     365      6092757 :             .take_while(|range| self_end > range.start);
     366      6092757 : 
     367      6092757 :         let mut removed_accum = KeySpaceRandomAccum::new();
     368      8599374 :         for range in other_ranges {
     369      5013710 :             while let Some(overlap_at) = self.overlaps_at(range) {
     370      2507093 :                 let overlapped = self.ranges[overlap_at].clone();
     371      2507093 : 
     372      2507093 :                 if overlapped.start < range.start && overlapped.end <= range.end {
     373            9 :                     // Higher part of the range is completely overlapped.
     374            9 :                     removed_accum.add_range(range.start..self.ranges[overlap_at].end);
     375            9 :                     self.ranges[overlap_at].end = range.start;
     376      2507084 :                 }
     377      2507093 :                 if overlapped.start >= range.start && overlapped.end > range.end {
     378            9 :                     // Lower part of the range is completely overlapped.
     379            9 :                     removed_accum.add_range(self.ranges[overlap_at].start..range.end);
     380            9 :                     self.ranges[overlap_at].start = range.end;
     381      2507084 :                 }
     382      2507093 :                 if overlapped.start < range.start && overlapped.end > range.end {
     383           26 :                     // Middle part of the range is overlapped.
     384           26 :                     removed_accum.add_range(range.clone());
     385           26 :                     self.ranges[overlap_at].end = range.start;
     386           26 :                     self.ranges
     387           26 :                         .insert(overlap_at + 1, range.end..overlapped.end);
     388      2507067 :                 }
     389      2507093 :                 if overlapped.start >= range.start && overlapped.end <= range.end {
     390      2507049 :                     // Whole range is overlapped
     391      2507049 :                     removed_accum.add_range(self.ranges[overlap_at].clone());
     392      2507049 :                     self.ranges.remove(overlap_at);
     393      2507049 :                 }
     394              :             }
     395              :         }
     396              : 
     397      6092757 :         removed_accum.to_keyspace()
     398      7751112 :     }
     399              : 
     400      7751188 :     pub fn start(&self) -> Option<Key> {
     401      7751188 :         self.ranges.first().map(|range| range.start)
     402      7751188 :     }
     403              : 
     404      7751772 :     pub fn end(&self) -> Option<Key> {
     405      7751772 :         self.ranges.last().map(|range| range.end)
     406      7751772 :     }
     407              : 
     408              :     /// The size of the keyspace in pages, before accounting for sharding
     409      5591424 :     pub fn total_raw_size(&self) -> usize {
     410      5591424 :         self.ranges
     411      5591424 :             .iter()
     412      5591424 :             .map(|range| ShardedRange::raw_size(range) as usize)
     413      5591424 :             .sum()
     414      5591424 :     }
     415              : 
     416      5019335 :     fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
     417      5019335 :         match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
     418           25 :             Ok(0) => None,
     419      2508538 :             Err(0) => None,
     420           37 :             Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     421      2510735 :             Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     422         1529 :             _ => None,
     423              :         }
     424      5019335 :     }
     425              : 
     426              :     ///
      427              :     /// Check if the key space overlaps the given range
     428              :     ///
     429         5625 :     pub fn overlaps(&self, range: &Range<Key>) -> bool {
     430         5625 :         self.overlaps_at(range).is_some()
     431         5625 :     }
     432              : 
     433              :     /// Check if the keyspace contains a key
     434         1580 :     pub fn contains(&self, key: &Key) -> bool {
     435         1580 :         self.overlaps(&(*key..key.next()))
     436         1580 :     }
     437              : }
     438              : 
     439              : ///
     440              : /// Represents a partitioning of the key space.
     441              : ///
     442              : /// The only kind of partitioning we do is to partition the key space into
     443              : /// partitions that are roughly equal in physical size (see KeySpace::partition).
     444              : /// But this data structure could represent any partitioning.
     445              : ///
     446              : #[derive(Clone, Debug, Default)]
     447              : pub struct KeyPartitioning {
     448              :     pub parts: Vec<KeySpace>,
     449              : }
     450              : 
     451              : /// Represents a partitioning of the sparse key space.
     452              : #[derive(Clone, Debug, Default)]
     453              : pub struct SparseKeyPartitioning {
     454              :     pub parts: Vec<SparseKeySpace>,
     455              : }
     456              : 
     457              : impl KeyPartitioning {
     458         1808 :     pub fn new() -> Self {
     459         1808 :         KeyPartitioning { parts: Vec::new() }
     460         1808 :     }
     461              : 
     462              :     /// Convert a key partitioning to a sparse partition.
     463          904 :     pub fn into_sparse(self) -> SparseKeyPartitioning {
     464          904 :         SparseKeyPartitioning {
     465          904 :             parts: self.parts.into_iter().map(SparseKeySpace).collect(),
     466          904 :         }
     467          904 :     }
     468              : }
     469              : 
     470              : impl SparseKeyPartitioning {
      471              :     /// Note: use this function with caution. Attempting to handle a sparse keyspace in the same way as a dense keyspace
      472              :     /// can cause very long or infinite loops.
     473         1148 :     pub fn into_dense(self) -> KeyPartitioning {
     474         1148 :         KeyPartitioning {
     475         1148 :             parts: self.parts.into_iter().map(|x| x.0).collect(),
     476         1148 :         }
     477         1148 :     }
     478              : }
     479              : 
     480              : ///
     481              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     482              : /// object. This takes care of merging adjacent keys and key ranges into
      483              : /// contiguous ranges. Keys and ranges must be added in ascending key order, without overlaps.
     484              : ///
     485              : #[derive(Clone, Debug, Default)]
     486              : pub struct KeySpaceAccum {
     487              :     accum: Option<Range<Key>>,
     488              : 
     489              :     ranges: Vec<Range<Key>>,
     490              :     size: u64,
     491              : }
     492              : 
     493              : impl KeySpaceAccum {
     494      3654351 :     pub fn new() -> Self {
     495      3654351 :         Self {
     496      3654351 :             accum: None,
     497      3654351 :             ranges: Vec::new(),
     498      3654351 :             size: 0,
     499      3654351 :         }
     500      3654351 :     }
     501              : 
     502              :     #[inline(always)]
     503      8323044 :     pub fn add_key(&mut self, key: Key) {
     504      8323044 :         self.add_range(singleton_range(key))
     505      8323044 :     }
     506              : 
     507              :     #[inline(always)]
     508     11267137 :     pub fn add_range(&mut self, range: Range<Key>) {
     509     11267137 :         self.size += ShardedRange::raw_size(&range) as u64;
     510     11267137 : 
     511     11267137 :         match self.accum.as_mut() {
     512      8270876 :             Some(accum) => {
     513      8270876 :                 if range.start == accum.end {
     514      8256960 :                     accum.end = range.end;
     515      8256960 :                 } else {
     516              :                     // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
     517              :                     // a new range here if the skipped region was all keys that don't belong on this shard.
     518              :                     // (https://github.com/neondatabase/neon/issues/6247)
     519        13916 :                     assert!(range.start > accum.end);
     520        13916 :                     self.ranges.push(accum.clone());
     521        13916 :                     *accum = range;
     522              :                 }
     523              :             }
     524      2996261 :             None => self.accum = Some(range),
     525              :         }
     526     11267137 :     }
     527              : 
     528      5186344 :     pub fn to_keyspace(mut self) -> KeySpace {
     529      5186344 :         if let Some(accum) = self.accum.take() {
     530      2996233 :             self.ranges.push(accum);
     531      2996233 :         }
     532      5186344 :         KeySpace {
     533      5186344 :             ranges: self.ranges,
     534      5186344 :         }
     535      5186344 :     }
     536              : 
     537         2658 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     538         2658 :         std::mem::take(self).to_keyspace()
     539         2658 :     }
     540              : 
     541              :     // The total number of keys in this object, ignoring any sharding effects that might cause some of
     542              :     // the keys to be omitted in storage on this shard.
     543         5759 :     pub fn raw_size(&self) -> u64 {
     544         5759 :         self.size
     545         5759 :     }
     546              : }
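
A hypothetical test showing the accumulator in action: keys are added in ascending order, adjacent keys collapse into one contiguous range, and raw_size() tracks the total independently of that merging.

    #[test]
    fn keyspace_accum_sketch() {
        let mut accum = KeySpaceAccum::new();
        accum.add_key(Key::from_i128(10));
        accum.add_key(Key::from_i128(11));
        accum.add_key(Key::from_i128(12));
        accum.add_range(Key::from_i128(20)..Key::from_i128(25));

        // Three single keys plus a five-key range.
        assert_eq!(accum.raw_size(), 8);
        // The three adjacent keys have been merged into 10..13.
        assert_eq!(
            accum.to_keyspace().ranges,
            vec![
                Key::from_i128(10)..Key::from_i128(13),
                Key::from_i128(20)..Key::from_i128(25),
            ]
        );
    }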
     547              : 
     548              : ///
     549              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     550              : /// object. Key ranges may be inserted in any order and can overlap.
     551              : ///
     552              : #[derive(Clone, Debug, Default)]
     553              : pub struct KeySpaceRandomAccum {
     554              :     ranges: Vec<Range<Key>>,
     555              : }
     556              : 
     557              : impl KeySpaceRandomAccum {
     558     17546781 :     pub fn new() -> Self {
     559     17546781 :         Self { ranges: Vec::new() }
     560     17546781 :     }
     561              : 
     562      1208636 :     pub fn add_key(&mut self, key: Key) {
     563      1208636 :         self.add_range(singleton_range(key))
     564      1208636 :     }
     565              : 
     566      5625879 :     pub fn add_range(&mut self, range: Range<Key>) {
     567      5625879 :         self.ranges.push(range);
     568      5625879 :     }
     569              : 
     570      1693729 :     pub fn add_keyspace(&mut self, keyspace: KeySpace) {
     571      3387466 :         for range in keyspace.ranges {
     572      1693737 :             self.add_range(range);
     573      1693737 :         }
     574      1693729 :     }
     575              : 
     576     12893958 :     pub fn to_keyspace(mut self) -> KeySpace {
     577     12893958 :         let mut ranges = Vec::new();
     578     12893958 :         if !self.ranges.is_empty() {
     579      5455927 :             self.ranges.sort_by_key(|r| r.start);
     580      5455927 :             let mut start = self.ranges.first().unwrap().start;
     581      5455927 :             let mut end = self.ranges.first().unwrap().end;
     582     11081712 :             for r in self.ranges {
     583      5625785 :                 assert!(r.start >= start);
     584      5625785 :                 if r.start > end {
     585         3168 :                     ranges.push(start..end);
     586         3168 :                     start = r.start;
     587         3168 :                     end = r.end;
     588      5622617 :                 } else if r.end > end {
     589         9218 :                     end = r.end;
     590      5613399 :                 }
     591              :             }
     592      5455927 :             ranges.push(start..end);
     593      7438031 :         }
     594     12893958 :         KeySpace { ranges }
     595     12893958 :     }
     596              : 
     597      6794936 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     598      6794936 :         let mut prev_accum = KeySpaceRandomAccum::new();
     599      6794936 :         std::mem::swap(self, &mut prev_accum);
     600      6794936 : 
     601      6794936 :         prev_accum.to_keyspace()
     602      6794936 :     }
     603              : }
     604              : 
     605      9531680 : pub fn singleton_range(key: Key) -> Range<Key> {
     606      9531680 :     key..key.next()
     607      9531680 : }
     608              : 
     609              : #[cfg(test)]
     610              : mod tests {
     611              :     use std::fmt::Write;
     612              : 
     613              :     use rand::{RngCore, SeedableRng};
     614              : 
     615              :     use super::*;
     616              :     use crate::models::ShardParameters;
     617              :     use crate::shard::{ShardCount, ShardNumber};
     618              : 
     619              :     // Helper function to create a key range.
     620              :     //
     621              :     // Make the tests below less verbose.
     622           46 :     fn kr(irange: Range<i128>) -> Range<Key> {
     623           46 :         Key::from_i128(irange.start)..Key::from_i128(irange.end)
     624           46 :     }
     625              : 
     626              :     #[allow(dead_code)]
     627            0 :     fn dump_keyspace(ks: &KeySpace) {
     628            0 :         for r in ks.ranges.iter() {
     629            0 :             println!("  {}..{}", r.start.to_i128(), r.end.to_i128());
     630            0 :         }
     631            0 :     }
     632              : 
     633           11 :     fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
     634           11 :         if actual.ranges != expected {
     635            0 :             let mut msg = String::new();
     636            0 : 
     637            0 :             writeln!(msg, "expected:").unwrap();
     638            0 :             for r in &expected {
     639            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     640            0 :             }
     641            0 :             writeln!(msg, "got:").unwrap();
     642            0 :             for r in &actual.ranges {
     643            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     644            0 :             }
     645            0 :             panic!("{}", msg);
     646           11 :         }
     647           11 :     }
     648              : 
     649              :     #[test]
     650            1 :     fn keyspace_consume() {
     651            1 :         let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
     652            1 : 
     653            1 :         let mut accum = KeySpaceAccum::new();
     654            4 :         for range in &ranges {
     655            3 :             accum.add_range(range.clone());
     656            3 :         }
     657              : 
     658            1 :         let expected_size: u64 = ranges
     659            1 :             .iter()
     660            3 :             .map(|r| ShardedRange::raw_size(r) as u64)
     661            1 :             .sum();
     662            1 :         assert_eq!(accum.raw_size(), expected_size);
     663              : 
     664            1 :         assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
     665            1 :         assert_eq!(accum.raw_size(), 0);
     666              : 
     667            1 :         assert_ks_eq(&accum.consume_keyspace(), vec![]);
     668            1 :         assert_eq!(accum.raw_size(), 0);
     669              : 
     670            4 :         for range in &ranges {
     671            3 :             accum.add_range(range.clone());
     672            3 :         }
     673            1 :         assert_ks_eq(&accum.to_keyspace(), ranges);
     674            1 :     }
     675              : 
     676              :     #[test]
     677            1 :     fn keyspace_add_range() {
     678            1 :         // two separate ranges
     679            1 :         //
     680            1 :         // #####
     681            1 :         //         #####
     682            1 :         let mut ks = KeySpaceRandomAccum::default();
     683            1 :         ks.add_range(kr(0..10));
     684            1 :         ks.add_range(kr(20..30));
     685            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
     686            1 : 
     687            1 :         // two separate ranges, added in reverse order
     688            1 :         //
     689            1 :         //         #####
     690            1 :         // #####
     691            1 :         let mut ks = KeySpaceRandomAccum::default();
     692            1 :         ks.add_range(kr(20..30));
     693            1 :         ks.add_range(kr(0..10));
     694            1 : 
     695            1 :         // add range that is adjacent to the end of an existing range
     696            1 :         //
     697            1 :         // #####
     698            1 :         //      #####
     699            1 :         ks.add_range(kr(0..10));
     700            1 :         ks.add_range(kr(10..30));
     701            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     702            1 : 
     703            1 :         // add range that is adjacent to the start of an existing range
     704            1 :         //
     705            1 :         //      #####
     706            1 :         // #####
     707            1 :         let mut ks = KeySpaceRandomAccum::default();
     708            1 :         ks.add_range(kr(10..30));
     709            1 :         ks.add_range(kr(0..10));
     710            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     711            1 : 
     712            1 :         // add range that overlaps with the end of an existing range
     713            1 :         //
     714            1 :         // #####
     715            1 :         //    #####
     716            1 :         let mut ks = KeySpaceRandomAccum::default();
     717            1 :         ks.add_range(kr(0..10));
     718            1 :         ks.add_range(kr(5..30));
     719            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     720            1 : 
     721            1 :         // add range that overlaps with the start of an existing range
     722            1 :         //
     723            1 :         //    #####
     724            1 :         // #####
     725            1 :         let mut ks = KeySpaceRandomAccum::default();
     726            1 :         ks.add_range(kr(5..30));
     727            1 :         ks.add_range(kr(0..10));
     728            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     729            1 : 
     730            1 :         // add range that is fully covered by an existing range
     731            1 :         //
     732            1 :         // #########
     733            1 :         //   #####
     734            1 :         let mut ks = KeySpaceRandomAccum::default();
     735            1 :         ks.add_range(kr(0..30));
     736            1 :         ks.add_range(kr(10..20));
     737            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     738            1 : 
     739            1 :         // add range that extends an existing range from both ends
     740            1 :         //
     741            1 :         //   #####
     742            1 :         // #########
     743            1 :         let mut ks = KeySpaceRandomAccum::default();
     744            1 :         ks.add_range(kr(10..20));
     745            1 :         ks.add_range(kr(0..30));
     746            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     747            1 : 
     748            1 :         // add a range that overlaps with two existing ranges, joining them
     749            1 :         //
     750            1 :         // #####   #####
     751            1 :         //    #######
     752            1 :         let mut ks = KeySpaceRandomAccum::default();
     753            1 :         ks.add_range(kr(0..10));
     754            1 :         ks.add_range(kr(20..30));
     755            1 :         ks.add_range(kr(5..25));
     756            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     757            1 :     }
     758              : 
     759              :     #[test]
     760            1 :     fn keyspace_overlaps() {
     761            1 :         let mut ks = KeySpaceRandomAccum::default();
     762            1 :         ks.add_range(kr(10..20));
     763            1 :         ks.add_range(kr(30..40));
     764            1 :         let ks = ks.to_keyspace();
     765            1 : 
     766            1 :         //        #####      #####
     767            1 :         // xxxx
     768            1 :         assert!(!ks.overlaps(&kr(0..5)));
     769              : 
     770              :         //        #####      #####
     771              :         //   xxxx
     772            1 :         assert!(!ks.overlaps(&kr(5..9)));
     773              : 
     774              :         //        #####      #####
     775              :         //    xxxx
     776            1 :         assert!(!ks.overlaps(&kr(5..10)));
     777              : 
     778              :         //        #####      #####
     779              :         //     xxxx
     780            1 :         assert!(ks.overlaps(&kr(5..11)));
     781              : 
     782              :         //        #####      #####
     783              :         //        xxxx
     784            1 :         assert!(ks.overlaps(&kr(10..15)));
     785              : 
     786              :         //        #####      #####
     787              :         //         xxxx
     788            1 :         assert!(ks.overlaps(&kr(15..20)));
     789              : 
     790              :         //        #####      #####
     791              :         //           xxxx
     792            1 :         assert!(ks.overlaps(&kr(15..25)));
     793              : 
     794              :         //        #####      #####
     795              :         //              xxxx
     796            1 :         assert!(!ks.overlaps(&kr(22..28)));
     797              : 
     798              :         //        #####      #####
     799              :         //               xxxx
     800            1 :         assert!(!ks.overlaps(&kr(25..30)));
     801              : 
     802              :         //        #####      #####
     803              :         //                      xxxx
     804            1 :         assert!(ks.overlaps(&kr(35..35)));
     805              : 
     806              :         //        #####      #####
     807              :         //                        xxxx
     808            1 :         assert!(!ks.overlaps(&kr(40..45)));
     809              : 
     810              :         //        #####      #####
     811              :         //                        xxxx
     812            1 :         assert!(!ks.overlaps(&kr(45..50)));
     813              : 
     814              :         //        #####      #####
     815              :         //        xxxxxxxxxxx
      816            1 :         assert!(ks.overlaps(&kr(0..30)));
     817            1 :     }
     818              : 
     819              :     #[test]
     820            1 :     fn test_remove_full_overlapps() {
     821            1 :         let mut key_space1 = KeySpace {
     822            1 :             ranges: vec![
     823            1 :                 Key::from_i128(1)..Key::from_i128(4),
     824            1 :                 Key::from_i128(5)..Key::from_i128(8),
     825            1 :                 Key::from_i128(10)..Key::from_i128(12),
     826            1 :             ],
     827            1 :         };
     828            1 :         let key_space2 = KeySpace {
     829            1 :             ranges: vec![
     830            1 :                 Key::from_i128(2)..Key::from_i128(3),
     831            1 :                 Key::from_i128(6)..Key::from_i128(7),
     832            1 :                 Key::from_i128(11)..Key::from_i128(13),
     833            1 :             ],
     834            1 :         };
     835            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     836            1 :         let removed_expected = KeySpace {
     837            1 :             ranges: vec![
     838            1 :                 Key::from_i128(2)..Key::from_i128(3),
     839            1 :                 Key::from_i128(6)..Key::from_i128(7),
     840            1 :                 Key::from_i128(11)..Key::from_i128(12),
     841            1 :             ],
     842            1 :         };
     843            1 :         assert_eq!(removed, removed_expected);
     844              : 
     845            1 :         assert_eq!(
     846            1 :             key_space1.ranges,
     847            1 :             vec![
     848            1 :                 Key::from_i128(1)..Key::from_i128(2),
     849            1 :                 Key::from_i128(3)..Key::from_i128(4),
     850            1 :                 Key::from_i128(5)..Key::from_i128(6),
     851            1 :                 Key::from_i128(7)..Key::from_i128(8),
     852            1 :                 Key::from_i128(10)..Key::from_i128(11)
     853            1 :             ]
     854            1 :         );
     855            1 :     }
     856              : 
     857              :     #[test]
     858            1 :     fn test_remove_partial_overlaps() {
      859            1 :         // Test partial overlaps
     860            1 :         let mut key_space1 = KeySpace {
     861            1 :             ranges: vec![
     862            1 :                 Key::from_i128(1)..Key::from_i128(5),
     863            1 :                 Key::from_i128(7)..Key::from_i128(10),
     864            1 :                 Key::from_i128(12)..Key::from_i128(15),
     865            1 :             ],
     866            1 :         };
     867            1 :         let key_space2 = KeySpace {
     868            1 :             ranges: vec![
     869            1 :                 Key::from_i128(3)..Key::from_i128(6),
     870            1 :                 Key::from_i128(8)..Key::from_i128(11),
     871            1 :                 Key::from_i128(14)..Key::from_i128(17),
     872            1 :             ],
     873            1 :         };
     874            1 : 
     875            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     876            1 :         let removed_expected = KeySpace {
     877            1 :             ranges: vec![
     878            1 :                 Key::from_i128(3)..Key::from_i128(5),
     879            1 :                 Key::from_i128(8)..Key::from_i128(10),
     880            1 :                 Key::from_i128(14)..Key::from_i128(15),
     881            1 :             ],
     882            1 :         };
     883            1 :         assert_eq!(removed, removed_expected);
     884              : 
     885            1 :         assert_eq!(
     886            1 :             key_space1.ranges,
     887            1 :             vec![
     888            1 :                 Key::from_i128(1)..Key::from_i128(3),
     889            1 :                 Key::from_i128(7)..Key::from_i128(8),
     890            1 :                 Key::from_i128(12)..Key::from_i128(14),
     891            1 :             ]
     892            1 :         );
     893            1 :     }
     894              : 
     895              :     #[test]
     896            1 :     fn test_remove_no_overlaps() {
     897            1 :         let mut key_space1 = KeySpace {
     898            1 :             ranges: vec![
     899            1 :                 Key::from_i128(1)..Key::from_i128(5),
     900            1 :                 Key::from_i128(7)..Key::from_i128(10),
     901            1 :                 Key::from_i128(12)..Key::from_i128(15),
     902            1 :             ],
     903            1 :         };
     904            1 :         let key_space2 = KeySpace {
     905            1 :             ranges: vec![
     906            1 :                 Key::from_i128(6)..Key::from_i128(7),
     907            1 :                 Key::from_i128(11)..Key::from_i128(12),
     908            1 :                 Key::from_i128(15)..Key::from_i128(17),
     909            1 :             ],
     910            1 :         };
     911            1 : 
     912            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     913            1 :         let removed_expected = KeySpace::default();
     914            1 :         assert_eq!(removed, removed_expected);
     915              : 
     916            1 :         assert_eq!(
     917            1 :             key_space1.ranges,
     918            1 :             vec![
     919            1 :                 Key::from_i128(1)..Key::from_i128(5),
     920            1 :                 Key::from_i128(7)..Key::from_i128(10),
     921            1 :                 Key::from_i128(12)..Key::from_i128(15),
     922            1 :             ]
     923            1 :         );
     924            1 :     }
     925              : 
     926              :     #[test]
     927            1 :     fn test_remove_one_range_overlaps_multiple() {
     928            1 :         let mut key_space1 = KeySpace {
     929            1 :             ranges: vec![
     930            1 :                 Key::from_i128(1)..Key::from_i128(3),
     931            1 :                 Key::from_i128(3)..Key::from_i128(6),
     932            1 :                 Key::from_i128(6)..Key::from_i128(10),
     933            1 :                 Key::from_i128(12)..Key::from_i128(15),
     934            1 :                 Key::from_i128(17)..Key::from_i128(20),
     935            1 :                 Key::from_i128(20)..Key::from_i128(30),
     936            1 :                 Key::from_i128(30)..Key::from_i128(40),
     937            1 :             ],
     938            1 :         };
     939            1 :         let key_space2 = KeySpace {
     940            1 :             ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
     941            1 :         };
     942            1 : 
     943            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     944            1 :         let removed_expected = KeySpace {
     945            1 :             ranges: vec![
     946            1 :                 Key::from_i128(9)..Key::from_i128(10),
     947            1 :                 Key::from_i128(12)..Key::from_i128(15),
     948            1 :                 Key::from_i128(17)..Key::from_i128(19),
     949            1 :             ],
     950            1 :         };
     951            1 :         assert_eq!(removed, removed_expected);
     952              : 
     953            1 :         assert_eq!(
     954            1 :             key_space1.ranges,
     955            1 :             vec![
     956            1 :                 Key::from_i128(1)..Key::from_i128(3),
     957            1 :                 Key::from_i128(3)..Key::from_i128(6),
     958            1 :                 Key::from_i128(6)..Key::from_i128(9),
     959            1 :                 Key::from_i128(19)..Key::from_i128(20),
     960            1 :                 Key::from_i128(20)..Key::from_i128(30),
     961            1 :                 Key::from_i128(30)..Key::from_i128(40),
     962            1 :             ]
     963            1 :         );
     964            1 :     }
     965              :     #[test]
     966            1 :     fn sharded_range_relation_gap() {
     967            1 :         let shard_identity = ShardIdentity::new(
     968            1 :             ShardNumber(0),
     969            1 :             ShardCount::new(4),
     970            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     971            1 :         )
     972            1 :         .unwrap();
     973            1 : 
     974            1 :         let range = ShardedRange::new(
     975            1 :             Range {
     976            1 :                 start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
     977            1 :                 end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
     978            1 :             },
     979            1 :             &shard_identity,
     980            1 :         );
     981            1 : 
     982            1 :         // Key range spans relations, expect MAX
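                       :         // (u32::MAX is the "size unknown" sentinel: the start and end keys name different
                       :         // relations, so the range is not a contiguous run of one relation's blocks.)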
     983            1 :         assert_eq!(range.page_count(), u32::MAX);
     984            1 :     }
     985              : 
     986              :     #[test]
     987            1 :     fn shard_identity_keyspaces_single_key() {
     988            1 :         let shard_identity = ShardIdentity::new(
     989            1 :             ShardNumber(1),
     990            1 :             ShardCount::new(4),
     991            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     992            1 :         )
     993            1 :         .unwrap();
     994            1 : 
     995            1 :         let range = ShardedRange::new(
     996            1 :             Range {
     997            1 :                 start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
     998            1 :                 end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
     999            1 :             },
    1000            1 :             &shard_identity,
    1001            1 :         );
    1002            1 :         // Single-key range on logical size key
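                       :         // All shards store the logical-size keys, so this key counts for shard 1 as well.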
    1003            1 :         assert_eq!(range.page_count(), 1);
    1004            1 :     }
    1005              : 
    1006              :     /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
    1007              :     #[test]
    1008            1 :     fn contiguous_range_check() {
    1009            1 :         assert!(!is_contiguous_range(
    1010            1 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
    1011            1 :                 ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
    1012            1 :         ),);
    1013              : 
     1014              :         // The range goes all the way up to 0xffffffff, including it: this is
    1015              :         // not considered a rel block range because 0xffffffff stores logical sizes,
    1016              :         // not blocks.
    1017            1 :         assert!(!is_contiguous_range(
    1018            1 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
    1019            1 :                 ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
    1020            1 :         ),);
    1021              : 
    1022              :         // Keys within the normal data region of a relation
    1023            1 :         assert!(is_contiguous_range(
    1024            1 :             &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
    1025            1 :                 ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
    1026            1 :         ),);
    1027              : 
    1028              :         // The logical size key of one forkno, then some blocks in the next
    1029            1 :         assert!(is_contiguous_range(
    1030            1 :             &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
    1031            1 :                 ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
    1032            1 :         ),);
    1033            1 :     }
    1034              : 
    1035              :     #[test]
    1036            1 :     fn shard_identity_keyspaces_forkno_gap() {
    1037            1 :         let shard_identity = ShardIdentity::new(
    1038            1 :             ShardNumber(1),
    1039            1 :             ShardCount::new(4),
    1040            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1041            1 :         )
    1042            1 :         .unwrap();
    1043            1 : 
    1044            1 :         let range = ShardedRange::new(
    1045            1 :             Range {
    1046            1 :                 start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
    1047            1 :                 end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
    1048            1 :             },
    1049            1 :             &shard_identity,
    1050            1 :         );
    1051            1 : 
    1052            1 :         // Range spanning the end of one forkno and the start of the next: we do not attempt to
     1053            1 :         // calculate a valid size, because we have no way to know if the keys between start
    1054            1 :         // and end are actually in use.
    1055            1 :         assert_eq!(range.page_count(), u32::MAX);
    1056            1 :     }
    1057              : 
    1058              :     #[test]
    1059            1 :     fn shard_identity_keyspaces_one_relation() {
    1060            5 :         for shard_number in 0..4 {
    1061            4 :             let shard_identity = ShardIdentity::new(
    1062            4 :                 ShardNumber(shard_number),
    1063            4 :                 ShardCount::new(4),
    1064            4 :                 ShardParameters::DEFAULT_STRIPE_SIZE,
    1065            4 :             )
    1066            4 :             .unwrap();
    1067            4 : 
    1068            4 :             let range = ShardedRange::new(
    1069            4 :                 Range {
    1070            4 :                     start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
    1071            4 :                     end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
    1072            4 :                 },
    1073            4 :                 &shard_identity,
    1074            4 :             );
    1075            4 : 
    1076            4 :             // Very simple case: range covering block zero of one relation, where that block maps to shard zero
    1077            4 :             if shard_number == 0 {
    1078            1 :                 assert_eq!(range.page_count(), 1);
    1079              :             } else {
    1080              :                 // Other shards should perceive the range's size as zero
    1081            3 :                 assert_eq!(range.page_count(), 0);
    1082              :             }
    1083              :         }
    1084            1 :     }
    1085              : 
    1086              :     /// Test helper: construct a ShardedRange and call fragment() on it, returning
    1087              :     /// the total page count in the range and the fragments.
    1088         1012 :     fn do_fragment(
    1089         1012 :         range_start: Key,
    1090         1012 :         range_end: Key,
    1091         1012 :         shard_identity: &ShardIdentity,
    1092         1012 :         target_nblocks: u32,
    1093         1012 :     ) -> (u32, Vec<(u32, Range<Key>)>) {
    1094         1012 :         let range = ShardedRange::new(
    1095         1012 :             Range {
    1096         1012 :                 start: range_start,
    1097         1012 :                 end: range_end,
    1098         1012 :             },
    1099         1012 :             shard_identity,
    1100         1012 :         );
    1101         1012 : 
    1102         1012 :         let page_count = range.page_count();
    1103         1012 :         let fragments = range.fragment(target_nblocks);
    1104         1012 : 
    1105         1012 :         // Invariant: we always get at least one fragment
    1106         1012 :         assert!(!fragments.is_empty());
    1107              : 
    1108              :         // Invariant: the first/last fragment start/end should equal the input start/end
    1109         1012 :         assert_eq!(fragments.first().unwrap().1.start, range_start);
    1110         1012 :         assert_eq!(fragments.last().unwrap().1.end, range_end);
    1111              : 
    1112         1012 :         if page_count > 0 {
    1113              :             // Invariant: every fragment must contain at least one shard-local page, if the
    1114              :             // total range contains at least one shard-local page
    1115          687 :             let all_nonzero = fragments.iter().all(|f| f.0 > 0);
    1116          554 :             if !all_nonzero {
    1117            0 :                 eprintln!("Found a zero-length fragment: {:?}", fragments);
    1118          554 :             }
    1119          554 :             assert!(all_nonzero);
    1120              :         } else {
    1121              :             // A range with no shard-local pages should always be returned as a single fragment
    1122          458 :             assert_eq!(fragments, vec![(0, range_start..range_end)]);
    1123              :         }
    1124              : 
    1125              :         // Invariant: fragments must be ordered and non-overlapping
    1126         1012 :         let mut last: Option<Range<Key>> = None;
    1127         2157 :         for frag in &fragments {
    1128         1145 :             if let Some(last) = last {
    1129          133 :                 assert!(frag.1.start >= last.end);
    1130          133 :                 assert!(frag.1.start > last.start);
    1131         1012 :             }
    1132         1145 :             last = Some(frag.1.clone())
    1133              :         }
    1134              : 
    1135              :         // Invariant: fragments respect target_nblocks
    1136         2157 :         for frag in &fragments {
    1137         1145 :             assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
    1138              :         }
    1139              : 
    1140         1012 :         (page_count, fragments)
    1141         1012 :     }
    1142              : 
    1143              :     /// Really simple tests for fragment(), on a range that just contains a single stripe
    1144              :     /// for a single tenant.
    1145              :     #[test]
    1146            1 :     fn sharded_range_fragment_simple() {
    1147            1 :         let shard_identity = ShardIdentity::new(
    1148            1 :             ShardNumber(0),
    1149            1 :             ShardCount::new(4),
    1150            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1151            1 :         )
    1152            1 :         .unwrap();
    1153            1 : 
    1154            1 :         // A range which we happen to know covers exactly one stripe which belongs to this shard
    1155            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1156            1 :         let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
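                       :         // input_end - input_start == 0x8000 == 32768 blocks, i.e. exactly the stripe size used below.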
    1157            1 : 
    1158            1 :         // Ask for stripe_size blocks, we get the whole stripe
    1159            1 :         assert_eq!(
    1160            1 :             do_fragment(input_start, input_end, &shard_identity, 32768),
    1161            1 :             (32768, vec![(32768, input_start..input_end)])
    1162            1 :         );
    1163              : 
    1164              :         // Ask for more, we still get the whole stripe
    1165            1 :         assert_eq!(
    1166            1 :             do_fragment(input_start, input_end, &shard_identity, 10000000),
    1167            1 :             (32768, vec![(32768, input_start..input_end)])
    1168            1 :         );
    1169              : 
    1170              :         // Ask for target_nblocks of half the stripe size, we get two halves
    1171            1 :         assert_eq!(
    1172            1 :             do_fragment(input_start, input_end, &shard_identity, 16384),
    1173            1 :             (
    1174            1 :                 32768,
    1175            1 :                 vec![
    1176            1 :                     (16384, input_start..input_start.add(16384)),
    1177            1 :                     (16384, input_start.add(16384)..input_end)
    1178            1 :                 ]
    1179            1 :             )
    1180            1 :         );
    1181            1 :     }
    1182              : 
    1183              :     #[test]
    1184            1 :     fn sharded_range_fragment_multi_stripe() {
    1185            1 :         let shard_identity = ShardIdentity::new(
    1186            1 :             ShardNumber(0),
    1187            1 :             ShardCount::new(4),
    1188            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1189            1 :         )
    1190            1 :         .unwrap();
    1191            1 : 
    1192            1 :         // A range which covers multiple stripes, exactly one of which belongs to the current shard.
    1193            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1194            1 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
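                       :         // The range is 0x20000 == 131072 blocks, i.e. four stripes of 32768; with four shards,
                       :         // only one stripe's worth of blocks is local to shard 0.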
    1195            1 :         // Ask for all the blocks, get a fragment that covers the whole range but reports
    1196            1 :         // its size to be just the blocks belonging to our shard.
    1197            1 :         assert_eq!(
    1198            1 :             do_fragment(input_start, input_end, &shard_identity, 131072),
    1199            1 :             (32768, vec![(32768, input_start..input_end)])
    1200            1 :         );
    1201              : 
    1202              :         // Ask for a sub-stripe quantity
    1203            1 :         assert_eq!(
    1204            1 :             do_fragment(input_start, input_end, &shard_identity, 16000),
    1205            1 :             (
    1206            1 :                 32768,
    1207            1 :                 vec![
    1208            1 :                     (16000, input_start..input_start.add(16000)),
    1209            1 :                     (16000, input_start.add(16000)..input_start.add(32000)),
    1210            1 :                     (768, input_start.add(32000)..input_end),
    1211            1 :                 ]
    1212            1 :             )
    1213            1 :         );
    1214              : 
    1215              :         // Try on a range that starts slightly after our owned stripe
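                       :         // Dropping block 0 loses one shard-local page, so the expected count is 32768 - 1 = 32767.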
    1216            1 :         assert_eq!(
    1217            1 :             do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
    1218            1 :             (32767, vec![(32767, input_start.add(1)..input_end)])
    1219            1 :         );
    1220            1 :     }
    1221              : 
    1222              :     /// Test our calculations work correctly when we start a range from the logical size key of
    1223              :     /// a previous relation.
    1224              :     #[test]
    1225            1 :     fn sharded_range_fragment_starting_from_logical_size() {
    1226            1 :         let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
    1227            1 :         let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
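                       :         // The range covers the logical-size key (field6 == 0xffffffff) of one forkno plus the
                       :         // first 0x8000 blocks of the next, hence the 0x8001 pages expected for shard 0 below.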
    1228            1 : 
    1229            1 :         // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
    1230            1 :         let shard_identity = ShardIdentity::new(
    1231            1 :             ShardNumber(0),
    1232            1 :             ShardCount::new(4),
    1233            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1234            1 :         )
    1235            1 :         .unwrap();
    1236            1 :         assert_eq!(
    1237            1 :             do_fragment(input_start, input_end, &shard_identity, 0x10000),
    1238            1 :             (0x8001, vec![(0x8001, input_start..input_end)])
    1239            1 :         );
    1240              : 
    1241              :         // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
    1242              :         // store all logical sizes)
    1243            1 :         let shard_identity = ShardIdentity::new(
    1244            1 :             ShardNumber(1),
    1245            1 :             ShardCount::new(4),
    1246            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1247            1 :         )
    1248            1 :         .unwrap();
    1249            1 :         assert_eq!(
    1250            1 :             do_fragment(input_start, input_end, &shard_identity, 0x10000),
    1251            1 :             (0x1, vec![(0x1, input_start..input_end)])
    1252            1 :         );
    1253            1 :     }
    1254              : 
    1255              :     /// Test that ShardedRange behaves properly when used on un-sharded data
    1256              :     #[test]
    1257            1 :     fn sharded_range_fragment_unsharded() {
    1258            1 :         let shard_identity = ShardIdentity::unsharded();
    1259            1 : 
    1260            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1261            1 :         let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
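                       :         // Unsharded: all 0x10000 (65536) blocks are local, so a target of 0x8000 yields two equal halves.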
    1262            1 :         assert_eq!(
    1263            1 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1264            1 :             (
    1265            1 :                 0x10000,
    1266            1 :                 vec![
    1267            1 :                     (0x8000, input_start..input_start.add(0x8000)),
    1268            1 :                     (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
    1269            1 :                 ]
    1270            1 :             )
    1271            1 :         );
    1272            1 :     }
    1273              : 
    1274              :     #[test]
    1275            1 :     fn sharded_range_fragment_cross_relation() {
    1276            1 :         let shard_identity = ShardIdentity::unsharded();
    1277            1 : 
    1278            1 :         // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
    1279            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1280            1 :         let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
    1281            1 :         assert_eq!(
    1282            1 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1283            1 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1284            1 :         );
    1285              : 
    1286              :         // Same, but using a sharded identity
    1287            1 :         let shard_identity = ShardIdentity::new(
    1288            1 :             ShardNumber(0),
    1289            1 :             ShardCount::new(4),
    1290            1 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1291            1 :         )
    1292            1 :         .unwrap();
    1293            1 :         assert_eq!(
    1294            1 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1295            1 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1296            1 :         );
    1297            1 :     }
    1298              : 
    1299              :     #[test]
    1300            1 :     fn sharded_range_fragment_tiny_nblocks() {
    1301            1 :         let shard_identity = ShardIdentity::unsharded();
    1302            1 : 
     1303            1 :         // A small range within a single relation, fragmented with a tiny target_nblocks
    1304            1 :         let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
    1305            1 :         let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
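                       :         // 0x38 == 56 blocks with target_nblocks == 16: expect fragments of 16, 16, 16 and a trailing 8.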
    1306            1 :         assert_eq!(
    1307            1 :             do_fragment(input_start, input_end, &shard_identity, 16),
    1308            1 :             (
    1309            1 :                 0x38,
    1310            1 :                 vec![
    1311            1 :                     (16, input_start..input_start.add(16)),
    1312            1 :                     (16, input_start.add(16)..input_start.add(32)),
    1313            1 :                     (16, input_start.add(32)..input_start.add(48)),
    1314            1 :                     (8, input_start.add(48)..input_end),
    1315            1 :                 ]
    1316            1 :             )
    1317            1 :         );
    1318            1 :     }
    1319              : 
    1320              :     #[test]
    1321            1 :     fn sharded_range_fragment_fuzz() {
    1322            1 :         // Use a fixed seed: we don't want to explicitly pick values, but we do want
    1323            1 :         // the test to be reproducible.
    1324            1 :         let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
    1325              : 
    1326         1001 :         for _i in 0..1000 {
    1327         1000 :             let shard_identity = if prng.next_u32() % 2 == 0 {
    1328          519 :                 ShardIdentity::unsharded()
    1329              :             } else {
    1330          481 :                 let shard_count = prng.next_u32() % 127 + 1;
    1331          481 :                 ShardIdentity::new(
    1332          481 :                     ShardNumber((prng.next_u32() % shard_count) as u8),
    1333          481 :                     ShardCount::new(shard_count as u8),
    1334          481 :                     ShardParameters::DEFAULT_STRIPE_SIZE,
    1335          481 :                 )
    1336          481 :                 .unwrap()
    1337              :             };
    1338              : 
    1339         1000 :             let target_nblocks = prng.next_u32() % 65536 + 1;
    1340         1000 : 
    1341         1000 :             let start_offset = prng.next_u32() % 16384;
    1342         1000 : 
     1343         1000 :             // Try ranges of up to 8192 blocks that are always at least 1 block long
    1344         1000 :             let range_size = prng.next_u32() % 8192 + 1;
    1345         1000 : 
     1346         1000 :             // Pick a random range within a single relation
    1347         1000 :             let input_start = Key::from_hex("000000067F00000001000004E10000000000")
    1348         1000 :                 .unwrap()
    1349         1000 :                 .add(start_offset);
    1350         1000 :             let input_end = input_start.add(range_size);
    1351         1000 : 
    1352         1000 :             // This test's main success conditions are the invariants baked into do_fragment
    1353         1000 :             let (_total_size, fragments) =
    1354         1000 :                 do_fragment(input_start, input_end, &shard_identity, target_nblocks);
    1355         1000 : 
    1356         1000 :             // Pick a random key within the range and check it appears in the output
    1357         1000 :             let example_key = input_start.add(prng.next_u32() % range_size);
    1358         1000 : 
    1359         1000 :             // Panic on unwrap if it isn't found
    1360         1000 :             let example_key_frag = fragments
    1361         1000 :                 .iter()
    1362         1073 :                 .find(|f| f.1.contains(&example_key))
    1363         1000 :                 .unwrap();
    1364         1000 : 
    1365         1000 :             // Check that the fragment containing our random key has a nonzero size if
    1366         1000 :             // that key is shard-local
    1367         1000 :             let example_key_local = !shard_identity.is_key_disposable(&example_key);
    1368         1000 :             if example_key_local {
    1369          542 :                 assert!(example_key_frag.0 > 0);
    1370          458 :             }
    1371              :         }
    1372            1 :     }
    1373              : }
        

Generated by: LCOV version 2.1-beta