|             Line data    Source code 
       1              : use postgres_ffi::BLCKSZ;
       2              : use std::ops::Range;
       3              : 
       4              : use crate::{
       5              :     key::Key,
       6              :     shard::{ShardCount, ShardIdentity},
       7              : };
       8              : use itertools::Itertools;
       9              : 
      10              : ///
      11              : /// Represents a set of Keys, in a compact form.
      12              : ///
      13              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      14              : pub struct KeySpace {
      15              :     /// Contiguous ranges of keys that belong to the key space. In key order,
      16              :     /// and with no overlap.
      17              :     pub ranges: Vec<Range<Key>>,
      18              : }
      19              : 
      20              : /// A wrapper type for sparse keyspaces.
      21              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      22              : pub struct SparseKeySpace(pub KeySpace);
      23              : 
      24              : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
      25              : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
      26              : /// shard.
      27              : ///
      28              : /// When we iterate over keys within this object, we will skip any keys that don't belong
      29              : /// to this shard.
      30              : ///
      31              : /// The start + end keys may not belong to the shard: these specify where layer files should
      32              : /// start  + end, but we will never actually read/write those keys.
      33              : #[derive(Clone, Debug, PartialEq, Eq)]
      34              : pub struct ShardedRange<'a> {
      35              :     pub shard_identity: &'a ShardIdentity,
      36              :     pub range: Range<Key>,
      37              : }
      38              : 
      39              : // Calculate the size of a range within the blocks of the same relation, or spanning only the
      40              : // top page in the previous relation's space.
      41      4349692 : fn contiguous_range_len(range: &Range<Key>) -> u32 {
      42      4349692 :     debug_assert!(is_contiguous_range(range));
      43      4349692 :     if range.start.field6 == 0xffffffff {
      44            8 :         range.end.field6 + 1
      45              :     } else {
      46      4349684 :         range.end.field6 - range.start.field6
      47              :     }
      48      4349692 : }
      49              : 
      50              : /// Return true if this key range includes only keys in the same relation's data blocks, or
      51              : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
      52              : ///
      53              : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
      54              : /// be on our shard.  Later in ShardedRange we do the extra work to figure out how much
      55              : /// of a given contiguous range is present on one shard.
      56              : ///
      57              : /// This matters, because:
      58              : /// - Within such ranges, keys are used contiguously.  Outside such ranges it is sparse.
      59              : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
      60      8699334 : fn is_contiguous_range(range: &Range<Key>) -> bool {
      61      8699334 :     range.start.field1 == range.end.field1
      62      8699178 :         && range.start.field2 == range.end.field2
      63      8699150 :         && range.start.field3 == range.end.field3
      64      8699080 :         && range.start.field4 == range.end.field4
      65      8699078 :         && (range.start.field5 == range.end.field5
      66           24 :             || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
      67      8699334 : }
      68              : 
      69              : impl<'a> ShardedRange<'a> {
      70         3580 :     pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
      71         3580 :         Self {
      72         3580 :             shard_identity,
      73         3580 :             range,
      74         3580 :         }
      75         3580 :     }
      76              : 
      77              :     /// Break up this range into chunks, each of which has at least one local key in it if the
      78              :     /// total range has at least one local key.
      79         3566 :     pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
      80         3566 :         // Optimization for single-key case (e.g. logical size keys)
      81         3566 :         if self.range.end == self.range.start.add(1) {
      82         1287 :             return vec![(
      83         1287 :                 if self.shard_identity.is_key_disposable(&self.range.start) {
      84            0 :                     0
      85              :                 } else {
      86         1287 :                     1
      87              :                 },
      88         1287 :                 self.range,
      89              :             )];
      90         2279 :         }
      91         2279 : 
      92         2279 :         if !is_contiguous_range(&self.range) {
      93              :             // Ranges that span relations are not fragmented.  We only get these ranges as a result
      94              :             // of operations that act on existing layers, so we trust that the existing range is
      95              :             // reasonably small.
      96            4 :             return vec![(u32::MAX, self.range)];
      97         2275 :         }
      98         2275 : 
      99         2275 :         let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
     100         2275 : 
     101         2275 :         let mut cursor = self.range.start;
     102         4844 :         while cursor < self.range.end {
     103         2569 :             let advance_by = self.distance_to_next_boundary(cursor);
     104         2569 :             let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
     105              : 
     106              :             // If the previous fragment is undersized, then we seek to consume enough
     107              :             // blocks to complete it.
     108         2569 :             let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
     109          294 :                 Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
     110          272 :                 Some(frag) => {
     111          272 :                     // Prev block is complete, want the full number.
     112          272 :                     (
     113          272 :                         target_nblocks,
     114          272 :                         if is_fragment_disposable {
     115              :                             // If this current range will be empty (not shard-local data), we will merge into previous
     116            0 :                             Some(frag)
     117              :                         } else {
     118          272 :                             None
     119              :                         },
     120              :                     )
     121              :                 }
     122              :                 None => {
     123              :                     // First iteration, want the full number
     124         2275 :                     (target_nblocks, None)
     125              :                 }
     126              :             };
     127              : 
     128         2569 :             let advance_by = if is_fragment_disposable {
     129          936 :                 advance_by
     130              :             } else {
     131         1633 :                 std::cmp::min(advance_by, want_blocks)
     132              :             };
     133              : 
     134         2569 :             let next_cursor = cursor.add(advance_by);
     135              : 
     136         2569 :             let this_frag = (
     137         2569 :                 if is_fragment_disposable {
     138          936 :                     0
     139              :                 } else {
     140         1633 :                     advance_by
     141              :                 },
     142         2569 :                 cursor..next_cursor,
     143         2569 :             );
     144         2569 :             cursor = next_cursor;
     145              : 
     146         2569 :             if let Some(last_fragment) = merge_last_fragment {
     147           22 :                 // Previous fragment was short or this one is empty, merge into it
     148           22 :                 last_fragment.0 += this_frag.0;
     149           22 :                 last_fragment.1.end = this_frag.1.end;
     150         2547 :             } else {
     151         2547 :                 fragments.push(this_frag);
     152         2547 :             }
     153              :         }
     154              : 
     155         2275 :         fragments
     156         3566 :     }
     157              : 
     158              :     /// Estimate the physical pages that are within this range, on this shard.  This returns
     159              :     /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
     160         2038 :     pub fn page_count(&self) -> u32 {
     161         2038 :         // Special cases for single keys like logical sizes
     162         2038 :         if self.range.end == self.range.start.add(1) {
     163           12 :             return if self.shard_identity.is_key_disposable(&self.range.start) {
     164            6 :                 0
     165              :             } else {
     166            6 :                 1
     167              :             };
     168         2026 :         }
     169         2026 : 
     170         2026 :         // We can only do an authentic calculation of contiguous key ranges
     171         2026 :         if !is_contiguous_range(&self.range) {
     172            8 :             return u32::MAX;
     173         2018 :         }
     174         2018 : 
     175         2018 :         // Special case for single sharded tenants: our logical and physical sizes are the same
     176         2018 :         if self.shard_identity.count < ShardCount::new(2) {
     177         1048 :             return contiguous_range_len(&self.range);
     178          970 :         }
     179          970 : 
     180          970 :         // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
     181          970 :         // to Self, and add the stripe's block count to our total if so.
     182          970 :         let mut result: u64 = 0;
     183          970 :         let mut cursor = self.range.start;
     184         1962 :         while cursor < self.range.end {
     185              :             // Count up to the next stripe_size boundary or end of range
     186          992 :             let advance_by = self.distance_to_next_boundary(cursor);
     187          992 : 
     188          992 :             // If this blocks in this stripe belong to us, add them to our count
     189          992 :             if !self.shard_identity.is_key_disposable(&cursor) {
     190           56 :                 result += advance_by as u64;
     191          936 :             }
     192              : 
     193          992 :             cursor = cursor.add(advance_by);
     194              :         }
     195              : 
     196          970 :         if result > u32::MAX as u64 {
     197            0 :             u32::MAX
     198              :         } else {
     199          970 :             result as u32
     200              :         }
     201         2038 :     }
     202              : 
     203              :     /// Advance the cursor to the next potential fragment boundary: this is either
     204              :     /// a stripe boundary, or the end of the range.
     205         3561 :     fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
     206         3561 :         let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
     207         3561 : 
     208         3561 :         if self.shard_identity.count < ShardCount::new(2) {
     209              :             // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
     210         1563 :             return distance_to_range_end;
     211         1998 :         }
     212         1998 : 
     213         1998 :         if cursor.field6 == 0xffffffff {
     214              :             // We are wrapping from one relation's logical size to the next relation's first data block
     215            8 :             return 1;
     216         1990 :         }
     217         1990 : 
     218         1990 :         let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
     219         1990 :         let stripe_remainder = self.shard_identity.stripe_size.0
     220         1990 :             - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
     221         1990 : 
     222         1990 :         if cfg!(debug_assertions) {
     223              :             // We should never overflow field5 and field6 -- our callers check this earlier
     224              :             // and would have returned their u32::MAX cases if the input range violated this.
     225         1990 :             let next_cursor = cursor.add(stripe_remainder);
     226         1990 :             debug_assert!(
     227         1990 :                 next_cursor.field1 == cursor.field1
     228         1990 :                     && next_cursor.field2 == cursor.field2
     229         1990 :                     && next_cursor.field3 == cursor.field3
     230         1990 :                     && next_cursor.field4 == cursor.field4
     231         1990 :                     && next_cursor.field5 == cursor.field5
     232              :             )
     233            0 :         }
     234              : 
     235         1990 :         std::cmp::min(stripe_remainder, distance_to_range_end)
     236         3561 :     }
     237              : 
     238              :     /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
     239              :     /// this function simply calculates the number of pages in the space, without accounting for those
     240              :     /// pages that would not actually be stored on this node.
     241              :     ///
     242              :     /// Don't use this function in code that works with physical entities like layer files.
     243      4345329 :     pub fn raw_size(range: &Range<Key>) -> u32 {
     244      4345329 :         if is_contiguous_range(range) {
     245      4345083 :             contiguous_range_len(range)
     246              :         } else {
     247          246 :             u32::MAX
     248              :         }
     249      4345329 :     }
     250              : }
     251              : 
     252              : impl KeySpace {
     253              :     /// Create a key space with a single range.
     254          132 :     pub fn single(key_range: Range<Key>) -> Self {
     255          132 :         Self {
     256          132 :             ranges: vec![key_range],
     257          132 :         }
     258          132 :     }
     259              : 
     260              :     /// Partition a key space into roughly chunks of roughly 'target_size' bytes
     261              :     /// in each partition.
     262              :     ///
     263          257 :     pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
     264          257 :         // Assume that each value is 8k in size.
     265          257 :         let target_nblocks = (target_size / BLCKSZ as u64) as u32;
     266          257 : 
     267          257 :         let mut parts = Vec::new();
     268          257 :         let mut current_part = Vec::new();
     269          257 :         let mut current_part_size: usize = 0;
     270         1799 :         for range in &self.ranges {
     271              :             // While doing partitioning, wrap the range in ShardedRange so that our size calculations
     272              :             // will respect shard striping rather than assuming all keys within a range are present.
     273         1542 :             let range = ShardedRange::new(range.clone(), shard_identity);
     274              : 
     275              :             // Chunk up the range into parts that each contain up to target_size local blocks
     276         1548 :             for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
     277              :                 // If appending the next contiguous range in the keyspace to the current
     278              :                 // partition would cause it to be too large, and our current partition
     279              :                 // covers at least one block that is physically present in this shard,
     280              :                 // then start a new partition
     281         1548 :                 if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
     282           26 :                     && current_part_size > 0
     283           26 :                 {
     284           26 :                     parts.push(KeySpace {
     285           26 :                         ranges: current_part,
     286           26 :                     });
     287           26 :                     current_part = Vec::new();
     288           26 :                     current_part_size = 0;
     289         1522 :                 }
     290         1548 :                 current_part.push(frag_range.start..frag_range.end);
     291         1548 :                 current_part_size += frag_on_shard_size as usize;
     292              :             }
     293              :         }
     294              : 
     295              :         // add last partition that wasn't full yet.
     296          257 :         if !current_part.is_empty() {
     297          257 :             parts.push(KeySpace {
     298          257 :                 ranges: current_part,
     299          257 :             });
     300          257 :         }
     301              : 
     302          257 :         KeyPartitioning { parts }
     303          257 :     }
     304              : 
     305          618 :     pub fn is_empty(&self) -> bool {
     306          618 :         self.total_raw_size() == 0
     307          618 :     }
     308              : 
     309              :     /// Merge another keyspace into the current one.
     310              :     /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
     311          414 :     pub fn merge(&mut self, other: &KeySpace) {
     312          414 :         let all_ranges = self
     313          414 :             .ranges
     314          414 :             .iter()
     315        69906 :             .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
     316          414 : 
     317          414 :         let mut accum = KeySpaceAccum::new();
     318          414 :         let mut prev: Option<&Range<Key>> = None;
     319       107591 :         for range in all_ranges {
     320       107177 :             if let Some(prev) = prev {
     321       106969 :                 let overlap =
     322       106969 :                     std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
     323       106969 :                 assert!(
     324       106969 :                     !overlap,
     325            0 :                     "Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
     326              :                     prev, range
     327              :                 );
     328          208 :             }
     329              : 
     330       107177 :             accum.add_range(range.clone());
     331       107177 :             prev = Some(range);
     332              :         }
     333              : 
     334          414 :         self.ranges = accum.to_keyspace().ranges;
     335          414 :     }
     336              : 
     337              :     /// Remove all keys in `other` from `self`.
     338              :     /// This can involve splitting or removing of existing ranges.
     339              :     /// Returns the removed keyspace
     340          866 :     pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
     341          866 :         let (self_start, self_end) = match (self.start(), self.end()) {
     342          796 :             (Some(start), Some(end)) => (start, end),
     343              :             _ => {
     344              :                 // self is empty
     345           70 :                 return KeySpace::default();
     346              :             }
     347              :         };
     348              : 
     349              :         // Key spaces are sorted by definition, so skip ahead to the first
     350              :         // potentially intersecting range. Similarly, ignore ranges that start
     351              :         // after the current keyspace ends.
     352          796 :         let other_ranges = other
     353          796 :             .ranges
     354          796 :             .iter()
     355        10548 :             .skip_while(|range| self_start >= range.end)
     356        70362 :             .take_while(|range| self_end > range.start);
     357          796 : 
     358          796 :         let mut removed_accum = KeySpaceRandomAccum::new();
     359        71108 :         for range in other_ranges {
     360       202596 :             while let Some(overlap_at) = self.overlaps_at(range) {
     361       132284 :                 let overlapped = self.ranges[overlap_at].clone();
     362       132284 : 
     363       132284 :                 if overlapped.start < range.start && overlapped.end <= range.end {
     364           16 :                     // Higher part of the range is completely overlapped.
     365           16 :                     removed_accum.add_range(range.start..self.ranges[overlap_at].end);
     366           16 :                     self.ranges[overlap_at].end = range.start;
     367       132268 :                 }
     368       132284 :                 if overlapped.start >= range.start && overlapped.end > range.end {
     369           70 :                     // Lower part of the range is completely overlapped.
     370           70 :                     removed_accum.add_range(self.ranges[overlap_at].start..range.end);
     371           70 :                     self.ranges[overlap_at].start = range.end;
     372       132214 :                 }
     373       132284 :                 if overlapped.start < range.start && overlapped.end > range.end {
     374        69974 :                     // Middle part of the range is overlapped.
     375        69974 :                     removed_accum.add_range(range.clone());
     376        69974 :                     self.ranges[overlap_at].end = range.start;
     377        69974 :                     self.ranges
     378        69974 :                         .insert(overlap_at + 1, range.end..overlapped.end);
     379        69974 :                 }
     380       132284 :                 if overlapped.start >= range.start && overlapped.end <= range.end {
     381        62224 :                     // Whole range is overlapped
     382        62224 :                     removed_accum.add_range(self.ranges[overlap_at].clone());
     383        62224 :                     self.ranges.remove(overlap_at);
     384        70060 :                 }
     385              :             }
     386              :         }
     387              : 
     388          796 :         removed_accum.to_keyspace()
     389          866 :     }
     390              : 
     391          886 :     pub fn start(&self) -> Option<Key> {
     392          886 :         self.ranges.first().map(|range| range.start)
     393          886 :     }
     394              : 
     395          866 :     pub fn end(&self) -> Option<Key> {
     396          866 :         self.ranges.last().map(|range| range.end)
     397          866 :     }
     398              : 
     399              :     /// The size of the keyspace in pages, before accounting for sharding
     400         1704 :     pub fn total_raw_size(&self) -> usize {
     401         1704 :         self.ranges
     402         1704 :             .iter()
     403        73224 :             .map(|range| ShardedRange::raw_size(range) as usize)
     404         1704 :             .sum()
     405         1704 :     }
     406              : 
     407       278696 :     fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
     408      2077699 :         match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
     409          114 :             Ok(0) => None,
     410         2589 :             Err(0) => None,
     411       112073 :             Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     412       163920 :             Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     413       112776 :             _ => None,
     414              :         }
     415       278696 :     }
     416              : 
     417              :     ///
     418              :     /// Check if key space contains overlapping range
     419              :     ///
     420        76100 :     pub fn overlaps(&self, range: &Range<Key>) -> bool {
     421        76100 :         self.overlaps_at(range).is_some()
     422        76100 :     }
     423              : 
     424              :     /// Check if the keyspace contains a key
     425        74323 :     pub fn contains(&self, key: &Key) -> bool {
     426        74323 :         self.overlaps(&(*key..key.next()))
     427        74323 :     }
     428              : }
     429              : 
     430              : ///
     431              : /// Represents a partitioning of the key space.
     432              : ///
     433              : /// The only kind of partitioning we do is to partition the key space into
     434              : /// partitions that are roughly equal in physical size (see KeySpace::partition).
     435              : /// But this data structure could represent any partitioning.
     436              : ///
     437              : #[derive(Clone, Debug, Default)]
     438              : pub struct KeyPartitioning {
     439              :     pub parts: Vec<KeySpace>,
     440              : }
     441              : 
     442              : /// Represents a partitioning of the sparse key space.
     443              : #[derive(Clone, Debug, Default)]
     444              : pub struct SparseKeyPartitioning {
     445              :     pub parts: Vec<SparseKeySpace>,
     446              : }
     447              : 
     448              : impl KeyPartitioning {
     449          760 :     pub fn new() -> Self {
     450          760 :         KeyPartitioning { parts: Vec::new() }
     451          760 :     }
     452              : 
     453              :     /// Convert a key partitioning to a sparse partition.
     454          380 :     pub fn into_sparse(self) -> SparseKeyPartitioning {
     455          380 :         SparseKeyPartitioning {
     456          380 :             parts: self.parts.into_iter().map(SparseKeySpace).collect(),
     457          380 :         }
     458          380 :     }
     459              : }
     460              : 
     461              : impl SparseKeyPartitioning {
     462              :     /// Note: use this function with caution. Attempt to handle a sparse keyspace in the same way as a dense keyspace will
     463              :     /// cause long/dead loops.
     464          364 :     pub fn into_dense(self) -> KeyPartitioning {
     465          364 :         KeyPartitioning {
     466          364 :             parts: self.parts.into_iter().map(|x| x.0).collect(),
     467          364 :         }
     468          364 :     }
     469              : }
     470              : 
     471              : ///
     472              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     473              : /// object. This takes care of merging adjacent keys and key ranges into
     474              : /// contiguous ranges.
     475              : ///
     476              : #[derive(Clone, Debug, Default)]
     477              : pub struct KeySpaceAccum {
     478              :     accum: Option<Range<Key>>,
     479              : 
     480              :     ranges: Vec<Range<Key>>,
     481              :     size: u64,
     482              : }
     483              : 
     484              : impl KeySpaceAccum {
     485        79998 :     pub fn new() -> Self {
     486        79998 :         Self {
     487        79998 :             accum: None,
     488        79998 :             ranges: Vec::new(),
     489        79998 :             size: 0,
     490        79998 :         }
     491        79998 :     }
     492              : 
     493              :     #[inline(always)]
     494      4081170 :     pub fn add_key(&mut self, key: Key) {
     495      4081170 :         self.add_range(singleton_range(key))
     496      4081170 :     }
     497              : 
     498              :     #[inline(always)]
     499      4272099 :     pub fn add_range(&mut self, range: Range<Key>) {
     500      4272099 :         self.size += ShardedRange::raw_size(&range) as u64;
     501      4272099 : 
     502      4272099 :         match self.accum.as_mut() {
     503      4178342 :             Some(accum) => {
     504      4178342 :                 if range.start == accum.end {
     505      4067064 :                     accum.end = range.end;
     506      4067064 :                 } else {
     507              :                     // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
     508              :                     // a new range here if the skipped region was all keys that don't belong on this shard.
     509              :                     // (https://github.com/neondatabase/neon/issues/6247)
     510       111278 :                     assert!(range.start > accum.end);
     511       111278 :                     self.ranges.push(accum.clone());
     512       111278 :                     *accum = range;
     513              :                 }
     514              :             }
     515        93757 :             None => self.accum = Some(range),
     516              :         }
     517      4272099 :     }
     518              : 
     519        89133 :     pub fn to_keyspace(mut self) -> KeySpace {
     520        89133 :         if let Some(accum) = self.accum.take() {
     521        85645 :             self.ranges.push(accum);
     522        85645 :         }
     523        89133 :         KeySpace {
     524        89133 :             ranges: self.ranges,
     525        89133 :         }
     526        89133 :     }
     527              : 
     528          938 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     529          938 :         std::mem::take(self).to_keyspace()
     530          938 :     }
     531              : 
     532              :     // The total number of keys in this object, ignoring any sharding effects that might cause some of
     533              :     // the keys to be omitted in storage on this shard.
     534         2178 :     pub fn raw_size(&self) -> u64 {
     535         2178 :         self.size
     536         2178 :     }
     537              : }
     538              : 
     539              : ///
     540              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     541              : /// object. Key ranges may be inserted in any order and can overlap.
     542              : ///
     543              : #[derive(Clone, Debug, Default)]
     544              : pub struct KeySpaceRandomAccum {
     545              :     ranges: Vec<Range<Key>>,
     546              : }
     547              : 
     548              : impl KeySpaceRandomAccum {
     549         2368 :     pub fn new() -> Self {
     550         2368 :         Self { ranges: Vec::new() }
     551         2368 :     }
     552              : 
     553        40508 :     pub fn add_key(&mut self, key: Key) {
     554        40508 :         self.add_range(singleton_range(key))
     555        40508 :     }
     556              : 
     557       236896 :     pub fn add_range(&mut self, range: Range<Key>) {
     558       236896 :         self.ranges.push(range);
     559       236896 :     }
     560              : 
     561        63994 :     pub fn add_keyspace(&mut self, keyspace: KeySpace) {
     562       127990 :         for range in keyspace.ranges {
     563        63996 :             self.add_range(range);
     564        63996 :         }
     565        63994 :     }
     566              : 
     567         1630 :     pub fn to_keyspace(mut self) -> KeySpace {
     568         1630 :         let mut ranges = Vec::new();
     569         1630 :         if !self.ranges.is_empty() {
     570       473028 :             self.ranges.sort_by_key(|r| r.start);
     571          936 :             let mut start = self.ranges.first().unwrap().start;
     572          936 :             let mut end = self.ranges.first().unwrap().end;
     573       237760 :             for r in self.ranges {
     574       236824 :                 assert!(r.start >= start);
     575       236824 :                 if r.start > end {
     576       235494 :                     ranges.push(start..end);
     577       235494 :                     start = r.start;
     578       235494 :                     end = r.end;
     579       235494 :                 } else if r.end > end {
     580          380 :                     end = r.end;
     581          950 :                 }
     582              :             }
     583          936 :             ranges.push(start..end);
     584          694 :         }
     585         1630 :         KeySpace { ranges }
     586         1630 :     }
     587              : 
     588          816 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     589          816 :         let mut prev_accum = KeySpaceRandomAccum::new();
     590          816 :         std::mem::swap(self, &mut prev_accum);
     591          816 : 
     592          816 :         prev_accum.to_keyspace()
     593          816 :     }
     594              : }
     595              : 
     596      4121678 : pub fn singleton_range(key: Key) -> Range<Key> {
     597      4121678 :     key..key.next()
     598      4121678 : }
     599              : 
     600              : #[cfg(test)]
     601              : mod tests {
     602              :     use rand::{RngCore, SeedableRng};
     603              : 
     604              :     use crate::{
     605              :         models::ShardParameters,
     606              :         shard::{ShardCount, ShardNumber},
     607              :     };
     608              : 
     609              :     use super::*;
     610              :     use std::fmt::Write;
     611              : 
     612              :     // Helper function to create a key range.
     613              :     //
     614              :     // Make the tests below less verbose.
     615           92 :     fn kr(irange: Range<i128>) -> Range<Key> {
     616           92 :         Key::from_i128(irange.start)..Key::from_i128(irange.end)
     617           92 :     }
     618              : 
     619              :     #[allow(dead_code)]
     620            0 :     fn dump_keyspace(ks: &KeySpace) {
     621            0 :         for r in ks.ranges.iter() {
     622            0 :             println!("  {}..{}", r.start.to_i128(), r.end.to_i128());
     623            0 :         }
     624            0 :     }
     625              : 
     626           22 :     fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
     627           22 :         if actual.ranges != expected {
     628            0 :             let mut msg = String::new();
     629            0 : 
     630            0 :             writeln!(msg, "expected:").unwrap();
     631            0 :             for r in &expected {
     632            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     633            0 :             }
     634            0 :             writeln!(msg, "got:").unwrap();
     635            0 :             for r in &actual.ranges {
     636            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     637            0 :             }
     638            0 :             panic!("{}", msg);
     639           22 :         }
     640           22 :     }
     641              : 
     642              :     #[test]
     643            2 :     fn keyspace_consume() {
     644            2 :         let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
     645            2 : 
     646            2 :         let mut accum = KeySpaceAccum::new();
     647            8 :         for range in &ranges {
     648            6 :             accum.add_range(range.clone());
     649            6 :         }
     650              : 
     651            2 :         let expected_size: u64 = ranges
     652            2 :             .iter()
     653            6 :             .map(|r| ShardedRange::raw_size(r) as u64)
     654            2 :             .sum();
     655            2 :         assert_eq!(accum.raw_size(), expected_size);
     656              : 
     657            2 :         assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
     658            2 :         assert_eq!(accum.raw_size(), 0);
     659              : 
     660            2 :         assert_ks_eq(&accum.consume_keyspace(), vec![]);
     661            2 :         assert_eq!(accum.raw_size(), 0);
     662              : 
     663            8 :         for range in &ranges {
     664            6 :             accum.add_range(range.clone());
     665            6 :         }
     666            2 :         assert_ks_eq(&accum.to_keyspace(), ranges);
     667            2 :     }
     668              : 
     669              :     #[test]
     670            2 :     fn keyspace_add_range() {
     671            2 :         // two separate ranges
     672            2 :         //
     673            2 :         // #####
     674            2 :         //         #####
     675            2 :         let mut ks = KeySpaceRandomAccum::default();
     676            2 :         ks.add_range(kr(0..10));
     677            2 :         ks.add_range(kr(20..30));
     678            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
     679            2 : 
     680            2 :         // two separate ranges, added in reverse order
     681            2 :         //
     682            2 :         //         #####
     683            2 :         // #####
     684            2 :         let mut ks = KeySpaceRandomAccum::default();
     685            2 :         ks.add_range(kr(20..30));
     686            2 :         ks.add_range(kr(0..10));
     687            2 : 
     688            2 :         // add range that is adjacent to the end of an existing range
     689            2 :         //
     690            2 :         // #####
     691            2 :         //      #####
     692            2 :         ks.add_range(kr(0..10));
     693            2 :         ks.add_range(kr(10..30));
     694            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     695            2 : 
     696            2 :         // add range that is adjacent to the start of an existing range
     697            2 :         //
     698            2 :         //      #####
     699            2 :         // #####
     700            2 :         let mut ks = KeySpaceRandomAccum::default();
     701            2 :         ks.add_range(kr(10..30));
     702            2 :         ks.add_range(kr(0..10));
     703            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     704            2 : 
     705            2 :         // add range that overlaps with the end of an existing range
     706            2 :         //
     707            2 :         // #####
     708            2 :         //    #####
     709            2 :         let mut ks = KeySpaceRandomAccum::default();
     710            2 :         ks.add_range(kr(0..10));
     711            2 :         ks.add_range(kr(5..30));
     712            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     713            2 : 
     714            2 :         // add range that overlaps with the start of an existing range
     715            2 :         //
     716            2 :         //    #####
     717            2 :         // #####
     718            2 :         let mut ks = KeySpaceRandomAccum::default();
     719            2 :         ks.add_range(kr(5..30));
     720            2 :         ks.add_range(kr(0..10));
     721            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     722            2 : 
     723            2 :         // add range that is fully covered by an existing range
     724            2 :         //
     725            2 :         // #########
     726            2 :         //   #####
     727            2 :         let mut ks = KeySpaceRandomAccum::default();
     728            2 :         ks.add_range(kr(0..30));
     729            2 :         ks.add_range(kr(10..20));
     730            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     731            2 : 
     732            2 :         // add range that extends an existing range from both ends
     733            2 :         //
     734            2 :         //   #####
     735            2 :         // #########
     736            2 :         let mut ks = KeySpaceRandomAccum::default();
     737            2 :         ks.add_range(kr(10..20));
     738            2 :         ks.add_range(kr(0..30));
     739            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     740            2 : 
     741            2 :         // add a range that overlaps with two existing ranges, joining them
     742            2 :         //
     743            2 :         // #####   #####
     744            2 :         //    #######
     745            2 :         let mut ks = KeySpaceRandomAccum::default();
     746            2 :         ks.add_range(kr(0..10));
     747            2 :         ks.add_range(kr(20..30));
     748            2 :         ks.add_range(kr(5..25));
     749            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     750            2 :     }
     751              : 
     752              :     #[test]
     753            2 :     fn keyspace_overlaps() {
     754            2 :         let mut ks = KeySpaceRandomAccum::default();
     755            2 :         ks.add_range(kr(10..20));
     756            2 :         ks.add_range(kr(30..40));
     757            2 :         let ks = ks.to_keyspace();
     758            2 : 
     759            2 :         //        #####      #####
     760            2 :         // xxxx
     761            2 :         assert!(!ks.overlaps(&kr(0..5)));
     762              : 
     763              :         //        #####      #####
     764              :         //   xxxx
     765            2 :         assert!(!ks.overlaps(&kr(5..9)));
     766              : 
     767              :         //        #####      #####
     768              :         //    xxxx
     769            2 :         assert!(!ks.overlaps(&kr(5..10)));
     770              : 
     771              :         //        #####      #####
     772              :         //     xxxx
     773            2 :         assert!(ks.overlaps(&kr(5..11)));
     774              : 
     775              :         //        #####      #####
     776              :         //        xxxx
     777            2 :         assert!(ks.overlaps(&kr(10..15)));
     778              : 
     779              :         //        #####      #####
     780              :         //         xxxx
     781            2 :         assert!(ks.overlaps(&kr(15..20)));
     782              : 
     783              :         //        #####      #####
     784              :         //           xxxx
     785            2 :         assert!(ks.overlaps(&kr(15..25)));
     786              : 
     787              :         //        #####      #####
     788              :         //              xxxx
     789            2 :         assert!(!ks.overlaps(&kr(22..28)));
     790              : 
     791              :         //        #####      #####
     792              :         //               xxxx
     793            2 :         assert!(!ks.overlaps(&kr(25..30)));
     794              : 
     795              :         //        #####      #####
     796              :         //                      xxxx
     797            2 :         assert!(ks.overlaps(&kr(35..35)));
     798              : 
     799              :         //        #####      #####
     800              :         //                        xxxx
     801            2 :         assert!(!ks.overlaps(&kr(40..45)));
     802              : 
     803              :         //        #####      #####
     804              :         //                        xxxx
     805            2 :         assert!(!ks.overlaps(&kr(45..50)));
     806              : 
     807              :         //        #####      #####
     808              :         //        xxxxxxxxxxx
     809            2 :         assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
     810            2 :     }
     811              : 
     812              :     #[test]
     813            2 :     fn test_remove_full_overlapps() {
     814            2 :         let mut key_space1 = KeySpace {
     815            2 :             ranges: vec![
     816            2 :                 Key::from_i128(1)..Key::from_i128(4),
     817            2 :                 Key::from_i128(5)..Key::from_i128(8),
     818            2 :                 Key::from_i128(10)..Key::from_i128(12),
     819            2 :             ],
     820            2 :         };
     821            2 :         let key_space2 = KeySpace {
     822            2 :             ranges: vec![
     823            2 :                 Key::from_i128(2)..Key::from_i128(3),
     824            2 :                 Key::from_i128(6)..Key::from_i128(7),
     825            2 :                 Key::from_i128(11)..Key::from_i128(13),
     826            2 :             ],
     827            2 :         };
     828            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     829            2 :         let removed_expected = KeySpace {
     830            2 :             ranges: vec![
     831            2 :                 Key::from_i128(2)..Key::from_i128(3),
     832            2 :                 Key::from_i128(6)..Key::from_i128(7),
     833            2 :                 Key::from_i128(11)..Key::from_i128(12),
     834            2 :             ],
     835            2 :         };
     836            2 :         assert_eq!(removed, removed_expected);
     837              : 
     838            2 :         assert_eq!(
     839            2 :             key_space1.ranges,
     840            2 :             vec![
     841            2 :                 Key::from_i128(1)..Key::from_i128(2),
     842            2 :                 Key::from_i128(3)..Key::from_i128(4),
     843            2 :                 Key::from_i128(5)..Key::from_i128(6),
     844            2 :                 Key::from_i128(7)..Key::from_i128(8),
     845            2 :                 Key::from_i128(10)..Key::from_i128(11)
     846            2 :             ]
     847            2 :         );
     848            2 :     }
     849              : 
     850              :     #[test]
     851            2 :     fn test_remove_partial_overlaps() {
     852            2 :         // Test partial ovelaps
     853            2 :         let mut key_space1 = KeySpace {
     854            2 :             ranges: vec![
     855            2 :                 Key::from_i128(1)..Key::from_i128(5),
     856            2 :                 Key::from_i128(7)..Key::from_i128(10),
     857            2 :                 Key::from_i128(12)..Key::from_i128(15),
     858            2 :             ],
     859            2 :         };
     860            2 :         let key_space2 = KeySpace {
     861            2 :             ranges: vec![
     862            2 :                 Key::from_i128(3)..Key::from_i128(6),
     863            2 :                 Key::from_i128(8)..Key::from_i128(11),
     864            2 :                 Key::from_i128(14)..Key::from_i128(17),
     865            2 :             ],
     866            2 :         };
     867            2 : 
     868            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     869            2 :         let removed_expected = KeySpace {
     870            2 :             ranges: vec![
     871            2 :                 Key::from_i128(3)..Key::from_i128(5),
     872            2 :                 Key::from_i128(8)..Key::from_i128(10),
     873            2 :                 Key::from_i128(14)..Key::from_i128(15),
     874            2 :             ],
     875            2 :         };
     876            2 :         assert_eq!(removed, removed_expected);
     877              : 
     878            2 :         assert_eq!(
     879            2 :             key_space1.ranges,
     880            2 :             vec![
     881            2 :                 Key::from_i128(1)..Key::from_i128(3),
     882            2 :                 Key::from_i128(7)..Key::from_i128(8),
     883            2 :                 Key::from_i128(12)..Key::from_i128(14),
     884            2 :             ]
     885            2 :         );
     886            2 :     }
     887              : 
     888              :     #[test]
     889            2 :     fn test_remove_no_overlaps() {
     890            2 :         let mut key_space1 = KeySpace {
     891            2 :             ranges: vec![
     892            2 :                 Key::from_i128(1)..Key::from_i128(5),
     893            2 :                 Key::from_i128(7)..Key::from_i128(10),
     894            2 :                 Key::from_i128(12)..Key::from_i128(15),
     895            2 :             ],
     896            2 :         };
     897            2 :         let key_space2 = KeySpace {
     898            2 :             ranges: vec![
     899            2 :                 Key::from_i128(6)..Key::from_i128(7),
     900            2 :                 Key::from_i128(11)..Key::from_i128(12),
     901            2 :                 Key::from_i128(15)..Key::from_i128(17),
     902            2 :             ],
     903            2 :         };
     904            2 : 
     905            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     906            2 :         let removed_expected = KeySpace::default();
     907            2 :         assert_eq!(removed, removed_expected);
     908              : 
     909            2 :         assert_eq!(
     910            2 :             key_space1.ranges,
     911            2 :             vec![
     912            2 :                 Key::from_i128(1)..Key::from_i128(5),
     913            2 :                 Key::from_i128(7)..Key::from_i128(10),
     914            2 :                 Key::from_i128(12)..Key::from_i128(15),
     915            2 :             ]
     916            2 :         );
     917            2 :     }
     918              : 
     919              :     #[test]
     920            2 :     fn test_remove_one_range_overlaps_multiple() {
     921            2 :         let mut key_space1 = KeySpace {
     922            2 :             ranges: vec![
     923            2 :                 Key::from_i128(1)..Key::from_i128(3),
     924            2 :                 Key::from_i128(3)..Key::from_i128(6),
     925            2 :                 Key::from_i128(6)..Key::from_i128(10),
     926            2 :                 Key::from_i128(12)..Key::from_i128(15),
     927            2 :                 Key::from_i128(17)..Key::from_i128(20),
     928            2 :                 Key::from_i128(20)..Key::from_i128(30),
     929            2 :                 Key::from_i128(30)..Key::from_i128(40),
     930            2 :             ],
     931            2 :         };
     932            2 :         let key_space2 = KeySpace {
     933            2 :             ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
     934            2 :         };
     935            2 : 
     936            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     937            2 :         let removed_expected = KeySpace {
     938            2 :             ranges: vec![
     939            2 :                 Key::from_i128(9)..Key::from_i128(10),
     940            2 :                 Key::from_i128(12)..Key::from_i128(15),
     941            2 :                 Key::from_i128(17)..Key::from_i128(19),
     942            2 :             ],
     943            2 :         };
     944            2 :         assert_eq!(removed, removed_expected);
     945              : 
     946            2 :         assert_eq!(
     947            2 :             key_space1.ranges,
     948            2 :             vec![
     949            2 :                 Key::from_i128(1)..Key::from_i128(3),
     950            2 :                 Key::from_i128(3)..Key::from_i128(6),
     951            2 :                 Key::from_i128(6)..Key::from_i128(9),
     952            2 :                 Key::from_i128(19)..Key::from_i128(20),
     953            2 :                 Key::from_i128(20)..Key::from_i128(30),
     954            2 :                 Key::from_i128(30)..Key::from_i128(40),
     955            2 :             ]
     956            2 :         );
     957            2 :     }
     958              :     #[test]
     959            2 :     fn sharded_range_relation_gap() {
     960            2 :         let shard_identity = ShardIdentity::new(
     961            2 :             ShardNumber(0),
     962            2 :             ShardCount::new(4),
     963            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     964            2 :         )
     965            2 :         .unwrap();
     966            2 : 
     967            2 :         let range = ShardedRange::new(
     968            2 :             Range {
     969            2 :                 start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
     970            2 :                 end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
     971            2 :             },
     972            2 :             &shard_identity,
     973            2 :         );
     974            2 : 
     975            2 :         // Key range spans relations, expect MAX
     976            2 :         assert_eq!(range.page_count(), u32::MAX);
     977            2 :     }
     978              : 
     979              :     #[test]
     980            2 :     fn shard_identity_keyspaces_single_key() {
     981            2 :         let shard_identity = ShardIdentity::new(
     982            2 :             ShardNumber(1),
     983            2 :             ShardCount::new(4),
     984            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     985            2 :         )
     986            2 :         .unwrap();
     987            2 : 
     988            2 :         let range = ShardedRange::new(
     989            2 :             Range {
     990            2 :                 start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
     991            2 :                 end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
     992            2 :             },
     993            2 :             &shard_identity,
     994            2 :         );
     995            2 :         // Single-key range on logical size key
     996            2 :         assert_eq!(range.page_count(), 1);
     997            2 :     }
     998              : 
     999              :     /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
    1000              :     #[test]
    1001            2 :     fn contiguous_range_check() {
    1002            2 :         assert!(!is_contiguous_range(
    1003            2 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
    1004            2 :                 ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
    1005            2 :         ),);
    1006              : 
    1007              :         // The ranges goes all the way up to the 0xffffffff, including it: this is
    1008              :         // not considered a rel block range because 0xffffffff stores logical sizes,
    1009              :         // not blocks.
    1010            2 :         assert!(!is_contiguous_range(
    1011            2 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
    1012            2 :                 ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
    1013            2 :         ),);
    1014              : 
    1015              :         // Keys within the normal data region of a relation
    1016            2 :         assert!(is_contiguous_range(
    1017            2 :             &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
    1018            2 :                 ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
    1019            2 :         ),);
    1020              : 
    1021              :         // The logical size key of one forkno, then some blocks in the next
    1022            2 :         assert!(is_contiguous_range(
    1023            2 :             &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
    1024            2 :                 ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
    1025            2 :         ),);
    1026            2 :     }
    1027              : 
    1028              :     #[test]
    1029            2 :     fn shard_identity_keyspaces_forkno_gap() {
    1030            2 :         let shard_identity = ShardIdentity::new(
    1031            2 :             ShardNumber(1),
    1032            2 :             ShardCount::new(4),
    1033            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1034            2 :         )
    1035            2 :         .unwrap();
    1036            2 : 
    1037            2 :         let range = ShardedRange::new(
    1038            2 :             Range {
    1039            2 :                 start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
    1040            2 :                 end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
    1041            2 :             },
    1042            2 :             &shard_identity,
    1043            2 :         );
    1044            2 : 
    1045            2 :         // Range spanning the end of one forkno and the start of the next: we do not attempt to
    1046            2 :         // calculate a valid size, because we have no way to know if they keys between start
    1047            2 :         // and end are actually in use.
    1048            2 :         assert_eq!(range.page_count(), u32::MAX);
    1049            2 :     }
    1050              : 
    1051              :     #[test]
    1052            2 :     fn shard_identity_keyspaces_one_relation() {
    1053           10 :         for shard_number in 0..4 {
    1054            8 :             let shard_identity = ShardIdentity::new(
    1055            8 :                 ShardNumber(shard_number),
    1056            8 :                 ShardCount::new(4),
    1057            8 :                 ShardParameters::DEFAULT_STRIPE_SIZE,
    1058            8 :             )
    1059            8 :             .unwrap();
    1060            8 : 
    1061            8 :             let range = ShardedRange::new(
    1062            8 :                 Range {
    1063            8 :                     start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
    1064            8 :                     end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
    1065            8 :                 },
    1066            8 :                 &shard_identity,
    1067            8 :             );
    1068            8 : 
    1069            8 :             // Very simple case: range covering block zero of one relation, where that block maps to shard zero
    1070            8 :             if shard_number == 0 {
    1071            2 :                 assert_eq!(range.page_count(), 1);
    1072              :             } else {
    1073              :                 // Other shards should perceive the range's size as zero
    1074            6 :                 assert_eq!(range.page_count(), 0);
    1075              :             }
    1076              :         }
    1077            2 :     }
    1078              : 
    1079              :     /// Test helper: construct a ShardedRange and call fragment() on it, returning
    1080              :     /// the total page count in the range and the fragments.
    1081         2024 :     fn do_fragment(
    1082         2024 :         range_start: Key,
    1083         2024 :         range_end: Key,
    1084         2024 :         shard_identity: &ShardIdentity,
    1085         2024 :         target_nblocks: u32,
    1086         2024 :     ) -> (u32, Vec<(u32, Range<Key>)>) {
    1087         2024 :         let range = ShardedRange::new(
    1088         2024 :             Range {
    1089         2024 :                 start: range_start,
    1090         2024 :                 end: range_end,
    1091         2024 :             },
    1092         2024 :             shard_identity,
    1093         2024 :         );
    1094         2024 : 
    1095         2024 :         let page_count = range.page_count();
    1096         2024 :         let fragments = range.fragment(target_nblocks);
    1097         2024 : 
    1098         2024 :         // Invariant: we always get at least one fragment
    1099         2024 :         assert!(!fragments.is_empty());
    1100              : 
    1101              :         // Invariant: the first/last fragment start/end should equal the input start/end
    1102         2024 :         assert_eq!(fragments.first().unwrap().1.start, range_start);
    1103         2024 :         assert_eq!(fragments.last().unwrap().1.end, range_end);
    1104              : 
    1105         2024 :         if page_count > 0 {
    1106              :             // Invariant: every fragment must contain at least one shard-local page, if the
    1107              :             // total range contains at least one shard-local page
    1108         1374 :             let all_nonzero = fragments.iter().all(|f| f.0 > 0);
    1109         1108 :             if !all_nonzero {
    1110            0 :                 eprintln!("Found a zero-length fragment: {:?}", fragments);
    1111         1108 :             }
    1112         1108 :             assert!(all_nonzero);
    1113              :         } else {
    1114              :             // A range with no shard-local pages should always be returned as a single fragment
    1115          916 :             assert_eq!(fragments, vec![(0, range_start..range_end)]);
    1116              :         }
    1117              : 
    1118              :         // Invariant: fragments must be ordered and non-overlapping
    1119         2024 :         let mut last: Option<Range<Key>> = None;
    1120         4314 :         for frag in &fragments {
    1121         2290 :             if let Some(last) = last {
    1122          266 :                 assert!(frag.1.start >= last.end);
    1123          266 :                 assert!(frag.1.start > last.start);
    1124         2024 :             }
    1125         2290 :             last = Some(frag.1.clone())
    1126              :         }
    1127              : 
    1128              :         // Invariant: fragments respect target_nblocks
    1129         4314 :         for frag in &fragments {
    1130         2290 :             assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
    1131              :         }
    1132              : 
    1133         2024 :         (page_count, fragments)
    1134         2024 :     }
    1135              : 
    /// Basic fragment() checks on a range that is exactly one stripe, owned entirely by
    /// shard 0 of a 4-shard tenant.
    #[test]
    fn sharded_range_fragment_simple() {
        let shard_zero = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        // This range is known to cover exactly one stripe, and that stripe is shard 0's.
        let stripe_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let stripe_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
        let stripe_mid = stripe_start.add(16384);

        // Each case is (target_nblocks, expected fragments); the total is always the full
        // 32768-page stripe.
        let cases = vec![
            // Exactly stripe_size blocks: one fragment covering the stripe.
            (32768, vec![(32768, stripe_start..stripe_end)]),
            // Far more than the stripe: still a single whole-stripe fragment.
            (10000000, vec![(32768, stripe_start..stripe_end)]),
            // Half the stripe: split into two equal halves.
            (
                16384,
                vec![
                    (16384, stripe_start..stripe_mid),
                    (16384, stripe_mid..stripe_end),
                ],
            ),
        ];

        for (target, expected_fragments) in cases {
            let outcome = do_fragment(stripe_start, stripe_end, &shard_zero, target);
            assert_eq!(outcome, (32768, expected_fragments));
        }
    }
    1175              : 
    /// fragment() on a range covering several stripes, only one of which (the first)
    /// belongs to shard 0 of a 4-shard tenant.
    #[test]
    fn sharded_range_fragment_multi_stripe() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        // A range which covers multiple stripes, exactly one of which belongs to the current shard.
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();

        // Ask for all the blocks, get a fragment that covers the whole range but reports
        // its size to be just the blocks belonging to our shard.
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 131072),
            (32768, vec![(32768, input_start..input_end)])
        );

        // Ask for a sub-stripe quantity. The last fragment carries the remaining
        // 32768 - 32000 = 768 local pages, but still extends to the end of the input range
        // so that the fragments cover it completely.
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16000),
            (
                32768,
                vec![
                    (16000, input_start..input_start.add(16000)),
                    (16000, input_start.add(16000)..input_start.add(32000)),
                    (768, input_start.add(32000)..input_end),
                ]
            )
        );

        // Try on a range that starts one block *into* our owned stripe: we lose exactly
        // that first block, leaving 32767 shard-local pages.
        assert_eq!(
            do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
            (32767, vec![(32767, input_start.add(1)..input_end)])
        );
    }
    1214              : 
    /// Check that page counting is right when a range begins at the logical-size key of the
    /// preceding relation.
    #[test]
    fn sharded_range_fragment_starting_from_logical_size() {
        let from_key = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
        let to_key = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();

        // (shard number, expected local pages):
        // - shard 0 owns the relation's first stripe AND the preceding logical size key;
        // - shard 1 does not own that stripe, but every shard stores every logical size,
        //   so it still sees exactly one local page.
        let expectations = [(0, 0x8001), (1, 0x1)];

        for (shard, pages) in expectations {
            let identity = ShardIdentity::new(
                ShardNumber(shard),
                ShardCount::new(4),
                ShardParameters::DEFAULT_STRIPE_SIZE,
            )
            .unwrap();

            let want = (pages, vec![(pages, from_key..to_key)]);
            assert_eq!(do_fragment(from_key, to_key, &identity, 0x10000), want);
        }
    }
    1247              : 
    /// On un-sharded data every key is local, so fragment() just cuts the range into
    /// target-sized pieces.
    #[test]
    fn sharded_range_fragment_unsharded() {
        let unsharded = ShardIdentity::unsharded();

        let lo = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let hi = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
        let split_point = lo.add(0x8000);

        // 0x10000 local pages in total, in two 0x8000-page halves.
        let expected = (
            0x10000,
            vec![
                (0x8000, lo..split_point),
                (0x8000, split_point..lo.add(0x10000)),
            ],
        );

        let actual = do_fragment(lo, hi, &unsharded, 0x8000);
        assert_eq!(actual, expected);
    }
    1266              : 
    /// A range spanning relations cannot be sized: fragment() gives up and reports a single
    /// fragment of size u32::MAX, whether or not the tenant is sharded.
    #[test]
    fn sharded_range_fragment_cross_relation() {
        let range_lo = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let range_hi = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();

        let identities = [
            ShardIdentity::unsharded(),
            ShardIdentity::new(
                ShardNumber(0),
                ShardCount::new(4),
                ShardParameters::DEFAULT_STRIPE_SIZE,
            )
            .unwrap(),
        ];

        for identity in &identities {
            let gave_up = (u32::MAX, vec![(u32::MAX, range_lo..range_hi)]);
            assert_eq!(do_fragment(range_lo, range_hi, identity, 0x8000), gave_up);
        }
    }
    1291              : 
    /// fragment() with a target_nblocks far smaller than a stripe splits a small range into
    /// several target-sized fragments, plus a partial tail.
    #[test]
    fn sharded_range_fragment_tiny_nblocks() {
        let shard_identity = ShardIdentity::unsharded();

        // A 0x38 (56) block range within a single relation, fragmented into 16-block pieces:
        // expect three full fragments followed by an 8-block remainder that ends at input_end.
        let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
        let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16),
            (
                0x38,
                vec![
                    (16, input_start..input_start.add(16)),
                    (16, input_start.add(16)..input_start.add(32)),
                    (16, input_start.add(32)..input_start.add(48)),
                    (8, input_start.add(48)..input_end),
                ]
            )
        );
    }
    1312              : 
    /// Randomized fragment() test. The main checks are the invariants asserted inside
    /// do_fragment(). In addition, a random key from each range must land in some fragment,
    /// and that fragment must be non-empty if the key is shard-local.
    #[test]
    fn sharded_range_fragment_fuzz() {
        // Use a fixed seed: we don't want to explicitly pick values, but we do want
        // the test to be reproducible.
        let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);

        for _i in 0..1000 {
            // Half the iterations are unsharded; the rest use 1..=127 shards with a random
            // shard number.
            let shard_identity = if prng.next_u32() % 2 == 0 {
                ShardIdentity::unsharded()
            } else {
                let shard_count = prng.next_u32() % 127 + 1;
                ShardIdentity::new(
                    ShardNumber((prng.next_u32() % shard_count) as u8),
                    ShardCount::new(shard_count as u8),
                    ShardParameters::DEFAULT_STRIPE_SIZE,
                )
                .unwrap()
            };

            // In 1..=65536
            let target_nblocks = prng.next_u32() % 65536 + 1;

            // Start somewhere in the first 16384 blocks of the relation
            let start_offset = prng.next_u32() % 16384;

            // Ranges between 1 and 8192 blocks in size
            let range_size = prng.next_u32() % 8192 + 1;

            // A range within a single relation, so fragment() is always able to size it
            let input_start = Key::from_hex("000000067F00000001000004E10000000000")
                .unwrap()
                .add(start_offset);
            let input_end = input_start.add(range_size);

            // This test's main success conditions are the invariants baked into do_fragment
            let (_total_size, fragments) =
                do_fragment(input_start, input_end, &shard_identity, target_nblocks);

            // Pick a random key within the range and check it appears in the output
            let example_key = input_start.add(prng.next_u32() % range_size);

            let example_key_frag = fragments
                .iter()
                .find(|f| f.1.contains(&example_key))
                .unwrap_or_else(|| {
                    panic!(
                        "key {:?} not covered by any fragment: {:?}",
                        example_key, fragments
                    )
                });

            // Check that the fragment containing our random key has a nonzero size if
            // that key is shard-local
            let example_key_local = !shard_identity.is_key_disposable(&example_key);
            if example_key_local {
                assert!(
                    example_key_frag.0 > 0,
                    "shard-local key {:?} is in zero-sized fragment {:?}",
                    example_key,
                    example_key_frag
                );
            }
        }
    }
    1366              : }
         |