|             Line data    Source code 
       1              : use std::ops::Range;
       2              : 
       3              : use itertools::Itertools;
       4              : 
       5              : use crate::key::Key;
       6              : use crate::shard::{ShardCount, ShardIdentity};
       7              : 
       8              : ///
       9              : /// Represents a set of Keys, in a compact form.
      10              : ///
      11              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      12              : pub struct KeySpace {
      13              :     /// Contiguous ranges of keys that belong to the key space. In key order,
      14              :     /// and with no overlap.
      15              :     pub ranges: Vec<Range<Key>>,
      16              : }
      17              : 
      18              : impl std::fmt::Display for KeySpace {
      19            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      20            0 :         write!(f, "[")?;
      21            0 :         for range in &self.ranges {
      22            0 :             write!(f, "{}..{},", range.start, range.end)?;
      23              :         }
      24            0 :         write!(f, "]")
      25            0 :     }
      26              : }
      27              : 
      28              : /// A wrapper type for sparse keyspaces.
      29              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      30              : pub struct SparseKeySpace(pub KeySpace);
      31              : 
      32              : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
      33              : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
      34              : /// shard.
      35              : ///
      36              : /// When we iterate over keys within this object, we will skip any keys that don't belong
      37              : /// to this shard.
      38              : ///
      39              : /// The start + end keys may not belong to the shard: these specify where layer files should
      40              : /// start  + end, but we will never actually read/write those keys.
      41              : #[derive(Clone, Debug, PartialEq, Eq)]
      42              : pub struct ShardedRange<'a> {
      43              :     pub shard_identity: &'a ShardIdentity,
      44              :     pub range: Range<Key>,
      45              : }
      46              : 
      47              : // Calculate the size of a range within the blocks of the same relation, or spanning only the
      48              : // top page in the previous relation's space.
      49      3788848 : pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
      50      3788848 :     debug_assert!(is_contiguous_range(range));
      51      3788848 :     if range.start.field6 == 0xffffffff {
      52       405867 :         range.end.field6 + 1
      53              :     } else {
      54      3382981 :         range.end.field6 - range.start.field6
      55              :     }
      56      3788848 : }
      57              : 
      58              : /// Return true if this key range includes only keys in the same relation's data blocks, or
      59              : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
      60              : ///
      61              : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
      62              : /// be on our shard.  Later in ShardedRange we do the extra work to figure out how much
      63              : /// of a given contiguous range is present on one shard.
      64              : ///
      65              : /// This matters, because:
      66              : /// - Within such ranges, keys are used contiguously.  Outside such ranges it is sparse.
      67              : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
      68      7580871 : pub fn is_contiguous_range(range: &Range<Key>) -> bool {
      69      7580871 :     range.start.field1 == range.end.field1
      70      7575709 :         && range.start.field2 == range.end.field2
      71      7575705 :         && range.start.field3 == range.end.field3
      72      7575703 :         && range.start.field4 == range.end.field4
      73      7575702 :         && (range.start.field5 == range.end.field5
      74       811738 :             || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
      75      7580871 : }
      76              : 
      77              : impl<'a> ShardedRange<'a> {
      78         2024 :     pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
      79         2024 :         Self {
      80         2024 :             shard_identity,
      81         2024 :             range,
      82         2024 :         }
      83         2024 :     }
      84              : 
      85              :     /// Break up this range into chunks, each of which has at least one local key in it if the
      86              :     /// total range has at least one local key.
      87         2017 :     pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
      88              :         // Optimization for single-key case (e.g. logical size keys)
      89         2017 :         if self.range.end == self.range.start.add(1) {
      90          837 :             return vec![(
      91          837 :                 if self.shard_identity.is_key_disposable(&self.range.start) {
      92            0 :                     0
      93              :                 } else {
      94          837 :                     1
      95              :                 },
      96          837 :                 self.range,
      97              :             )];
      98         1180 :         }
      99              : 
     100         1180 :         if !is_contiguous_range(&self.range) {
     101              :             // Ranges that span relations are not fragmented.  We only get these ranges as a result
     102              :             // of operations that act on existing layers, so we trust that the existing range is
     103              :             // reasonably small.
     104            2 :             return vec![(u32::MAX, self.range)];
     105         1178 :         }
     106              : 
     107         1178 :         let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
     108              : 
     109         1178 :         let mut cursor = self.range.start;
     110         3423 :         while cursor < self.range.end {
     111         2245 :             let advance_by = self.distance_to_next_boundary(cursor);
     112         2245 :             let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
     113              : 
     114              :             // If the previous fragment is undersized, then we seek to consume enough
     115              :             // blocks to complete it.
     116         2245 :             let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
     117         1067 :                 Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
     118          135 :                 Some(frag) => {
     119              :                     // Prev block is complete, want the full number.
     120              :                     (
     121          135 :                         target_nblocks,
     122          135 :                         if is_fragment_disposable {
     123              :                             // If this current range will be empty (not shard-local data), we will merge into previous
     124            0 :                             Some(frag)
     125              :                         } else {
     126          135 :                             None
     127              :                         },
     128              :                     )
     129              :                 }
     130              :                 None => {
     131              :                     // First iteration, want the full number
     132         1178 :                     (target_nblocks, None)
     133              :                 }
     134              :             };
     135              : 
     136         2245 :             let advance_by = if is_fragment_disposable {
     137         1371 :                 advance_by
     138              :             } else {
     139          874 :                 std::cmp::min(advance_by, want_blocks)
     140              :             };
     141              : 
     142         2245 :             let next_cursor = cursor.add(advance_by);
     143              : 
     144         2245 :             let this_frag = (
     145         2245 :                 if is_fragment_disposable {
     146         1371 :                     0
     147              :                 } else {
     148          874 :                     advance_by
     149              :                 },
     150         2245 :                 cursor..next_cursor,
     151              :             );
     152         2245 :             cursor = next_cursor;
     153              : 
     154         2245 :             if let Some(last_fragment) = merge_last_fragment {
     155          932 :                 // Previous fragment was short or this one is empty, merge into it
     156          932 :                 last_fragment.0 += this_frag.0;
     157          932 :                 last_fragment.1.end = this_frag.1.end;
     158         1313 :             } else {
     159         1313 :                 fragments.push(this_frag);
     160         1313 :             }
     161              :         }
     162              : 
     163         1178 :         fragments
     164         2017 :     }
     165              : 
     166              :     /// Estimate the physical pages that are within this range, on this shard.  This returns
     167              :     /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
     168         1019 :     pub fn page_count(&self) -> u32 {
     169              :         // Special cases for single keys like logical sizes
     170         1019 :         if self.range.end == self.range.start.add(1) {
     171            6 :             return if self.shard_identity.is_key_disposable(&self.range.start) {
     172            3 :                 0
     173              :             } else {
     174            3 :                 1
     175              :             };
     176         1013 :         }
     177              : 
     178              :         // We can only do an authentic calculation of contiguous key ranges
     179         1013 :         if !is_contiguous_range(&self.range) {
     180            4 :             return u32::MAX;
     181         1009 :         }
     182              : 
     183              :         // Special case for single sharded tenants: our logical and physical sizes are the same
     184         1009 :         if self.shard_identity.count < ShardCount::new(2) {
     185          524 :             return contiguous_range_len(&self.range);
     186          485 :         }
     187              : 
     188              :         // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
     189              :         // to Self, and add the stripe's block count to our total if so.
     190          485 :         let mut result: u64 = 0;
     191          485 :         let mut cursor = self.range.start;
     192         1902 :         while cursor < self.range.end {
     193              :             // Count up to the next stripe_size boundary or end of range
     194         1417 :             let advance_by = self.distance_to_next_boundary(cursor);
     195              : 
     196              :             // If this blocks in this stripe belong to us, add them to our count
     197         1417 :             if !self.shard_identity.is_key_disposable(&cursor) {
     198           46 :                 result += advance_by as u64;
     199         1371 :             }
     200              : 
     201         1417 :             cursor = cursor.add(advance_by);
     202              :         }
     203              : 
     204          485 :         if result > u32::MAX as u64 {
     205            0 :             u32::MAX
     206              :         } else {
     207          485 :             result as u32
     208              :         }
     209         1019 :     }
     210              : 
     211              :     /// Advance the cursor to the next potential fragment boundary: this is either
     212              :     /// a stripe boundary, or the end of the range.
     213         3662 :     fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
     214         3662 :         let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
     215              : 
     216         3662 :         if self.shard_identity.count < ShardCount::new(2) {
     217              :             // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
     218          816 :             return distance_to_range_end;
     219         2846 :         }
     220              : 
     221         2846 :         if cursor.field6 == 0xffffffff {
     222              :             // We are wrapping from one relation's logical size to the next relation's first data block
     223            4 :             return 1;
     224         2842 :         }
     225              : 
     226         2842 :         let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
     227         2842 :         let stripe_remainder = self.shard_identity.stripe_size.0
     228         2842 :             - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
     229              : 
     230         2842 :         if cfg!(debug_assertions) {
     231              :             // We should never overflow field5 and field6 -- our callers check this earlier
     232              :             // and would have returned their u32::MAX cases if the input range violated this.
     233         2842 :             let next_cursor = cursor.add(stripe_remainder);
     234         2842 :             debug_assert!(
     235         2842 :                 next_cursor.field1 == cursor.field1
     236         2842 :                     && next_cursor.field2 == cursor.field2
     237         2842 :                     && next_cursor.field3 == cursor.field3
     238         2842 :                     && next_cursor.field4 == cursor.field4
     239         2842 :                     && next_cursor.field5 == cursor.field5
     240              :             )
     241            0 :         }
     242              : 
     243         2842 :         std::cmp::min(stripe_remainder, distance_to_range_end)
     244         3662 :     }
     245              : 
     246              :     /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
     247              :     /// this function simply calculates the number of pages in the space, without accounting for those
     248              :     /// pages that would not actually be stored on this node.
     249              :     ///
     250              :     /// Don't use this function in code that works with physical entities like layer files.
     251      3789826 :     pub fn raw_size(range: &Range<Key>) -> u32 {
     252      3789826 :         if is_contiguous_range(range) {
     253      3784662 :             contiguous_range_len(range)
     254              :         } else {
     255         5164 :             u32::MAX
     256              :         }
     257      3789826 :     }
     258              : }
     259              : 
     260              : impl KeySpace {
     261              :     /// Create a key space with a single range.
     262       315571 :     pub fn single(key_range: Range<Key>) -> Self {
     263       315571 :         Self {
     264       315571 :             ranges: vec![key_range],
     265       315571 :         }
     266       315571 :     }
     267              : 
     268              :     /// Partition a key space into roughly chunks of roughly 'target_size' bytes
     269              :     /// in each partition.
     270              :     ///
     271          169 :     pub fn partition(
     272          169 :         &self,
     273          169 :         shard_identity: &ShardIdentity,
     274          169 :         target_size: u64,
     275          169 :         block_size: u64,
     276          169 :     ) -> KeyPartitioning {
     277          169 :         let target_nblocks = (target_size / block_size) as u32;
     278              : 
     279          169 :         let mut parts = Vec::new();
     280          169 :         let mut current_part = Vec::new();
     281          169 :         let mut current_part_size: usize = 0;
     282         1174 :         for range in &self.ranges {
     283              :             // While doing partitioning, wrap the range in ShardedRange so that our size calculations
     284              :             // will respect shard striping rather than assuming all keys within a range are present.
     285         1005 :             let range = ShardedRange::new(range.clone(), shard_identity);
     286              : 
     287              :             // Chunk up the range into parts that each contain up to target_size local blocks
     288         1007 :             for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
     289              :                 // If appending the next contiguous range in the keyspace to the current
     290              :                 // partition would cause it to be too large, and our current partition
     291              :                 // covers at least one block that is physically present in this shard,
     292              :                 // then start a new partition
     293         1007 :                 if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
     294           12 :                     && current_part_size > 0
     295           12 :                 {
     296           12 :                     parts.push(KeySpace {
     297           12 :                         ranges: current_part,
     298           12 :                     });
     299           12 :                     current_part = Vec::new();
     300           12 :                     current_part_size = 0;
     301          995 :                 }
     302         1007 :                 current_part.push(frag_range.start..frag_range.end);
     303         1007 :                 current_part_size += frag_on_shard_size as usize;
     304              :             }
     305              :         }
     306              : 
     307              :         // add last partition that wasn't full yet.
     308          169 :         if !current_part.is_empty() {
     309          169 :             parts.push(KeySpace {
     310          169 :                 ranges: current_part,
     311          169 :             });
     312          169 :         }
     313              : 
     314          169 :         KeyPartitioning { parts }
     315          169 :     }
     316              : 
     317      1310012 :     pub fn is_empty(&self) -> bool {
     318      1310012 :         self.total_raw_size() == 0
     319      1310012 :     }
     320              : 
     321              :     /// Merge another keyspace into the current one.
     322              :     /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
     323       489241 :     pub fn merge(&mut self, other: &KeySpace) {
     324       489241 :         let all_ranges = self
     325       489241 :             .ranges
     326       489241 :             .iter()
     327       489241 :             .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
     328              : 
     329       489241 :         let mut accum = KeySpaceAccum::new();
     330       489241 :         let mut prev: Option<&Range<Key>> = None;
     331      1151111 :         for range in all_ranges {
     332       661870 :             if let Some(prev) = prev {
     333       301088 :                 let overlap =
     334       301088 :                     std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
     335       301088 :                 assert!(
     336       301088 :                     !overlap,
     337            0 :                     "Attempt to merge ovelapping keyspaces: {prev:?} overlaps {range:?}"
     338              :                 );
     339       360782 :             }
     340              : 
     341       661870 :             accum.add_range(range.clone());
     342       661870 :             prev = Some(range);
     343              :         }
     344              : 
     345       489241 :         self.ranges = accum.to_keyspace().ranges;
     346       489241 :     }
     347              : 
     348              :     /// Remove all keys in `other` from `self`.
     349              :     /// This can involve splitting or removing of existing ranges.
     350              :     /// Returns the removed keyspace
     351      1562266 :     pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
     352      1562266 :         let (self_start, self_end) = match (self.start(), self.end()) {
     353      1133093 :             (Some(start), Some(end)) => (start, end),
     354              :             _ => {
     355              :                 // self is empty
     356       429173 :                 return KeySpace::default();
     357              :             }
     358              :         };
     359              : 
     360              :         // Key spaces are sorted by definition, so skip ahead to the first
     361              :         // potentially intersecting range. Similarly, ignore ranges that start
     362              :         // after the current keyspace ends.
     363      1133093 :         let other_ranges = other
     364      1133093 :             .ranges
     365      1133093 :             .iter()
     366      1133093 :             .skip_while(|range| self_start >= range.end)
     367      1133093 :             .take_while(|range| self_end > range.start);
     368              : 
     369      1133093 :         let mut removed_accum = KeySpaceRandomAccum::new();
     370      1811183 :         for range in other_ranges {
     371      1338647 :             while let Some(overlap_at) = self.overlaps_at(range) {
     372       660557 :                 let overlapped = self.ranges[overlap_at].clone();
     373              : 
     374       660557 :                 if overlapped.start < range.start && overlapped.end <= range.end {
     375         1823 :                     // Higher part of the range is completely overlapped.
     376         1823 :                     removed_accum.add_range(range.start..self.ranges[overlap_at].end);
     377         1823 :                     self.ranges[overlap_at].end = range.start;
     378       658734 :                 }
     379       660557 :                 if overlapped.start >= range.start && overlapped.end > range.end {
     380         1968 :                     // Lower part of the range is completely overlapped.
     381         1968 :                     removed_accum.add_range(self.ranges[overlap_at].start..range.end);
     382         1968 :                     self.ranges[overlap_at].start = range.end;
     383       658589 :                 }
     384       660557 :                 if overlapped.start < range.start && overlapped.end > range.end {
     385          628 :                     // Middle part of the range is overlapped.
     386          628 :                     removed_accum.add_range(range.clone());
     387          628 :                     self.ranges[overlap_at].end = range.start;
     388          628 :                     self.ranges
     389          628 :                         .insert(overlap_at + 1, range.end..overlapped.end);
     390       659929 :                 }
     391       660557 :                 if overlapped.start >= range.start && overlapped.end <= range.end {
     392       656138 :                     // Whole range is overlapped
     393       656138 :                     removed_accum.add_range(self.ranges[overlap_at].clone());
     394       656138 :                     self.ranges.remove(overlap_at);
     395       656138 :                 }
     396              :             }
     397              :         }
     398              : 
     399      1133093 :         removed_accum.to_keyspace()
     400      1562266 :     }
     401              : 
     402      1562271 :     pub fn start(&self) -> Option<Key> {
     403      1562271 :         self.ranges.first().map(|range| range.start)
     404      1562271 :     }
     405              : 
     406      1562431 :     pub fn end(&self) -> Option<Key> {
     407      1562431 :         self.ranges.last().map(|range| range.end)
     408      1562431 :     }
     409              : 
     410              :     /// The size of the keyspace in pages, before accounting for sharding
     411      1320906 :     pub fn total_raw_size(&self) -> usize {
     412      1320906 :         self.ranges
     413      1320906 :             .iter()
     414      1320906 :             .map(|range| ShardedRange::raw_size(range) as usize)
     415      1320906 :             .sum()
     416      1320906 :     }
     417              : 
     418      1761209 :     fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
     419      1761209 :         match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
     420         4538 :             Ok(0) => None,
     421       818468 :             Err(0) => None,
     422         2299 :             Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     423       935904 :             Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     424       203930 :             _ => None,
     425              :         }
     426      1761209 :     }
     427              : 
     428              :     ///
     429              :     /// Check if key space contains overlapping range
     430              :     ///
     431       422562 :     pub fn overlaps(&self, range: &Range<Key>) -> bool {
     432       422562 :         self.overlaps_at(range).is_some()
     433       422562 :     }
     434              : 
     435              :     /// Check if the keyspace contains a key
     436       421655 :     pub fn contains(&self, key: &Key) -> bool {
     437       421655 :         self.overlaps(&(*key..key.next()))
     438       421655 :     }
     439              : }
     440              : 
     441              : ///
     442              : /// Represents a partitioning of the key space.
     443              : ///
     444              : /// The only kind of partitioning we do is to partition the key space into
     445              : /// partitions that are roughly equal in physical size (see KeySpace::partition).
     446              : /// But this data structure could represent any partitioning.
     447              : ///
     448              : #[derive(Clone, Debug, Default)]
     449              : pub struct KeyPartitioning {
     450              :     pub parts: Vec<KeySpace>,
     451              : }
     452              : 
     453              : /// Represents a partitioning of the sparse key space.
     454              : #[derive(Clone, Debug, Default)]
     455              : pub struct SparseKeyPartitioning {
     456              :     pub parts: Vec<SparseKeySpace>,
     457              : }
     458              : 
     459              : impl KeyPartitioning {
     460          470 :     pub fn new() -> Self {
     461          470 :         KeyPartitioning { parts: Vec::new() }
     462          470 :     }
     463              : 
     464              :     /// Convert a key partitioning to a sparse partition.
     465          235 :     pub fn into_sparse(self) -> SparseKeyPartitioning {
     466          235 :         SparseKeyPartitioning {
     467          235 :             parts: self.parts.into_iter().map(SparseKeySpace).collect(),
     468          235 :         }
     469          235 :     }
     470              : }
     471              : 
     472              : impl SparseKeyPartitioning {
     473              :     /// Note: use this function with caution. Attempt to handle a sparse keyspace in the same way as a dense keyspace will
     474              :     /// cause long/dead loops.
     475          191 :     pub fn into_dense(self) -> KeyPartitioning {
     476              :         KeyPartitioning {
     477          191 :             parts: self.parts.into_iter().map(|x| x.0).collect(),
     478              :         }
     479          191 :     }
     480              : }
     481              : 
     482              : ///
     483              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     484              : /// object. This takes care of merging adjacent keys and key ranges into
     485              : /// contiguous ranges.
     486              : ///
     487              : #[derive(Clone, Debug, Default)]
     488              : pub struct KeySpaceAccum {
     489              :     accum: Option<Range<Key>>,
     490              : 
     491              :     ranges: Vec<Range<Key>>,
     492              :     size: u64,
     493              : }
     494              : 
     495              : impl KeySpaceAccum {
     496       544357 :     pub fn new() -> Self {
     497       544357 :         Self {
     498       544357 :             accum: None,
     499       544357 :             ranges: Vec::new(),
     500       544357 :             size: 0,
     501       544357 :         }
     502       544357 :     }
     503              : 
     504              :     #[inline(always)]
     505      2071653 :     pub fn add_key(&mut self, key: Key) {
     506      2071653 :         self.add_range(singleton_range(key))
     507            0 :     }
     508              : 
     509              :     #[inline(always)]
     510      3205688 :     pub fn add_range(&mut self, range: Range<Key>) {
     511      3205688 :         self.size += ShardedRange::raw_size(&range) as u64;
     512              : 
     513      3205688 :         match self.accum.as_mut() {
     514      2369127 :             Some(accum) => {
     515      2369127 :                 if range.start == accum.end {
     516      2081712 :                     accum.end = range.end;
     517      2081712 :                 } else {
     518              :                     // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
     519              :                     // a new range here if the skipped region was all keys that don't belong on this shard.
     520              :                     // (https://github.com/neondatabase/neon/issues/6247)
     521       287415 :                     assert!(range.start > accum.end);
     522       287415 :                     self.ranges.push(accum.clone());
     523       287415 :                     *accum = range;
     524              :                 }
     525              :             }
     526       836561 :             None => self.accum = Some(range),
     527              :         }
     528       661876 :     }
     529              : 
     530       965015 :     pub fn to_keyspace(mut self) -> KeySpace {
     531       965015 :         if let Some(accum) = self.accum.take() {
     532       836554 :             self.ranges.push(accum);
     533       836554 :         }
     534       965015 :         KeySpace {
     535       965015 :             ranges: self.ranges,
     536       965015 :         }
     537       965015 :     }
     538              : 
     539          702 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     540          702 :         std::mem::take(self).to_keyspace()
     541          702 :     }
     542              : 
     543              :     // The total number of keys in this object, ignoring any sharding effects that might cause some of
     544              :     // the keys to be omitted in storage on this shard.
     545         1520 :     pub fn raw_size(&self) -> u64 {
     546         1520 :         self.size
     547         1520 :     }
     548              : }
     549              : 
     550              : ///
     551              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     552              : /// object. Key ranges may be inserted in any order and can overlap.
     553              : ///
     554              : #[derive(Clone, Debug, Default)]
     555              : pub struct KeySpaceRandomAccum {
     556              :     ranges: Vec<Range<Key>>,
     557              : }
     558              : 
     559              : impl KeySpaceRandomAccum {
     560      3660394 :     pub fn new() -> Self {
     561      3660394 :         Self { ranges: Vec::new() }
     562      3660394 :     }
     563              : 
     564       372857 :     pub fn add_key(&mut self, key: Key) {
     565       372857 :         self.add_range(singleton_range(key))
     566       372857 :     }
     567              : 
     568      1578352 :     pub fn add_range(&mut self, range: Range<Key>) {
     569      1578352 :         self.ranges.push(range);
     570      1578352 :     }
     571              : 
     572       503350 :     pub fn add_keyspace(&mut self, keyspace: KeySpace) {
     573       991215 :         for range in keyspace.ranges {
     574       487865 :             self.add_range(range);
     575       487865 :         }
     576       503350 :     }
     577              : 
     578      2496144 :     pub fn to_keyspace(mut self) -> KeySpace {
     579      2496144 :         let mut ranges = Vec::new();
     580      2496144 :         if !self.ranges.is_empty() {
     581      1446243 :             self.ranges.sort_by_key(|r| r.start);
     582      1446243 :             let mut start = self.ranges.first().unwrap().start;
     583      1446243 :             let mut end = self.ranges.first().unwrap().end;
     584      3024571 :             for r in self.ranges {
     585      1578328 :                 assert!(r.start >= start);
     586      1578328 :                 if r.start > end {
     587        57951 :                     ranges.push(start..end);
     588        57951 :                     start = r.start;
     589        57951 :                     end = r.end;
     590      1520377 :                 } else if r.end > end {
     591        31815 :                     end = r.end;
     592      1488562 :                 }
     593              :             }
     594      1446243 :             ranges.push(start..end);
     595      1049901 :         }
     596      2496144 :         KeySpace { ranges }
     597      2496144 :     }
     598              : 
     599      1319986 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     600      1319986 :         let mut prev_accum = KeySpaceRandomAccum::new();
     601      1319986 :         std::mem::swap(self, &mut prev_accum);
     602              : 
     603      1319986 :         prev_accum.to_keyspace()
     604      1319986 :     }
     605              : }
     606              : 
     607      2444510 : pub fn singleton_range(key: Key) -> Range<Key> {
     608      2444510 :     key..key.next()
     609      2444510 : }
     610              : 
     611              : #[cfg(test)]
     612              : mod tests {
     613              :     use std::fmt::Write;
     614              : 
     615              :     use rand::{RngCore, SeedableRng};
     616              : 
     617              :     use super::*;
     618              :     use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber, ShardStripeSize};
     619              : 
     620              :     // Helper function to create a key range.
     621              :     //
     622              :     // Make the tests below less verbose.
     623           46 :     fn kr(irange: Range<i128>) -> Range<Key> {
     624           46 :         Key::from_i128(irange.start)..Key::from_i128(irange.end)
     625           46 :     }
     626              : 
     627              :     #[allow(dead_code)]
     628            0 :     fn dump_keyspace(ks: &KeySpace) {
     629            0 :         for r in ks.ranges.iter() {
     630            0 :             println!("  {}..{}", r.start.to_i128(), r.end.to_i128());
     631            0 :         }
     632            0 :     }
     633              : 
     634           11 :     fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
     635           11 :         if actual.ranges != expected {
     636            0 :             let mut msg = String::new();
     637              : 
     638            0 :             writeln!(msg, "expected:").unwrap();
     639            0 :             for r in &expected {
     640            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     641            0 :             }
     642            0 :             writeln!(msg, "got:").unwrap();
     643            0 :             for r in &actual.ranges {
     644            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     645            0 :             }
     646            0 :             panic!("{}", msg);
     647           11 :         }
     648           11 :     }
     649              : 
     650              :     #[test]
     651            1 :     fn keyspace_consume() {
     652            1 :         let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
     653              : 
     654            1 :         let mut accum = KeySpaceAccum::new();
     655            4 :         for range in &ranges {
     656            3 :             accum.add_range(range.clone());
     657            3 :         }
     658              : 
     659            1 :         let expected_size: u64 = ranges
     660            1 :             .iter()
     661            3 :             .map(|r| ShardedRange::raw_size(r) as u64)
     662            1 :             .sum();
     663            1 :         assert_eq!(accum.raw_size(), expected_size);
     664              : 
     665            1 :         assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
     666            1 :         assert_eq!(accum.raw_size(), 0);
     667              : 
     668            1 :         assert_ks_eq(&accum.consume_keyspace(), vec![]);
     669            1 :         assert_eq!(accum.raw_size(), 0);
     670              : 
     671            4 :         for range in &ranges {
     672            3 :             accum.add_range(range.clone());
     673            3 :         }
     674            1 :         assert_ks_eq(&accum.to_keyspace(), ranges);
     675            1 :     }
     676              : 
     677              :     #[test]
     678            1 :     fn keyspace_add_range() {
     679              :         // two separate ranges
     680              :         //
     681              :         // #####
     682              :         //         #####
     683            1 :         let mut ks = KeySpaceRandomAccum::default();
     684            1 :         ks.add_range(kr(0..10));
     685            1 :         ks.add_range(kr(20..30));
     686            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
     687              : 
     688              :         // two separate ranges, added in reverse order
     689              :         //
     690              :         //         #####
     691              :         // #####
     692            1 :         let mut ks = KeySpaceRandomAccum::default();
     693            1 :         ks.add_range(kr(20..30));
     694            1 :         ks.add_range(kr(0..10));
     695              : 
     696              :         // add range that is adjacent to the end of an existing range
     697              :         //
     698              :         // #####
     699              :         //      #####
     700            1 :         ks.add_range(kr(0..10));
     701            1 :         ks.add_range(kr(10..30));
     702            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     703              : 
     704              :         // add range that is adjacent to the start of an existing range
     705              :         //
     706              :         //      #####
     707              :         // #####
     708            1 :         let mut ks = KeySpaceRandomAccum::default();
     709            1 :         ks.add_range(kr(10..30));
     710            1 :         ks.add_range(kr(0..10));
     711            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     712              : 
     713              :         // add range that overlaps with the end of an existing range
     714              :         //
     715              :         // #####
     716              :         //    #####
     717            1 :         let mut ks = KeySpaceRandomAccum::default();
     718            1 :         ks.add_range(kr(0..10));
     719            1 :         ks.add_range(kr(5..30));
     720            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     721              : 
     722              :         // add range that overlaps with the start of an existing range
     723              :         //
     724              :         //    #####
     725              :         // #####
     726            1 :         let mut ks = KeySpaceRandomAccum::default();
     727            1 :         ks.add_range(kr(5..30));
     728            1 :         ks.add_range(kr(0..10));
     729            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     730              : 
     731              :         // add range that is fully covered by an existing range
     732              :         //
     733              :         // #########
     734              :         //   #####
     735            1 :         let mut ks = KeySpaceRandomAccum::default();
     736            1 :         ks.add_range(kr(0..30));
     737            1 :         ks.add_range(kr(10..20));
     738            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     739              : 
     740              :         // add range that extends an existing range from both ends
     741              :         //
     742              :         //   #####
     743              :         // #########
     744            1 :         let mut ks = KeySpaceRandomAccum::default();
     745            1 :         ks.add_range(kr(10..20));
     746            1 :         ks.add_range(kr(0..30));
     747            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     748              : 
     749              :         // add a range that overlaps with two existing ranges, joining them
     750              :         //
     751              :         // #####   #####
     752              :         //    #######
     753            1 :         let mut ks = KeySpaceRandomAccum::default();
     754            1 :         ks.add_range(kr(0..10));
     755            1 :         ks.add_range(kr(20..30));
     756            1 :         ks.add_range(kr(5..25));
     757            1 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     758            1 :     }
     759              : 
     760              :     #[test]
     761            1 :     fn keyspace_overlaps() {
     762            1 :         let mut ks = KeySpaceRandomAccum::default();
     763            1 :         ks.add_range(kr(10..20));
     764            1 :         ks.add_range(kr(30..40));
     765            1 :         let ks = ks.to_keyspace();
     766              : 
     767              :         //        #####      #####
     768              :         // xxxx
     769            1 :         assert!(!ks.overlaps(&kr(0..5)));
     770              : 
     771              :         //        #####      #####
     772              :         //   xxxx
     773            1 :         assert!(!ks.overlaps(&kr(5..9)));
     774              : 
     775              :         //        #####      #####
     776              :         //    xxxx
     777            1 :         assert!(!ks.overlaps(&kr(5..10)));
     778              : 
     779              :         //        #####      #####
     780              :         //     xxxx
     781            1 :         assert!(ks.overlaps(&kr(5..11)));
     782              : 
     783              :         //        #####      #####
     784              :         //        xxxx
     785            1 :         assert!(ks.overlaps(&kr(10..15)));
     786              : 
     787              :         //        #####      #####
     788              :         //         xxxx
     789            1 :         assert!(ks.overlaps(&kr(15..20)));
     790              : 
     791              :         //        #####      #####
     792              :         //           xxxx
     793            1 :         assert!(ks.overlaps(&kr(15..25)));
     794              : 
     795              :         //        #####      #####
     796              :         //              xxxx
     797            1 :         assert!(!ks.overlaps(&kr(22..28)));
     798              : 
     799              :         //        #####      #####
     800              :         //               xxxx
     801            1 :         assert!(!ks.overlaps(&kr(25..30)));
     802              : 
     803              :         //        #####      #####
     804              :         //                      xxxx
     805            1 :         assert!(ks.overlaps(&kr(35..35)));
     806              : 
     807              :         //        #####      #####
     808              :         //                        xxxx
     809            1 :         assert!(!ks.overlaps(&kr(40..45)));
     810              : 
     811              :         //        #####      #####
     812              :         //                        xxxx
     813            1 :         assert!(!ks.overlaps(&kr(45..50)));
     814              : 
     815              :         //        #####      #####
     816              :         //        xxxxxxxxxxx
     817            1 :         assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
     818            1 :     }
     819              : 
     820              :     #[test]
     821            1 :     fn test_remove_full_overlapps() {
     822            1 :         let mut key_space1 = KeySpace {
     823            1 :             ranges: vec![
     824            1 :                 Key::from_i128(1)..Key::from_i128(4),
     825            1 :                 Key::from_i128(5)..Key::from_i128(8),
     826            1 :                 Key::from_i128(10)..Key::from_i128(12),
     827            1 :             ],
     828            1 :         };
     829            1 :         let key_space2 = KeySpace {
     830            1 :             ranges: vec![
     831            1 :                 Key::from_i128(2)..Key::from_i128(3),
     832            1 :                 Key::from_i128(6)..Key::from_i128(7),
     833            1 :                 Key::from_i128(11)..Key::from_i128(13),
     834            1 :             ],
     835            1 :         };
     836            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     837            1 :         let removed_expected = KeySpace {
     838            1 :             ranges: vec![
     839            1 :                 Key::from_i128(2)..Key::from_i128(3),
     840            1 :                 Key::from_i128(6)..Key::from_i128(7),
     841            1 :                 Key::from_i128(11)..Key::from_i128(12),
     842            1 :             ],
     843            1 :         };
     844            1 :         assert_eq!(removed, removed_expected);
     845              : 
     846            1 :         assert_eq!(
     847              :             key_space1.ranges,
     848            1 :             vec![
     849            1 :                 Key::from_i128(1)..Key::from_i128(2),
     850            1 :                 Key::from_i128(3)..Key::from_i128(4),
     851            1 :                 Key::from_i128(5)..Key::from_i128(6),
     852            1 :                 Key::from_i128(7)..Key::from_i128(8),
     853            1 :                 Key::from_i128(10)..Key::from_i128(11)
     854              :             ]
     855              :         );
     856            1 :     }
     857              : 
     858              :     #[test]
     859            1 :     fn test_remove_partial_overlaps() {
     860              :         // Test partial ovelaps
     861            1 :         let mut key_space1 = KeySpace {
     862            1 :             ranges: vec![
     863            1 :                 Key::from_i128(1)..Key::from_i128(5),
     864            1 :                 Key::from_i128(7)..Key::from_i128(10),
     865            1 :                 Key::from_i128(12)..Key::from_i128(15),
     866            1 :             ],
     867            1 :         };
     868            1 :         let key_space2 = KeySpace {
     869            1 :             ranges: vec![
     870            1 :                 Key::from_i128(3)..Key::from_i128(6),
     871            1 :                 Key::from_i128(8)..Key::from_i128(11),
     872            1 :                 Key::from_i128(14)..Key::from_i128(17),
     873            1 :             ],
     874            1 :         };
     875              : 
     876            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     877            1 :         let removed_expected = KeySpace {
     878            1 :             ranges: vec![
     879            1 :                 Key::from_i128(3)..Key::from_i128(5),
     880            1 :                 Key::from_i128(8)..Key::from_i128(10),
     881            1 :                 Key::from_i128(14)..Key::from_i128(15),
     882            1 :             ],
     883            1 :         };
     884            1 :         assert_eq!(removed, removed_expected);
     885              : 
     886            1 :         assert_eq!(
     887              :             key_space1.ranges,
     888            1 :             vec![
     889            1 :                 Key::from_i128(1)..Key::from_i128(3),
     890            1 :                 Key::from_i128(7)..Key::from_i128(8),
     891            1 :                 Key::from_i128(12)..Key::from_i128(14),
     892              :             ]
     893              :         );
     894            1 :     }
     895              : 
     896              :     #[test]
     897            1 :     fn test_remove_no_overlaps() {
     898            1 :         let mut key_space1 = KeySpace {
     899            1 :             ranges: vec![
     900            1 :                 Key::from_i128(1)..Key::from_i128(5),
     901            1 :                 Key::from_i128(7)..Key::from_i128(10),
     902            1 :                 Key::from_i128(12)..Key::from_i128(15),
     903            1 :             ],
     904            1 :         };
     905            1 :         let key_space2 = KeySpace {
     906            1 :             ranges: vec![
     907            1 :                 Key::from_i128(6)..Key::from_i128(7),
     908            1 :                 Key::from_i128(11)..Key::from_i128(12),
     909            1 :                 Key::from_i128(15)..Key::from_i128(17),
     910            1 :             ],
     911            1 :         };
     912              : 
     913            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     914            1 :         let removed_expected = KeySpace::default();
     915            1 :         assert_eq!(removed, removed_expected);
     916              : 
     917            1 :         assert_eq!(
     918              :             key_space1.ranges,
     919            1 :             vec![
     920            1 :                 Key::from_i128(1)..Key::from_i128(5),
     921            1 :                 Key::from_i128(7)..Key::from_i128(10),
     922            1 :                 Key::from_i128(12)..Key::from_i128(15),
     923              :             ]
     924              :         );
     925            1 :     }
     926              : 
     927              :     #[test]
     928            1 :     fn test_remove_one_range_overlaps_multiple() {
     929            1 :         let mut key_space1 = KeySpace {
     930            1 :             ranges: vec![
     931            1 :                 Key::from_i128(1)..Key::from_i128(3),
     932            1 :                 Key::from_i128(3)..Key::from_i128(6),
     933            1 :                 Key::from_i128(6)..Key::from_i128(10),
     934            1 :                 Key::from_i128(12)..Key::from_i128(15),
     935            1 :                 Key::from_i128(17)..Key::from_i128(20),
     936            1 :                 Key::from_i128(20)..Key::from_i128(30),
     937            1 :                 Key::from_i128(30)..Key::from_i128(40),
     938            1 :             ],
     939            1 :         };
     940            1 :         let key_space2 = KeySpace {
     941            1 :             ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
     942            1 :         };
     943              : 
     944            1 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     945            1 :         let removed_expected = KeySpace {
     946            1 :             ranges: vec![
     947            1 :                 Key::from_i128(9)..Key::from_i128(10),
     948            1 :                 Key::from_i128(12)..Key::from_i128(15),
     949            1 :                 Key::from_i128(17)..Key::from_i128(19),
     950            1 :             ],
     951            1 :         };
     952            1 :         assert_eq!(removed, removed_expected);
     953              : 
     954            1 :         assert_eq!(
     955              :             key_space1.ranges,
     956            1 :             vec![
     957            1 :                 Key::from_i128(1)..Key::from_i128(3),
     958            1 :                 Key::from_i128(3)..Key::from_i128(6),
     959            1 :                 Key::from_i128(6)..Key::from_i128(9),
     960            1 :                 Key::from_i128(19)..Key::from_i128(20),
     961            1 :                 Key::from_i128(20)..Key::from_i128(30),
     962            1 :                 Key::from_i128(30)..Key::from_i128(40),
     963              :             ]
     964              :         );
     965            1 :     }
     966              :     #[test]
     967            1 :     fn sharded_range_relation_gap() {
     968            1 :         let shard_identity =
     969            1 :             ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
     970              : 
     971            1 :         let range = ShardedRange::new(
     972            1 :             Range {
     973            1 :                 start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
     974            1 :                 end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
     975            1 :             },
     976            1 :             &shard_identity,
     977              :         );
     978              : 
     979              :         // Key range spans relations, expect MAX
     980            1 :         assert_eq!(range.page_count(), u32::MAX);
     981            1 :     }
     982              : 
     983              :     #[test]
     984            1 :     fn shard_identity_keyspaces_single_key() {
     985            1 :         let shard_identity =
     986            1 :             ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
     987              : 
     988            1 :         let range = ShardedRange::new(
     989            1 :             Range {
     990            1 :                 start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
     991            1 :                 end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
     992            1 :             },
     993            1 :             &shard_identity,
     994              :         );
     995              :         // Single-key range on logical size key
     996            1 :         assert_eq!(range.page_count(), 1);
     997            1 :     }
     998              : 
     999              :     /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
    1000              :     #[test]
    1001            1 :     fn contiguous_range_check() {
    1002            1 :         assert!(!is_contiguous_range(
    1003            1 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
    1004            1 :                 ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
    1005            1 :         ),);
    1006              : 
    1007              :         // The ranges goes all the way up to the 0xffffffff, including it: this is
    1008              :         // not considered a rel block range because 0xffffffff stores logical sizes,
    1009              :         // not blocks.
    1010            1 :         assert!(!is_contiguous_range(
    1011            1 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
    1012            1 :                 ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
    1013            1 :         ),);
    1014              : 
    1015              :         // Keys within the normal data region of a relation
    1016            1 :         assert!(is_contiguous_range(
    1017            1 :             &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
    1018            1 :                 ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
    1019              :         ),);
    1020              : 
    1021              :         // The logical size key of one forkno, then some blocks in the next
    1022            1 :         assert!(is_contiguous_range(
    1023            1 :             &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
    1024            1 :                 ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
    1025              :         ),);
    1026            1 :     }
    1027              : 
    // A range that crosses from one forkno into the next reports `u32::MAX`
    // as its page count.
    #[test]
    fn shard_identity_keyspaces_forkno_gap() {
        let shard_identity =
            ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();

        let start = Key::from_hex("000000067f00000001000004df00fffffffe").unwrap();
        let end = Key::from_hex("000000067f00000001000004df0100000003").unwrap();
        let range = ShardedRange::new(start..end, &shard_identity);

        // Range spanning the end of one forkno and the start of the next: we do not attempt to
        // calculate a valid size, because we have no way to know if the keys between start
        // and end are actually in use.
        assert_eq!(range.page_count(), u32::MAX);
    }
    1046              : 
    // Block zero of one relation is counted by shard 0 only; the other three
    // shards see an empty range.
    #[test]
    fn shard_identity_keyspaces_one_relation() {
        for shard_number in 0..4 {
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                ShardCount::new(4),
                DEFAULT_STRIPE_SIZE,
            )
            .unwrap();

            let start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
            let end = Key::from_hex("000000067f00000001000000ae0000000001").unwrap();
            let range = ShardedRange::new(start..end, &shard_identity);

            // Very simple case: range covering block zero of one relation, where that block
            // maps to shard zero. Other shards should perceive the range's size as zero.
            let expected_pages = if shard_number == 0 { 1 } else { 0 };
            assert_eq!(range.page_count(), expected_pages);
        }
    }
    1074              : 
    1075              :     /// Test helper: construct a ShardedRange over `range_start..range_end` and call fragment() on it,
    1076              :     /// returning `(page_count, fragments)`. Panics if any of the invariants asserted below is violated.
    1077         1012 :     fn do_fragment(
    1078         1012 :         range_start: Key,
    1079         1012 :         range_end: Key,
    1080         1012 :         shard_identity: &ShardIdentity,
    1081         1012 :         target_nblocks: u32,
    1082         1012 :     ) -> (u32, Vec<(u32, Range<Key>)>) {
    1083         1012 :         let range = ShardedRange::new(
    1084         1012 :             Range {
    1085         1012 :                 start: range_start,
    1086         1012 :                 end: range_end,
    1087         1012 :             },
    1088         1012 :             shard_identity,
    1089              :         );
    1090              : 
    1091         1012 :         let page_count = range.page_count();
    1092         1012 :         let fragments = range.fragment(target_nblocks);
    1093              : 
    1094              :         // Invariant: we always get at least one fragment
    1095         1012 :         assert!(!fragments.is_empty());
    1096              : 
    1097              :         // Invariant: the first/last fragment start/end should equal the input start/end
    1098         1012 :         assert_eq!(fragments.first().unwrap().1.start, range_start);
    1099         1012 :         assert_eq!(fragments.last().unwrap().1.end, range_end);
    1100              : 
    1101         1012 :         if page_count > 0 {
    1102              :             // Invariant: every fragment must contain at least one shard-local page, if the
    1103              :             // total range contains at least one shard-local page
    1104          702 :             let all_nonzero = fragments.iter().all(|f| f.0 > 0);
    1105          569 :             if !all_nonzero {
    1106            0 :                 eprintln!("Found a zero-length fragment: {fragments:?}"); // show the offending fragments before the assert fires
    1107          569 :             }
    1108          569 :             assert!(all_nonzero);
    1109              :         } else {
    1110              :             // A range with no shard-local pages should always be returned as a single fragment
    1111          443 :             assert_eq!(fragments, vec![(0, range_start..range_end)]);
    1112              :         }
    1113              : 
    1114              :         // Invariant: fragments must be ordered and non-overlapping: each starts at or after the previous end, and strictly after the previous start
    1115         1012 :         let mut last: Option<Range<Key>> = None;
    1116         2157 :         for frag in &fragments {
    1117         1145 :             if let Some(last) = last {
    1118          133 :                 assert!(frag.1.start >= last.end);
    1119          133 :                 assert!(frag.1.start > last.start);
    1120         1012 :             }
    1121         1145 :             last = Some(frag.1.clone())
    1122              :         }
    1123              : 
    1124              :         // Invariant: fragments respect target_nblocks; a size of u32::MAX is exempt from the limit check
    1125         2157 :         for frag in &fragments {
    1126         1145 :             assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
    1127              :         }
    1128              : 
    1129         1012 :         (page_count, fragments)
    1130         1012 :     }
    1131              : 
    1132              :     /// Really simple tests for fragment(), on a range that just contains a single stripe
    1133              :     /// for a single tenant. The local shard is shard 0 of SHARD_COUNT; only target_nblocks varies between cases.
    1134              :     #[test]
    1135            1 :     fn sharded_range_fragment_simple() {
    1136              :         const SHARD_COUNT: u8 = 4;
    1137              :         const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
    1138              : 
    1139            1 :         let shard_identity = ShardIdentity::new(
    1140            1 :             ShardNumber(0),
    1141            1 :             ShardCount::new(SHARD_COUNT),
    1142            1 :             ShardStripeSize(STRIPE_SIZE),
    1143              :         )
    1144            1 :         .unwrap();
    1145              : 
    1146              :         // A range which we happen to know covers exactly one stripe which belongs to this shard
    1147            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1148            1 :         let mut input_end = input_start;
    1149            1 :         input_end.field6 += STRIPE_SIZE; // field6 is block number
    1150              : 
    1151              :         // Ask for stripe_size blocks, we get the whole stripe as a single fragment
    1152            1 :         assert_eq!(
    1153            1 :             do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE),
    1154            1 :             (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
    1155              :         );
    1156              : 
    1157              :         // Ask for more (10x the stripe size), we still get the whole stripe as a single fragment
    1158            1 :         assert_eq!(
    1159            1 :             do_fragment(input_start, input_end, &shard_identity, 10 * STRIPE_SIZE),
    1160            1 :             (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
    1161              :         );
    1162              : 
    1163              :         // Ask for target_nblocks of half the stripe size, we get two halves that meet at the midpoint
    1164            1 :         assert_eq!(
    1165            1 :             do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE / 2),
    1166            1 :             (
    1167            1 :                 STRIPE_SIZE,
    1168            1 :                 vec![
    1169            1 :                     (
    1170            1 :                         STRIPE_SIZE / 2,
    1171            1 :                         input_start..input_start.add(STRIPE_SIZE / 2)
    1172            1 :                     ),
    1173            1 :                     (STRIPE_SIZE / 2, input_start.add(STRIPE_SIZE / 2)..input_end)
    1174            1 :                 ]
    1175            1 :             )
    1176              :         );
    1177            1 :     }
    1178              : 
    1179              :     #[test]
    1180            1 :     fn sharded_range_fragment_multi_stripe() {
    1181              :         const SHARD_COUNT: u8 = 4;
    1182              :         const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
    1183              :         const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE; // one stripe's worth of blocks per shard
    1184              : 
    1185            1 :         let shard_identity = ShardIdentity::new(
    1186            1 :             ShardNumber(0),
    1187            1 :             ShardCount::new(SHARD_COUNT),
    1188            1 :             ShardStripeSize(STRIPE_SIZE),
    1189              :         )
    1190            1 :         .unwrap();
    1191              : 
    1192              :         // A range which covers multiple stripes, exactly one of which belongs to the current shard.
    1193            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1194            1 :         let mut input_end = input_start;
    1195            1 :         input_end.field6 += RANGE_SIZE; // field6 is block number
    1196              : 
    1197              :         // Ask for all the blocks, get a fragment that covers the whole range but reports
    1198              :         // its size to be just the blocks belonging to our shard.
    1199            1 :         assert_eq!(
    1200            1 :             do_fragment(input_start, input_end, &shard_identity, RANGE_SIZE),
    1201            1 :             (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
    1202              :         );
    1203              : 
    1204              :         // Ask for a sub-stripe quantity that results in 3 fragments: two of `limit` blocks, then the remainder, which extends to input_end.
    1205            1 :         let limit = STRIPE_SIZE / 3 + 1;
    1206            1 :         assert_eq!(
    1207            1 :             do_fragment(input_start, input_end, &shard_identity, limit),
    1208            1 :             (
    1209            1 :                 STRIPE_SIZE,
    1210            1 :                 vec![
    1211            1 :                     (limit, input_start..input_start.add(limit)),
    1212            1 :                     (limit, input_start.add(limit)..input_start.add(2 * limit)),
    1213            1 :                     (
    1214            1 :                         STRIPE_SIZE - 2 * limit,
    1215            1 :                         input_start.add(2 * limit)..input_end
    1216            1 :                     ),
    1217            1 :                 ]
    1218            1 :             )
    1219              :         );
    1220              : 
    1221              :         // Try on a range that starts one block into our owned stripe: we expect one fewer shard-local page
    1222            1 :         assert_eq!(
    1223            1 :             do_fragment(input_start.add(1), input_end, &shard_identity, RANGE_SIZE),
    1224            1 :             (
    1225            1 :                 STRIPE_SIZE - 1,
    1226            1 :                 vec![(STRIPE_SIZE - 1, input_start.add(1)..input_end)]
    1227            1 :             )
    1228              :         );
    1229            1 :     }
    1230              : 
    1231              :     /// Test our calculations work correctly when we start a range from the logical size key of
    1232              :     /// a previous relation. The same range is checked from the point of view of shard 0 and shard 1.
    1233              :     #[test]
    1234            1 :     fn sharded_range_fragment_starting_from_logical_size() {
    1235              :         const SHARD_COUNT: u8 = 4;
    1236              :         const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
    1237              :         const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
    1238              : 
    1239            1 :         let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
    1240            1 :         let mut input_end = Key::from_hex("000000067f00000001000000ae0100000000").unwrap();
    1241            1 :         input_end.field6 += RANGE_SIZE; // field6 is block number
    1242              : 
    1243              :         // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too: expect STRIPE_SIZE + 1 pages
    1244            1 :         let shard_identity = ShardIdentity::new(
    1245            1 :             ShardNumber(0),
    1246            1 :             ShardCount::new(SHARD_COUNT),
    1247            1 :             ShardStripeSize(STRIPE_SIZE),
    1248              :         )
    1249            1 :         .unwrap();
    1250            1 :         assert_eq!(
    1251            1 :             do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
    1252            1 :             (
    1253            1 :                 STRIPE_SIZE + 1,
    1254            1 :                 vec![(STRIPE_SIZE + 1, input_start..input_end)]
    1255            1 :             )
    1256              :         );
    1257              : 
    1258              :         // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
    1259              :         // store all logical sizes), so it expects exactly one page
    1260            1 :         let shard_identity = ShardIdentity::new(
    1261            1 :             ShardNumber(1),
    1262            1 :             ShardCount::new(SHARD_COUNT),
    1263            1 :             ShardStripeSize(STRIPE_SIZE),
    1264              :         )
    1265            1 :         .unwrap();
    1266            1 :         assert_eq!(
    1267            1 :             do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
    1268            1 :             (1, vec![(1, input_start..input_end)])
    1269              :         );
    1270            1 :     }
    1271              : 
    1272              :     /// Test that ShardedRange behaves properly when used on un-sharded data: a 0x10000-page range with target_nblocks 0x8000 yields two equal fragments
    1273              :     #[test]
    1274            1 :     fn sharded_range_fragment_unsharded() {
    1275            1 :         let shard_identity = ShardIdentity::unsharded();
    1276              : 
    1277            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1278            1 :         let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
    1279            1 :         assert_eq!(
    1280            1 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1281            1 :             (
    1282            1 :                 0x10000,
    1283            1 :                 vec![
    1284            1 :                     (0x8000, input_start..input_start.add(0x8000)),
    1285            1 :                     (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
    1286            1 :                 ]
    1287            1 :             )
    1288              :         );
    1289            1 :     }
    1290              : 
    1291              :     #[test]
    1292            1 :     fn sharded_range_fragment_cross_relation() {
    1293            1 :         let shard_identity = ShardIdentity::unsharded();
    1294              : 
    1295              :         // A range that spans relations: expect fragmentation to give up and return a single fragment with a u32::MAX size
    1296            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1297            1 :         let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
    1298            1 :         assert_eq!(
    1299            1 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1300            1 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1301              :         );
    1302              : 
    1303              :         // Same range, but using a sharded identity (shard 0 of 4): the expected result is unchanged
    1304            1 :         let shard_identity =
    1305            1 :             ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
    1306            1 :         assert_eq!(
    1307            1 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1308            1 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1309              :         );
    1310            1 :     }
    1311              : 
    1312              :     #[test]
    1313            1 :     fn sharded_range_fragment_tiny_nblocks() {
    1314            1 :         let shard_identity = ShardIdentity::unsharded();
    1315              : 
    1316              :         // A small range of 0x38 pages with a tiny target_nblocks of 16: expect three 16-page fragments and an 8-page remainder
    1317            1 :         let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
    1318            1 :         let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
    1319            1 :         assert_eq!(
    1320            1 :             do_fragment(input_start, input_end, &shard_identity, 16),
    1321            1 :             (
    1322            1 :                 0x38,
    1323            1 :                 vec![
    1324            1 :                     (16, input_start..input_start.add(16)),
    1325            1 :                     (16, input_start.add(16)..input_start.add(32)),
    1326            1 :                     (16, input_start.add(32)..input_start.add(48)),
    1327            1 :                     (8, input_start.add(48)..input_end),
    1328            1 :                 ]
    1329            1 :             )
    1330              :         );
    1331            1 :     }
    1332              : 
    1333              :     #[test]
    1334            1 :     fn sharded_range_fragment_fuzz() {
    1335              :         // Use a fixed seed: we don't want to explicitly pick values, but we do want
    1336              :         // the test to be reproducible.
    1337            1 :         let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
    1338              : 
    1339         1001 :         for _i in 0..1000 {
    1340         1000 :             let shard_identity = if prng.next_u32() % 2 == 0 {
    1341          519 :                 ShardIdentity::unsharded()
    1342              :             } else {
    1343          481 :                 let shard_count = prng.next_u32() % 127 + 1; // 1..=127 shards
    1344          481 :                 ShardIdentity::new(
    1345          481 :                     ShardNumber((prng.next_u32() % shard_count) as u8),
    1346          481 :                     ShardCount::new(shard_count as u8),
    1347              :                     DEFAULT_STRIPE_SIZE,
    1348              :                 )
    1349          481 :                 .unwrap()
    1350              :             };
    1351              : 
    1352         1000 :             let target_nblocks = prng.next_u32() % 65536 + 1; // 1..=65536, never zero
    1353              : 
    1354         1000 :             let start_offset = prng.next_u32() % 16384; // 0..=16383 keys past the base key
    1355              : 
    1356              :             // Try ranges of 1 to 8192 keys, so the range is never empty
    1357         1000 :             let range_size = prng.next_u32() % 8192 + 1;
    1358              : 
    1359              :             // Start at a random offset from a fixed base key, and extend the range by range_size keys
    1360         1000 :             let input_start = Key::from_hex("000000067F00000001000004E10000000000")
    1361         1000 :                 .unwrap()
    1362         1000 :                 .add(start_offset);
    1363         1000 :             let input_end = input_start.add(range_size);
    1364              : 
    1365              :             // This test's main success conditions are the invariants baked into do_fragment
    1366         1000 :             let (_total_size, fragments) =
    1367         1000 :                 do_fragment(input_start, input_end, &shard_identity, target_nblocks);
    1368              : 
    1369              :             // Pick a random key within the range and check it appears in the output
    1370         1000 :             let example_key = input_start.add(prng.next_u32() % range_size);
    1371              : 
    1372              :             // Panic on unwrap if it isn't found
    1373         1000 :             let example_key_frag = fragments
    1374         1000 :                 .iter()
    1375         1073 :                 .find(|f| f.1.contains(&example_key))
    1376         1000 :                 .unwrap();
    1377              : 
    1378              :             // Check that the fragment containing our random key has a nonzero size if
    1379              :             // that key is shard-local (i.e. not disposable for this shard identity)
    1380         1000 :             let example_key_local = !shard_identity.is_key_disposable(&example_key);
    1381         1000 :             if example_key_local {
    1382          536 :                 assert!(example_key_frag.0 > 0);
    1383          464 :             }
    1384              :         }
    1385            1 :     }
    1386              : }
         |