LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - keyspace.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 97.1 % 967 939
Test Date: 2024-08-29 11:33:10 Functions: 97.1 % 70 68

            Line data    Source code
       1              : use postgres_ffi::BLCKSZ;
       2              : use std::ops::Range;
       3              : 
       4              : use crate::{
       5              :     key::Key,
       6              :     shard::{ShardCount, ShardIdentity},
       7              : };
       8              : use itertools::Itertools;
       9              : 
      10              : ///
      11              : /// Represents a set of Keys, in a compact form.
      12              : ///
      13              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      14              : pub struct KeySpace {
      15              :     /// Contiguous ranges of keys that belong to the key space. In key order,
      16              :     /// and with no overlap.
      17              :     pub ranges: Vec<Range<Key>>,
      18              : }
      19              : 
      20              : impl std::fmt::Display for KeySpace {
      21            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      22            0 :         write!(f, "[")?;
      23            0 :         for range in &self.ranges {
      24            0 :             write!(f, "{}..{},", range.start, range.end)?;
      25              :         }
      26            0 :         write!(f, "]")
      27            0 :     }
      28              : }
      29              : 
      30              : /// A wrapper type for sparse keyspaces.
      31              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      32              : pub struct SparseKeySpace(pub KeySpace);
      33              : 
      34              : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
      35              : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
      36              : /// shard.
      37              : ///
      38              : /// When we iterate over keys within this object, we will skip any keys that don't belong
      39              : /// to this shard.
      40              : ///
      41              : /// The start + end keys may not belong to the shard: these specify where layer files should
      42              : /// start  + end, but we will never actually read/write those keys.
      43              : #[derive(Clone, Debug, PartialEq, Eq)]
      44              : pub struct ShardedRange<'a> {
      45              :     pub shard_identity: &'a ShardIdentity,
      46              :     pub range: Range<Key>,
      47              : }
      48              : 
      49              : // Calculate the size of a range within the blocks of the same relation, or spanning only the
      50              : // top page in the previous relation's space.
      51     21934582 : fn contiguous_range_len(range: &Range<Key>) -> u32 {
      52     21934582 :     debug_assert!(is_contiguous_range(range));
      53     21934582 :     if range.start.field6 == 0xffffffff {
      54      2481324 :         range.end.field6 + 1
      55              :     } else {
      56     19453258 :         range.end.field6 - range.start.field6
      57              :     }
      58     21934582 : }
      59              : 
      60              : /// Return true if this key range includes only keys in the same relation's data blocks, or
      61              : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
      62              : ///
      63              : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
      64              : /// be on our shard.  Later in ShardedRange we do the extra work to figure out how much
      65              : /// of a given contiguous range is present on one shard.
      66              : ///
      67              : /// This matters, because:
      68              : /// - Within such ranges, keys are used contiguously.  Outside such ranges it is sparse.
      69              : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
      70     43873136 : fn is_contiguous_range(range: &Range<Key>) -> bool {
      71     43873136 :     range.start.field1 == range.end.field1
      72     43868660 :         && range.start.field2 == range.end.field2
      73     43868516 :         && range.start.field3 == range.end.field3
      74     43868258 :         && range.start.field4 == range.end.field4
      75     43868252 :         && (range.start.field5 == range.end.field5
      76      4962672 :             || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
      77     43873136 : }
      78              : 
      79              : impl<'a> ShardedRange<'a> {
      80        11298 :     pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
      81        11298 :         Self {
      82        11298 :             shard_identity,
      83        11298 :             range,
      84        11298 :         }
      85        11298 :     }
      86              : 
      87              :     /// Break up this range into chunks, each of which has at least one local key in it if the
      88              :     /// total range has at least one local key.
      89        11256 :     pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
      90        11256 :         // Optimization for single-key case (e.g. logical size keys)
      91        11256 :         if self.range.end == self.range.start.add(1) {
      92         4326 :             return vec![(
      93         4326 :                 if self.shard_identity.is_key_disposable(&self.range.start) {
      94            0 :                     0
      95              :                 } else {
      96         4326 :                     1
      97              :                 },
      98         4326 :                 self.range,
      99              :             )];
     100         6930 :         }
     101         6930 : 
     102         6930 :         if !is_contiguous_range(&self.range) {
     103              :             // Ranges that span relations are not fragmented.  We only get these ranges as a result
     104              :             // of operations that act on existing layers, so we trust that the existing range is
     105              :             // reasonably small.
     106           12 :             return vec![(u32::MAX, self.range)];
     107         6918 :         }
     108         6918 : 
     109         6918 :         let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
     110         6918 : 
     111         6918 :         let mut cursor = self.range.start;
     112        14712 :         while cursor < self.range.end {
     113         7794 :             let advance_by = self.distance_to_next_boundary(cursor);
     114         7794 :             let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
     115              : 
     116              :             // If the previous fragment is undersized, then we seek to consume enough
     117              :             // blocks to complete it.
     118         7794 :             let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
     119          876 :                 Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
     120          810 :                 Some(frag) => {
     121          810 :                     // Prev block is complete, want the full number.
     122          810 :                     (
     123          810 :                         target_nblocks,
     124          810 :                         if is_fragment_disposable {
     125              :                             // If this current range will be empty (not shard-local data), we will merge into previous
     126            0 :                             Some(frag)
     127              :                         } else {
     128          810 :                             None
     129              :                         },
     130              :                     )
     131              :                 }
     132              :                 None => {
     133              :                     // First iteration, want the full number
     134         6918 :                     (target_nblocks, None)
     135              :                 }
     136              :             };
     137              : 
     138         7794 :             let advance_by = if is_fragment_disposable {
     139         2808 :                 advance_by
     140              :             } else {
     141         4986 :                 std::cmp::min(advance_by, want_blocks)
     142              :             };
     143              : 
     144         7794 :             let next_cursor = cursor.add(advance_by);
     145              : 
     146         7794 :             let this_frag = (
     147         7794 :                 if is_fragment_disposable {
     148         2808 :                     0
     149              :                 } else {
     150         4986 :                     advance_by
     151              :                 },
     152         7794 :                 cursor..next_cursor,
     153         7794 :             );
     154         7794 :             cursor = next_cursor;
     155              : 
     156         7794 :             if let Some(last_fragment) = merge_last_fragment {
     157           66 :                 // Previous fragment was short or this one is empty, merge into it
     158           66 :                 last_fragment.0 += this_frag.0;
     159           66 :                 last_fragment.1.end = this_frag.1.end;
     160         7728 :             } else {
     161         7728 :                 fragments.push(this_frag);
     162         7728 :             }
     163              :         }
     164              : 
     165         6918 :         fragments
     166        11256 :     }
     167              : 
     168              :     /// Estimate the physical pages that are within this range, on this shard.  This returns
     169              :     /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
     170         6114 :     pub fn page_count(&self) -> u32 {
     171         6114 :         // Special cases for single keys like logical sizes
     172         6114 :         if self.range.end == self.range.start.add(1) {
     173           36 :             return if self.shard_identity.is_key_disposable(&self.range.start) {
     174           18 :                 0
     175              :             } else {
     176           18 :                 1
     177              :             };
     178         6078 :         }
     179         6078 : 
     180         6078 :         // We can only do an authentic calculation of contiguous key ranges
     181         6078 :         if !is_contiguous_range(&self.range) {
     182           24 :             return u32::MAX;
     183         6054 :         }
     184         6054 : 
     185         6054 :         // Special case for single sharded tenants: our logical and physical sizes are the same
     186         6054 :         if self.shard_identity.count < ShardCount::new(2) {
     187         3144 :             return contiguous_range_len(&self.range);
     188         2910 :         }
     189         2910 : 
     190         2910 :         // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
     191         2910 :         // to Self, and add the stripe's block count to our total if so.
     192         2910 :         let mut result: u64 = 0;
     193         2910 :         let mut cursor = self.range.start;
     194         5886 :         while cursor < self.range.end {
     195              :             // Count up to the next stripe_size boundary or end of range
     196         2976 :             let advance_by = self.distance_to_next_boundary(cursor);
     197         2976 : 
     198         2976 :             // If this blocks in this stripe belong to us, add them to our count
     199         2976 :             if !self.shard_identity.is_key_disposable(&cursor) {
     200          168 :                 result += advance_by as u64;
     201         2808 :             }
     202              : 
     203         2976 :             cursor = cursor.add(advance_by);
     204              :         }
     205              : 
     206         2910 :         if result > u32::MAX as u64 {
     207            0 :             u32::MAX
     208              :         } else {
     209         2910 :             result as u32
     210              :         }
     211         6114 :     }
     212              : 
     213              :     /// Advance the cursor to the next potential fragment boundary: this is either
     214              :     /// a stripe boundary, or the end of the range.
     215        10770 :     fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
     216        10770 :         let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
     217        10770 : 
     218        10770 :         if self.shard_identity.count < ShardCount::new(2) {
     219              :             // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
     220         4776 :             return distance_to_range_end;
     221         5994 :         }
     222         5994 : 
     223         5994 :         if cursor.field6 == 0xffffffff {
     224              :             // We are wrapping from one relation's logical size to the next relation's first data block
     225           24 :             return 1;
     226         5970 :         }
     227         5970 : 
     228         5970 :         let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
     229         5970 :         let stripe_remainder = self.shard_identity.stripe_size.0
     230         5970 :             - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
     231         5970 : 
     232         5970 :         if cfg!(debug_assertions) {
     233              :             // We should never overflow field5 and field6 -- our callers check this earlier
     234              :             // and would have returned their u32::MAX cases if the input range violated this.
     235         5970 :             let next_cursor = cursor.add(stripe_remainder);
     236         5970 :             debug_assert!(
     237         5970 :                 next_cursor.field1 == cursor.field1
     238         5970 :                     && next_cursor.field2 == cursor.field2
     239         5970 :                     && next_cursor.field3 == cursor.field3
     240         5970 :                     && next_cursor.field4 == cursor.field4
     241         5970 :                     && next_cursor.field5 == cursor.field5
     242              :             )
     243            0 :         }
     244              : 
     245         5970 :         std::cmp::min(stripe_remainder, distance_to_range_end)
     246        10770 :     }
     247              : 
     248              :     /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
     249              :     /// this function simply calculates the number of pages in the space, without accounting for those
     250              :     /// pages that would not actually be stored on this node.
     251              :     ///
     252              :     /// Don't use this function in code that works with physical entities like layer files.
     253     21925522 :     pub fn raw_size(range: &Range<Key>) -> u32 {
     254     21925522 :         if is_contiguous_range(range) {
     255     21920668 :             contiguous_range_len(range)
     256              :         } else {
     257         4854 :             u32::MAX
     258              :         }
     259     21925522 :     }
     260              : }
     261              : 
     262              : impl KeySpace {
     263              :     /// Create a key space with a single range.
     264        24348 :     pub fn single(key_range: Range<Key>) -> Self {
     265        24348 :         Self {
     266        24348 :             ranges: vec![key_range],
     267        24348 :         }
     268        24348 :     }
     269              : 
     270              :     /// Partition a key space into roughly chunks of roughly 'target_size' bytes
     271              :     /// in each partition.
     272              :     ///
     273          864 :     pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
     274          864 :         // Assume that each value is 8k in size.
     275          864 :         let target_nblocks = (target_size / BLCKSZ as u64) as u32;
     276          864 : 
     277          864 :         let mut parts = Vec::new();
     278          864 :         let mut current_part = Vec::new();
     279          864 :         let mut current_part_size: usize = 0;
     280         6048 :         for range in &self.ranges {
     281              :             // While doing partitioning, wrap the range in ShardedRange so that our size calculations
     282              :             // will respect shard striping rather than assuming all keys within a range are present.
     283         5184 :             let range = ShardedRange::new(range.clone(), shard_identity);
     284              : 
     285              :             // Chunk up the range into parts that each contain up to target_size local blocks
     286         5196 :             for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
     287              :                 // If appending the next contiguous range in the keyspace to the current
     288              :                 // partition would cause it to be too large, and our current partition
     289              :                 // covers at least one block that is physically present in this shard,
     290              :                 // then start a new partition
     291         5196 :                 if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
     292           72 :                     && current_part_size > 0
     293           72 :                 {
     294           72 :                     parts.push(KeySpace {
     295           72 :                         ranges: current_part,
     296           72 :                     });
     297           72 :                     current_part = Vec::new();
     298           72 :                     current_part_size = 0;
     299         5124 :                 }
     300         5196 :                 current_part.push(frag_range.start..frag_range.end);
     301         5196 :                 current_part_size += frag_on_shard_size as usize;
     302              :             }
     303              :         }
     304              : 
     305              :         // add last partition that wasn't full yet.
     306          864 :         if !current_part.is_empty() {
     307          864 :             parts.push(KeySpace {
     308          864 :                 ranges: current_part,
     309          864 :             });
     310          864 :         }
     311              : 
     312          864 :         KeyPartitioning { parts }
     313          864 :     }
     314              : 
     315      8253472 :     pub fn is_empty(&self) -> bool {
     316      8253472 :         self.total_raw_size() == 0
     317      8253472 :     }
     318              : 
     319              :     /// Merge another keyspace into the current one.
     320              :     /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
     321      5016406 :     pub fn merge(&mut self, other: &KeySpace) {
     322      5016406 :         let all_ranges = self
     323      5016406 :             .ranges
     324      5016406 :             .iter()
     325      5016406 :             .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
     326      5016406 : 
     327      5016406 :         let mut accum = KeySpaceAccum::new();
     328      5016406 :         let mut prev: Option<&Range<Key>> = None;
     329      7210831 :         for range in all_ranges {
     330      2194425 :             if let Some(prev) = prev {
     331       315249 :                 let overlap =
     332       315249 :                     std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
     333       315249 :                 assert!(
     334       315249 :                     !overlap,
     335            0 :                     "Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
     336              :                     prev, range
     337              :                 );
     338      1879176 :             }
     339              : 
     340      2194425 :             accum.add_range(range.clone());
     341      2194425 :             prev = Some(range);
     342              :         }
     343              : 
     344      5016406 :         self.ranges = accum.to_keyspace().ranges;
     345      5016406 :     }
     346              : 
     347              :     /// Remove all keys in `other` from `self`.
     348              :     /// This can involve splitting or removing of existing ranges.
     349              :     /// Returns the removed keyspace
     350     10836358 :     pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
     351     10836358 :         let (self_start, self_end) = match (self.start(), self.end()) {
     352      8933758 :             (Some(start), Some(end)) => (start, end),
     353              :             _ => {
     354              :                 // self is empty
     355      1902600 :                 return KeySpace::default();
     356              :             }
     357              :         };
     358              : 
     359              :         // Key spaces are sorted by definition, so skip ahead to the first
     360              :         // potentially intersecting range. Similarly, ignore ranges that start
     361              :         // after the current keyspace ends.
     362      8933758 :         let other_ranges = other
     363      8933758 :             .ranges
     364      8933758 :             .iter()
     365      8933758 :             .skip_while(|range| self_start >= range.end)
     366      8933758 :             .take_while(|range| self_end > range.start);
     367      8933758 : 
     368      8933758 :         let mut removed_accum = KeySpaceRandomAccum::new();
     369     12933358 :         for range in other_ranges {
     370      8215698 :             while let Some(overlap_at) = self.overlaps_at(range) {
     371      4216098 :                 let overlapped = self.ranges[overlap_at].clone();
     372      4216098 : 
     373      4216098 :                 if overlapped.start < range.start && overlapped.end <= range.end {
     374           51 :                     // Higher part of the range is completely overlapped.
     375           51 :                     removed_accum.add_range(range.start..self.ranges[overlap_at].end);
     376           51 :                     self.ranges[overlap_at].end = range.start;
     377      4216047 :                 }
     378      4216098 :                 if overlapped.start >= range.start && overlapped.end > range.end {
     379          211 :                     // Lower part of the range is completely overlapped.
     380          211 :                     removed_accum.add_range(self.ranges[overlap_at].start..range.end);
     381          211 :                     self.ranges[overlap_at].start = range.end;
     382      4215887 :                 }
     383      4216098 :                 if overlapped.start < range.start && overlapped.end > range.end {
     384       240038 :                     // Middle part of the range is overlapped.
     385       240038 :                     removed_accum.add_range(range.clone());
     386       240038 :                     self.ranges[overlap_at].end = range.start;
     387       240038 :                     self.ranges
     388       240038 :                         .insert(overlap_at + 1, range.end..overlapped.end);
     389      3976060 :                 }
     390      4216098 :                 if overlapped.start >= range.start && overlapped.end <= range.end {
     391      3975798 :                     // Whole range is overlapped
     392      3975798 :                     removed_accum.add_range(self.ranges[overlap_at].clone());
     393      3975798 :                     self.ranges.remove(overlap_at);
     394      3975798 :                 }
     395              :             }
     396              :         }
     397              : 
     398      8933758 :         removed_accum.to_keyspace()
     399     10836358 :     }
     400              : 
     401     10836472 :     pub fn start(&self) -> Option<Key> {
     402     10836472 :         self.ranges.first().map(|range| range.start)
     403     10836472 :     }
     404              : 
     405     10837348 :     pub fn end(&self) -> Option<Key> {
     406     10837348 :         self.ranges.last().map(|range| range.end)
     407     10837348 :     }
     408              : 
     409              :     /// The size of the keyspace in pages, before accounting for sharding
     410     10137256 :     pub fn total_raw_size(&self) -> usize {
     411     10137256 :         self.ranges
     412     10137256 :             .iter()
     413     10137256 :             .map(|range| ShardedRange::raw_size(range) as usize)
     414     10137256 :             .sum()
     415     10137256 :     }
     416              : 
     417     10297456 :     fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
     418     12583915 :         match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
     419          336 :             Ok(0) => None,
     420      3762446 :             Err(0) => None,
     421       366694 :             Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     422      6167980 :             Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     423       368938 :             _ => None,
     424              :         }
     425     10297456 :     }
     426              : 
     427              :     ///
     428              :     /// Check if key space contains overlapping range
     429              :     ///
     430      2081758 :     pub fn overlaps(&self, range: &Range<Key>) -> bool {
     431      2081758 :         self.overlaps_at(range).is_some()
     432      2081758 :     }
     433              : 
     434              :     /// Check if the keyspace contains a key
     435      2075860 :     pub fn contains(&self, key: &Key) -> bool {
     436      2075860 :         self.overlaps(&(*key..key.next()))
     437      2075860 :     }
     438              : }
     439              : 
     440              : ///
     441              : /// Represents a partitioning of the key space.
     442              : ///
     443              : /// The only kind of partitioning we do is to partition the key space into
     444              : /// partitions that are roughly equal in physical size (see KeySpace::partition).
     445              : /// But this data structure could represent any partitioning.
     446              : ///
     447              : #[derive(Clone, Debug, Default)]
     448              : pub struct KeyPartitioning {
     449              :     pub parts: Vec<KeySpace>,
     450              : }
     451              : 
     452              : /// Represents a partitioning of the sparse key space.
     453              : #[derive(Clone, Debug, Default)]
     454              : pub struct SparseKeyPartitioning {
     455              :     pub parts: Vec<SparseKeySpace>,
     456              : }
     457              : 
     458              : impl KeyPartitioning {
     459         2472 :     pub fn new() -> Self {
     460         2472 :         KeyPartitioning { parts: Vec::new() }
     461         2472 :     }
     462              : 
     463              :     /// Convert a key partitioning to a sparse partition.
     464         1236 :     pub fn into_sparse(self) -> SparseKeyPartitioning {
     465         1236 :         SparseKeyPartitioning {
     466         1236 :             parts: self.parts.into_iter().map(SparseKeySpace).collect(),
     467         1236 :         }
     468         1236 :     }
     469              : }
     470              : 
     471              : impl SparseKeyPartitioning {
     472              :     /// Note: use this function with caution. Attempt to handle a sparse keyspace in the same way as a dense keyspace will
     473              :     /// cause long/dead loops.
     474         1608 :     pub fn into_dense(self) -> KeyPartitioning {
     475         1608 :         KeyPartitioning {
     476         1608 :             parts: self.parts.into_iter().map(|x| x.0).collect(),
     477         1608 :         }
     478         1608 :     }
     479              : }
     480              : 
     481              : ///
     482              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     483              : /// object. This takes care of merging adjacent keys and key ranges into
     484              : /// contiguous ranges.
     485              : ///
     486              : #[derive(Clone, Debug, Default)]
     487              : pub struct KeySpaceAccum {
     488              :     accum: Option<Range<Key>>,
     489              : 
     490              :     ranges: Vec<Range<Key>>,
     491              :     size: u64,
     492              : }
     493              : 
     494              : impl KeySpaceAccum {
     495      6575126 :     pub fn new() -> Self {
     496      6575126 :         Self {
     497      6575126 :             accum: None,
     498      6575126 :             ranges: Vec::new(),
     499      6575126 :             size: 0,
     500      6575126 :         }
     501      6575126 :     }
     502              : 
     503              :     #[inline(always)]
     504     12243918 :     pub fn add_key(&mut self, key: Key) {
     505     12243918 :         self.add_range(singleton_range(key))
     506     12243918 :     }
     507              : 
     508              :     #[inline(always)]
     509     16009055 :     pub fn add_range(&mut self, range: Range<Key>) {
     510     16009055 :         self.size += ShardedRange::raw_size(&range) as u64;
     511     16009055 : 
     512     16009055 :         match self.accum.as_mut() {
     513     12529119 :             Some(accum) => {
     514     12529119 :                 if range.start == accum.end {
     515     12200466 :                     accum.end = range.end;
     516     12200466 :                 } else {
     517              :                     // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
     518              :                     // a new range here if the skipped region was all keys that don't belong on this shard.
     519              :                     // (https://github.com/neondatabase/neon/issues/6247)
     520       328653 :                     assert!(range.start > accum.end);
     521       328653 :                     self.ranges.push(accum.clone());
     522       328653 :                     *accum = range;
     523              :                 }
     524              :             }
     525      3479936 :             None => self.accum = Some(range),
     526              :         }
     527     16009055 :     }
     528              : 
     529      5923260 :     pub fn to_keyspace(mut self) -> KeySpace {
     530      5923260 :         if let Some(accum) = self.accum.take() {
     531      2775872 :             self.ranges.push(accum);
     532      3147388 :         }
     533      5923260 :         KeySpace {
     534      5923260 :             ranges: self.ranges,
     535      5923260 :         }
     536      5923260 :     }
     537              : 
     538         3366 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     539         3366 :         std::mem::take(self).to_keyspace()
     540         3366 :     }
     541              : 
     542              :     // The total number of keys in this object, ignoring any sharding effects that might cause some of
     543              :     // the keys to be omitted in storage on this shard.
     544         7290 :     pub fn raw_size(&self) -> u64 {
     545         7290 :         self.size
     546         7290 :     }
     547              : }
     548              : 
     549              : ///
     550              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     551              : /// object. Key ranges may be inserted in any order and can overlap.
     552              : ///
     553              : #[derive(Clone, Debug, Default)]
     554              : pub struct KeySpaceRandomAccum {
     555              :     ranges: Vec<Range<Key>>,
     556              : }
     557              : 
     558              : impl KeySpaceRandomAccum {
     559     25873342 :     pub fn new() -> Self {
     560     25873342 :         Self { ranges: Vec::new() }
     561     25873342 :     }
     562              : 
     563      2000652 :     pub fn add_key(&mut self, key: Key) {
     564      2000652 :         self.add_range(singleton_range(key))
     565      2000652 :     }
     566              : 
     567      9147663 :     pub fn add_range(&mut self, range: Range<Key>) {
     568      9147663 :         self.ranges.push(range);
     569      9147663 :     }
     570              : 
     571      2649225 :     pub fn add_keyspace(&mut self, keyspace: KeySpace) {
     572      5298972 :         for range in keyspace.ranges {
     573      2649747 :             self.add_range(range);
     574      2649747 :         }
     575      2649225 :     }
     576              : 
     577     18975948 :     pub fn to_keyspace(mut self) -> KeySpace {
     578     18975948 :         let mut ranges = Vec::new();
     579     18975948 :         if !self.ranges.is_empty() {
     580      8126962 :             self.ranges.sort_by_key(|r| r.start);
     581      8126962 :             let mut start = self.ranges.first().unwrap().start;
     582      8126962 :             let mut end = self.ranges.first().unwrap().end;
     583     17274415 :             for r in self.ranges {
     584      9147453 :                 assert!(r.start >= start);
     585      9147453 :                 if r.start > end {
     586       770801 :                     ranges.push(start..end);
     587       770801 :                     start = r.start;
     588       770801 :                     end = r.end;
     589      8376652 :                 } else if r.end > end {
     590        13464 :                     end = r.end;
     591      8363188 :                 }
     592              :             }
     593      8126962 :             ranges.push(start..end);
     594     10848986 :         }
     595     18975948 :         KeySpace { ranges }
     596     18975948 :     }
     597              : 
     598     10032752 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     599     10032752 :         let mut prev_accum = KeySpaceRandomAccum::new();
     600     10032752 :         std::mem::swap(self, &mut prev_accum);
     601     10032752 : 
     602     10032752 :         prev_accum.to_keyspace()
     603     10032752 :     }
     604              : }
     605              : 
     606     14244570 : pub fn singleton_range(key: Key) -> Range<Key> {
     607     14244570 :     key..key.next()
     608     14244570 : }
     609              : 
     610              : #[cfg(test)]
     611              : mod tests {
     612              :     use rand::{RngCore, SeedableRng};
     613              : 
     614              :     use crate::{
     615              :         models::ShardParameters,
     616              :         shard::{ShardCount, ShardNumber},
     617              :     };
     618              : 
     619              :     use super::*;
     620              :     use std::fmt::Write;
     621              : 
     622              :     // Helper function to create a key range.
     623              :     //
     624              :     // Make the tests below less verbose.
     625          276 :     fn kr(irange: Range<i128>) -> Range<Key> {
     626          276 :         Key::from_i128(irange.start)..Key::from_i128(irange.end)
     627          276 :     }
     628              : 
     629              :     #[allow(dead_code)]
     630            0 :     fn dump_keyspace(ks: &KeySpace) {
     631            0 :         for r in ks.ranges.iter() {
     632            0 :             println!("  {}..{}", r.start.to_i128(), r.end.to_i128());
     633            0 :         }
     634            0 :     }
     635              : 
     636           66 :     fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
     637           66 :         if actual.ranges != expected {
     638            0 :             let mut msg = String::new();
     639            0 : 
     640            0 :             writeln!(msg, "expected:").unwrap();
     641            0 :             for r in &expected {
     642            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     643            0 :             }
     644            0 :             writeln!(msg, "got:").unwrap();
     645            0 :             for r in &actual.ranges {
     646            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     647            0 :             }
     648            0 :             panic!("{}", msg);
     649           66 :         }
     650           66 :     }
     651              : 
     652              :     #[test]
     653            6 :     fn keyspace_consume() {
     654            6 :         let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
     655            6 : 
     656            6 :         let mut accum = KeySpaceAccum::new();
     657           24 :         for range in &ranges {
     658           18 :             accum.add_range(range.clone());
     659           18 :         }
     660              : 
     661            6 :         let expected_size: u64 = ranges
     662            6 :             .iter()
     663           18 :             .map(|r| ShardedRange::raw_size(r) as u64)
     664            6 :             .sum();
     665            6 :         assert_eq!(accum.raw_size(), expected_size);
     666              : 
     667            6 :         assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
     668            6 :         assert_eq!(accum.raw_size(), 0);
     669              : 
     670            6 :         assert_ks_eq(&accum.consume_keyspace(), vec![]);
     671            6 :         assert_eq!(accum.raw_size(), 0);
     672              : 
     673           24 :         for range in &ranges {
     674           18 :             accum.add_range(range.clone());
     675           18 :         }
     676            6 :         assert_ks_eq(&accum.to_keyspace(), ranges);
     677            6 :     }
     678              : 
     679              :     #[test]
     680            6 :     fn keyspace_add_range() {
     681            6 :         // two separate ranges
     682            6 :         //
     683            6 :         // #####
     684            6 :         //         #####
     685            6 :         let mut ks = KeySpaceRandomAccum::default();
     686            6 :         ks.add_range(kr(0..10));
     687            6 :         ks.add_range(kr(20..30));
     688            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
     689            6 : 
     690            6 :         // two separate ranges, added in reverse order
     691            6 :         //
     692            6 :         //         #####
     693            6 :         // #####
     694            6 :         let mut ks = KeySpaceRandomAccum::default();
     695            6 :         ks.add_range(kr(20..30));
     696            6 :         ks.add_range(kr(0..10));
     697            6 : 
     698            6 :         // add range that is adjacent to the end of an existing range
     699            6 :         //
     700            6 :         // #####
     701            6 :         //      #####
     702            6 :         ks.add_range(kr(0..10));
     703            6 :         ks.add_range(kr(10..30));
     704            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     705            6 : 
     706            6 :         // add range that is adjacent to the start of an existing range
     707            6 :         //
     708            6 :         //      #####
     709            6 :         // #####
     710            6 :         let mut ks = KeySpaceRandomAccum::default();
     711            6 :         ks.add_range(kr(10..30));
     712            6 :         ks.add_range(kr(0..10));
     713            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     714            6 : 
     715            6 :         // add range that overlaps with the end of an existing range
     716            6 :         //
     717            6 :         // #####
     718            6 :         //    #####
     719            6 :         let mut ks = KeySpaceRandomAccum::default();
     720            6 :         ks.add_range(kr(0..10));
     721            6 :         ks.add_range(kr(5..30));
     722            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     723            6 : 
     724            6 :         // add range that overlaps with the start of an existing range
     725            6 :         //
     726            6 :         //    #####
     727            6 :         // #####
     728            6 :         let mut ks = KeySpaceRandomAccum::default();
     729            6 :         ks.add_range(kr(5..30));
     730            6 :         ks.add_range(kr(0..10));
     731            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     732            6 : 
     733            6 :         // add range that is fully covered by an existing range
     734            6 :         //
     735            6 :         // #########
     736            6 :         //   #####
     737            6 :         let mut ks = KeySpaceRandomAccum::default();
     738            6 :         ks.add_range(kr(0..30));
     739            6 :         ks.add_range(kr(10..20));
     740            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     741            6 : 
     742            6 :         // add range that extends an existing range from both ends
     743            6 :         //
     744            6 :         //   #####
     745            6 :         // #########
     746            6 :         let mut ks = KeySpaceRandomAccum::default();
     747            6 :         ks.add_range(kr(10..20));
     748            6 :         ks.add_range(kr(0..30));
     749            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     750            6 : 
     751            6 :         // add a range that overlaps with two existing ranges, joining them
     752            6 :         //
     753            6 :         // #####   #####
     754            6 :         //    #######
     755            6 :         let mut ks = KeySpaceRandomAccum::default();
     756            6 :         ks.add_range(kr(0..10));
     757            6 :         ks.add_range(kr(20..30));
     758            6 :         ks.add_range(kr(5..25));
     759            6 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     760            6 :     }
     761              : 
     762              :     #[test]
     763            6 :     fn keyspace_overlaps() {
     764            6 :         let mut ks = KeySpaceRandomAccum::default();
     765            6 :         ks.add_range(kr(10..20));
     766            6 :         ks.add_range(kr(30..40));
     767            6 :         let ks = ks.to_keyspace();
     768            6 : 
     769            6 :         //        #####      #####
     770            6 :         // xxxx
     771            6 :         assert!(!ks.overlaps(&kr(0..5)));
     772              : 
     773              :         //        #####      #####
     774              :         //   xxxx
     775            6 :         assert!(!ks.overlaps(&kr(5..9)));
     776              : 
     777              :         //        #####      #####
     778              :         //    xxxx
     779            6 :         assert!(!ks.overlaps(&kr(5..10)));
     780              : 
     781              :         //        #####      #####
     782              :         //     xxxx
     783            6 :         assert!(ks.overlaps(&kr(5..11)));
     784              : 
     785              :         //        #####      #####
     786              :         //        xxxx
     787            6 :         assert!(ks.overlaps(&kr(10..15)));
     788              : 
     789              :         //        #####      #####
     790              :         //         xxxx
     791            6 :         assert!(ks.overlaps(&kr(15..20)));
     792              : 
     793              :         //        #####      #####
     794              :         //           xxxx
     795            6 :         assert!(ks.overlaps(&kr(15..25)));
     796              : 
     797              :         //        #####      #####
     798              :         //              xxxx
     799            6 :         assert!(!ks.overlaps(&kr(22..28)));
     800              : 
     801              :         //        #####      #####
     802              :         //               xxxx
     803            6 :         assert!(!ks.overlaps(&kr(25..30)));
     804              : 
     805              :         //        #####      #####
     806              :         //                      xxxx
     807            6 :         assert!(ks.overlaps(&kr(35..35)));
     808              : 
     809              :         //        #####      #####
     810              :         //                        xxxx
     811            6 :         assert!(!ks.overlaps(&kr(40..45)));
     812              : 
     813              :         //        #####      #####
     814              :         //                        xxxx
     815            6 :         assert!(!ks.overlaps(&kr(45..50)));
     816              : 
     817              :         //        #####      #####
     818              :         //        xxxxxxxxxxx
     819            6 :         assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
     820            6 :     }
     821              : 
     822              :     #[test]
     823            6 :     fn test_remove_full_overlapps() {
     824            6 :         let mut key_space1 = KeySpace {
     825            6 :             ranges: vec![
     826            6 :                 Key::from_i128(1)..Key::from_i128(4),
     827            6 :                 Key::from_i128(5)..Key::from_i128(8),
     828            6 :                 Key::from_i128(10)..Key::from_i128(12),
     829            6 :             ],
     830            6 :         };
     831            6 :         let key_space2 = KeySpace {
     832            6 :             ranges: vec![
     833            6 :                 Key::from_i128(2)..Key::from_i128(3),
     834            6 :                 Key::from_i128(6)..Key::from_i128(7),
     835            6 :                 Key::from_i128(11)..Key::from_i128(13),
     836            6 :             ],
     837            6 :         };
     838            6 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     839            6 :         let removed_expected = KeySpace {
     840            6 :             ranges: vec![
     841            6 :                 Key::from_i128(2)..Key::from_i128(3),
     842            6 :                 Key::from_i128(6)..Key::from_i128(7),
     843            6 :                 Key::from_i128(11)..Key::from_i128(12),
     844            6 :             ],
     845            6 :         };
     846            6 :         assert_eq!(removed, removed_expected);
     847              : 
     848            6 :         assert_eq!(
     849            6 :             key_space1.ranges,
     850            6 :             vec![
     851            6 :                 Key::from_i128(1)..Key::from_i128(2),
     852            6 :                 Key::from_i128(3)..Key::from_i128(4),
     853            6 :                 Key::from_i128(5)..Key::from_i128(6),
     854            6 :                 Key::from_i128(7)..Key::from_i128(8),
     855            6 :                 Key::from_i128(10)..Key::from_i128(11)
     856            6 :             ]
     857            6 :         );
     858            6 :     }
     859              : 
     860              :     #[test]
     861            6 :     fn test_remove_partial_overlaps() {
     862            6 :         // Test partial ovelaps
     863            6 :         let mut key_space1 = KeySpace {
     864            6 :             ranges: vec![
     865            6 :                 Key::from_i128(1)..Key::from_i128(5),
     866            6 :                 Key::from_i128(7)..Key::from_i128(10),
     867            6 :                 Key::from_i128(12)..Key::from_i128(15),
     868            6 :             ],
     869            6 :         };
     870            6 :         let key_space2 = KeySpace {
     871            6 :             ranges: vec![
     872            6 :                 Key::from_i128(3)..Key::from_i128(6),
     873            6 :                 Key::from_i128(8)..Key::from_i128(11),
     874            6 :                 Key::from_i128(14)..Key::from_i128(17),
     875            6 :             ],
     876            6 :         };
     877            6 : 
     878            6 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     879            6 :         let removed_expected = KeySpace {
     880            6 :             ranges: vec![
     881            6 :                 Key::from_i128(3)..Key::from_i128(5),
     882            6 :                 Key::from_i128(8)..Key::from_i128(10),
     883            6 :                 Key::from_i128(14)..Key::from_i128(15),
     884            6 :             ],
     885            6 :         };
     886            6 :         assert_eq!(removed, removed_expected);
     887              : 
     888            6 :         assert_eq!(
     889            6 :             key_space1.ranges,
     890            6 :             vec![
     891            6 :                 Key::from_i128(1)..Key::from_i128(3),
     892            6 :                 Key::from_i128(7)..Key::from_i128(8),
     893            6 :                 Key::from_i128(12)..Key::from_i128(14),
     894            6 :             ]
     895            6 :         );
     896            6 :     }
     897              : 
     898              :     #[test]
     899            6 :     fn test_remove_no_overlaps() {
     900            6 :         let mut key_space1 = KeySpace {
     901            6 :             ranges: vec![
     902            6 :                 Key::from_i128(1)..Key::from_i128(5),
     903            6 :                 Key::from_i128(7)..Key::from_i128(10),
     904            6 :                 Key::from_i128(12)..Key::from_i128(15),
     905            6 :             ],
     906            6 :         };
     907            6 :         let key_space2 = KeySpace {
     908            6 :             ranges: vec![
     909            6 :                 Key::from_i128(6)..Key::from_i128(7),
     910            6 :                 Key::from_i128(11)..Key::from_i128(12),
     911            6 :                 Key::from_i128(15)..Key::from_i128(17),
     912            6 :             ],
     913            6 :         };
     914            6 : 
     915            6 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     916            6 :         let removed_expected = KeySpace::default();
     917            6 :         assert_eq!(removed, removed_expected);
     918              : 
     919            6 :         assert_eq!(
     920            6 :             key_space1.ranges,
     921            6 :             vec![
     922            6 :                 Key::from_i128(1)..Key::from_i128(5),
     923            6 :                 Key::from_i128(7)..Key::from_i128(10),
     924            6 :                 Key::from_i128(12)..Key::from_i128(15),
     925            6 :             ]
     926            6 :         );
     927            6 :     }
     928              : 
     929              :     #[test]
     930            6 :     fn test_remove_one_range_overlaps_multiple() {
     931            6 :         let mut key_space1 = KeySpace {
     932            6 :             ranges: vec![
     933            6 :                 Key::from_i128(1)..Key::from_i128(3),
     934            6 :                 Key::from_i128(3)..Key::from_i128(6),
     935            6 :                 Key::from_i128(6)..Key::from_i128(10),
     936            6 :                 Key::from_i128(12)..Key::from_i128(15),
     937            6 :                 Key::from_i128(17)..Key::from_i128(20),
     938            6 :                 Key::from_i128(20)..Key::from_i128(30),
     939            6 :                 Key::from_i128(30)..Key::from_i128(40),
     940            6 :             ],
     941            6 :         };
     942            6 :         let key_space2 = KeySpace {
     943            6 :             ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
     944            6 :         };
     945            6 : 
     946            6 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     947            6 :         let removed_expected = KeySpace {
     948            6 :             ranges: vec![
     949            6 :                 Key::from_i128(9)..Key::from_i128(10),
     950            6 :                 Key::from_i128(12)..Key::from_i128(15),
     951            6 :                 Key::from_i128(17)..Key::from_i128(19),
     952            6 :             ],
     953            6 :         };
     954            6 :         assert_eq!(removed, removed_expected);
     955              : 
     956            6 :         assert_eq!(
     957            6 :             key_space1.ranges,
     958            6 :             vec![
     959            6 :                 Key::from_i128(1)..Key::from_i128(3),
     960            6 :                 Key::from_i128(3)..Key::from_i128(6),
     961            6 :                 Key::from_i128(6)..Key::from_i128(9),
     962            6 :                 Key::from_i128(19)..Key::from_i128(20),
     963            6 :                 Key::from_i128(20)..Key::from_i128(30),
     964            6 :                 Key::from_i128(30)..Key::from_i128(40),
     965            6 :             ]
     966            6 :         );
     967            6 :     }
     968              :     #[test]
     969            6 :     fn sharded_range_relation_gap() {
     970            6 :         let shard_identity = ShardIdentity::new(
     971            6 :             ShardNumber(0),
     972            6 :             ShardCount::new(4),
     973            6 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     974            6 :         )
     975            6 :         .unwrap();
     976            6 : 
     977            6 :         let range = ShardedRange::new(
     978            6 :             Range {
     979            6 :                 start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
     980            6 :                 end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
     981            6 :             },
     982            6 :             &shard_identity,
     983            6 :         );
     984            6 : 
     985            6 :         // Key range spans relations, expect MAX
     986            6 :         assert_eq!(range.page_count(), u32::MAX);
     987            6 :     }
     988              : 
     989              :     #[test]
     990            6 :     fn shard_identity_keyspaces_single_key() {
     991            6 :         let shard_identity = ShardIdentity::new(
     992            6 :             ShardNumber(1),
     993            6 :             ShardCount::new(4),
     994            6 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     995            6 :         )
     996            6 :         .unwrap();
     997            6 : 
     998            6 :         let range = ShardedRange::new(
     999            6 :             Range {
    1000            6 :                 start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
    1001            6 :                 end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
    1002            6 :             },
    1003            6 :             &shard_identity,
    1004            6 :         );
    1005            6 :         // Single-key range on logical size key
    1006            6 :         assert_eq!(range.page_count(), 1);
    1007            6 :     }
    1008              : 
    /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
    #[test]
    fn contiguous_range_check() {
        // Builds a key range from the hex encodings of its two bounds.
        let hex_range = |start: &str, end: &str| -> Range<Key> {
            Key::from_hex(start).unwrap()..Key::from_hex(end).unwrap()
        };

        // Starts near the top of one forkno's block numbers and ends a few
        // blocks into the next forkno.
        let spans_forknos = hex_range(
            "000000067f00000001000004df00fffffffe",
            "000000067f00000001000004df0100000003",
        );
        assert!(!is_contiguous_range(&spans_forknos));

        // The ranges goes all the way up to the 0xffffffff, including it: this is
        // not considered a rel block range because 0xffffffff stores logical sizes,
        // not blocks.
        let includes_size_key = hex_range(
            "000000067f00000001000004df00fffffffe",
            "000000067f00000001000004df0100000000",
        );
        assert!(!is_contiguous_range(&includes_size_key));

        // Keys within the normal data region of a relation
        let plain_blocks = hex_range(
            "000000067f00000001000004df0000000000",
            "000000067f00000001000004df0000000080",
        );
        assert!(is_contiguous_range(&plain_blocks));

        // The logical size key of one forkno, then some blocks in the next
        let size_key_then_blocks = hex_range(
            "000000067f00000001000004df00ffffffff",
            "000000067f00000001000004df0100000080",
        );
        assert!(is_contiguous_range(&size_key_then_blocks));
    }
    1037              : 
    /// A range that crosses from one forkno into the next reports the
    /// "unknown" page count.
    #[test]
    fn shard_identity_keyspaces_forkno_gap() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let start = Key::from_hex("000000067f00000001000004df00fffffffe").unwrap();
        let end = Key::from_hex("000000067f00000001000004df0100000003").unwrap();
        let range = ShardedRange::new(start..end, &shard_identity);

        // Range spanning the end of one forkno and the start of the next: we do not attempt to
        // calculate a valid size, because we have no way to know if the keys between start
        // and end are actually in use.
        assert_eq!(range.page_count(), u32::MAX);
    }
    1060              : 
    /// Block zero of a single relation is counted by exactly one of the four
    /// shards.
    #[test]
    fn shard_identity_keyspaces_one_relation() {
        for shard_number in 0..4 {
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                ShardCount::new(4),
                ShardParameters::DEFAULT_STRIPE_SIZE,
            )
            .unwrap();

            let start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
            let end = Key::from_hex("000000067f00000001000000ae0000000001").unwrap();
            let range = ShardedRange::new(start..end, &shard_identity);

            // Very simple case: range covering block zero of one relation, where that block
            // maps to shard zero. Other shards should perceive the range's size as zero.
            let expected_pages: u32 = if shard_number == 0 { 1 } else { 0 };
            assert_eq!(range.page_count(), expected_pages);
        }
    }
    1088              : 
    1089              :     /// Test helper: construct a ShardedRange over `range_start..range_end` and call fragment() on it with
    1090              :     /// `target_nblocks`, returning the total page count in the range and the fragments. Asserts the invariants below.
    1091         6072 :     fn do_fragment(
    1092         6072 :         range_start: Key,
    1093         6072 :         range_end: Key,
    1094         6072 :         shard_identity: &ShardIdentity,
    1095         6072 :         target_nblocks: u32,
    1096         6072 :     ) -> (u32, Vec<(u32, Range<Key>)>) {
    1097         6072 :         let range = ShardedRange::new(
    1098         6072 :             Range {
    1099         6072 :                 start: range_start,
    1100         6072 :                 end: range_end,
    1101         6072 :             },
    1102         6072 :             shard_identity,
    1103         6072 :         );
    1104         6072 : 
    1105         6072 :         let page_count = range.page_count();
    1106         6072 :         let fragments = range.fragment(target_nblocks);
    1107         6072 : 
    1108         6072 :         // Invariant: we always get at least one fragment
    1109         6072 :         assert!(!fragments.is_empty());
    1110              : 
    1111              :         // Invariant: the first/last fragment start/end should equal the input start/end
    1112         6072 :         assert_eq!(fragments.first().unwrap().1.start, range_start);
    1113         6072 :         assert_eq!(fragments.last().unwrap().1.end, range_end);
    1114              : 
    1115         6072 :         if page_count > 0 {
    1116              :             // Invariant: every fragment must contain at least one shard-local page, if the
    1117              :             // total range contains at least one shard-local page
    1118         4122 :             let all_nonzero = fragments.iter().all(|f| f.0 > 0);
    1119         3324 :             if !all_nonzero {
    1120            0 :                 eprintln!("Found a zero-length fragment: {:?}", fragments);
    1121         3324 :             }
    1122         3324 :             assert!(all_nonzero);
    1123              :         } else {
    1124              :             // A range with no shard-local pages should always be returned as a single fragment
    1125         2748 :             assert_eq!(fragments, vec![(0, range_start..range_end)]);
    1126              :         }
    1127              : 
    1128              :         // Invariant: fragments must be ordered and non-overlapping: each one starts at or after the
    1129         6072 :         let mut last: Option<Range<Key>> = None;
    1130        12942 :         for frag in &fragments {
    1131         6870 :             if let Some(last) = last {
    1132          798 :                 assert!(frag.1.start >= last.end);
    1133          798 :                 assert!(frag.1.start > last.start);
    1134         6072 :             }
    1135         6870 :             last = Some(frag.1.clone())
    1136              :         }
    1137              : 
    1138              :         // Invariant: fragments respect target_nblocks (a reported size of u32::MAX is exempt from the limit)
    1139        12942 :         for frag in &fragments {
    1140         6870 :             assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
    1141              :         }
    1142              : 
    1143         6072 :         (page_count, fragments)
    1144         6072 :     }
    1145              : 
    1146              :     /// Really simple tests for fragment(), on a range that just contains a single stripe
    1147              :     /// for a single tenant, as seen by shard 0 of 4.
    1148              :     #[test]
    1149            6 :     fn sharded_range_fragment_simple() {
    1150            6 :         let shard_identity = ShardIdentity::new(
    1151            6 :             ShardNumber(0),
    1152            6 :             ShardCount::new(4),
    1153            6 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1154            6 :         )
    1155            6 :         .unwrap();
    1156            6 : 
    1157            6 :         // A range which we happen to know covers exactly one stripe (0x8000 = 32768 blocks) which belongs to this shard
    1158            6 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1159            6 :         let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
    1160            6 : 
    1161            6 :         // Ask for stripe_size blocks, we get the whole stripe as a single fragment
    1162            6 :         assert_eq!(
    1163            6 :             do_fragment(input_start, input_end, &shard_identity, 32768),
    1164            6 :             (32768, vec![(32768, input_start..input_end)])
    1165            6 :         );
    1166              : 
    1167              :         // Ask for more, we still get the whole stripe
    1168            6 :         assert_eq!(
    1169            6 :             do_fragment(input_start, input_end, &shard_identity, 10000000),
    1170            6 :             (32768, vec![(32768, input_start..input_end)])
    1171            6 :         );
    1172              : 
    1173              :         // Ask for target_nblocks of half the stripe size, we get two halves of 16384 blocks each
    1174            6 :         assert_eq!(
    1175            6 :             do_fragment(input_start, input_end, &shard_identity, 16384),
    1176            6 :             (
    1177            6 :                 32768,
    1178            6 :                 vec![
    1179            6 :                     (16384, input_start..input_start.add(16384)),
    1180            6 :                     (16384, input_start.add(16384)..input_end)
    1181            6 :                 ]
    1182            6 :             )
    1183            6 :         );
    1184            6 :     }
    1185              :     /// Tests for fragment() on a range covering multiple stripes, only one of which belongs to this shard.
    1186              :     #[test]
    1187            6 :     fn sharded_range_fragment_multi_stripe() {
    1188            6 :         let shard_identity = ShardIdentity::new(
    1189            6 :             ShardNumber(0),
    1190            6 :             ShardCount::new(4),
    1191            6 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1192            6 :         )
    1193            6 :         .unwrap();
    1194            6 : 
    1195            6 :         // A range which covers multiple stripes, exactly one of which belongs to the current shard.
    1196            6 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1197            6 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    1198            6 :         // Ask for all the blocks, get a fragment that covers the whole range but reports
    1199            6 :         // its size to be just the blocks belonging to our shard.
    1200            6 :         assert_eq!(
    1201            6 :             do_fragment(input_start, input_end, &shard_identity, 131072),
    1202            6 :             (32768, vec![(32768, input_start..input_end)])
    1203            6 :         );
    1204              : 
    1205              :         // Ask for a sub-stripe quantity: two full 16000-block fragments, then the remaining 768 blocks
    1206            6 :         assert_eq!(
    1207            6 :             do_fragment(input_start, input_end, &shard_identity, 16000),
    1208            6 :             (
    1209            6 :                 32768,
    1210            6 :                 vec![
    1211            6 :                     (16000, input_start..input_start.add(16000)),
    1212            6 :                     (16000, input_start.add(16000)..input_start.add(32000)),
    1213            6 :                     (768, input_start.add(32000)..input_end),
    1214            6 :                 ]
    1215            6 :             )
    1216            6 :         );
    1217              : 
    1218              :         // Try on a range that starts one block after the beginning of our owned stripe: one fewer local page
    1219            6 :         assert_eq!(
    1220            6 :             do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
    1221            6 :             (32767, vec![(32767, input_start.add(1)..input_end)])
    1222            6 :         );
    1223            6 :     }
    1224              : 
    1225              :     /// Test our calculations work correctly when we start a range from the logical size key of
    1226              :     /// a previous relation.
    1227              :     #[test]
    1228            6 :     fn sharded_range_fragment_starting_from_logical_size() {
    1229            6 :         let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
    1230            6 :         let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
    1231            6 : 
    1232            6 :         // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too,
    1233            6 :         let shard_identity = ShardIdentity::new(
    1234            6 :             ShardNumber(0),
    1235            6 :             ShardCount::new(4),
    1236            6 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1237            6 :         )
    1238            6 :         .unwrap();
    1239            6 :         assert_eq!(
    1240            6 :             do_fragment(input_start, input_end, &shard_identity, 0x10000),
    1241            6 :             (0x8001, vec![(0x8001, input_start..input_end)])
    1242            6 :         );
    1243              : 
    1244              :         // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
    1245              :         // store all logical sizes), so it is expected to count exactly one page
    1246            6 :         let shard_identity = ShardIdentity::new(
    1247            6 :             ShardNumber(1),
    1248            6 :             ShardCount::new(4),
    1249            6 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1250            6 :         )
    1251            6 :         .unwrap();
    1252            6 :         assert_eq!(
    1253            6 :             do_fragment(input_start, input_end, &shard_identity, 0x10000),
    1254            6 :             (0x1, vec![(0x1, input_start..input_end)])
    1255            6 :         );
    1256            6 :     }
    1257              : 
    1258              :     /// Test that ShardedRange behaves properly when used on un-sharded data
    1259              :     #[test]
    1260            6 :     fn sharded_range_fragment_unsharded() {
    1261            6 :         let shard_identity = ShardIdentity::unsharded();
    1262            6 :         // All 0x10000 blocks of the range are counted; a target of 0x8000 splits it into two equal fragments
    1263            6 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1264            6 :         let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
    1265            6 :         assert_eq!(
    1266            6 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1267            6 :             (
    1268            6 :                 0x10000,
    1269            6 :                 vec![
    1270            6 :                     (0x8000, input_start..input_start.add(0x8000)),
    1271            6 :                     (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
    1272            6 :                 ]
    1273            6 :             )
    1274            6 :         );
    1275            6 :     }
    1276              :     /// Test that a range spanning relations comes back as a single fragment of size u32::MAX, sharded or not.
    1277              :     #[test]
    1278            6 :     fn sharded_range_fragment_cross_relation() {
    1279            6 :         let shard_identity = ShardIdentity::unsharded();
    1280            6 : 
    1281            6 :         // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
    1282            6 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1283            6 :         let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
    1284            6 :         assert_eq!(
    1285            6 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1286            6 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1287            6 :         );
    1288              : 
    1289              :         // Same, but using a sharded identity (shard 0 of 4)
    1290            6 :         let shard_identity = ShardIdentity::new(
    1291            6 :             ShardNumber(0),
    1292            6 :             ShardCount::new(4),
    1293            6 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1294            6 :         )
    1295            6 :         .unwrap();
    1296            6 :         assert_eq!(
    1297            6 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1298            6 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1299            6 :         );
    1300            6 :     }
    1301              :     /// Test fragment() with a small target_nblocks (16) on a small unsharded range.
    1302              :     #[test]
    1303            6 :     fn sharded_range_fragment_tiny_nblocks() {
    1304            6 :         let shard_identity = ShardIdentity::unsharded();
    1305            6 : 
    1306            6 :         // A small range of 0x38 (56) blocks: expect three full 16-block fragments and a final 8-block fragment
    1307            6 :         let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
    1308            6 :         let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
    1309            6 :         assert_eq!(
    1310            6 :             do_fragment(input_start, input_end, &shard_identity, 16),
    1311            6 :             (
    1312            6 :                 0x38,
    1313            6 :                 vec![
    1314            6 :                     (16, input_start..input_start.add(16)),
    1315            6 :                     (16, input_start.add(16)..input_start.add(32)),
    1316            6 :                     (16, input_start.add(32)..input_start.add(48)),
    1317            6 :                     (8, input_start.add(48)..input_end),
    1318            6 :                 ]
    1319            6 :             )
    1320            6 :         );
    1321            6 :     }
    1322              :     /// Randomized test: run do_fragment (and its invariant checks) on 1000 pseudo-random ranges and shard identities.
    1323              :     #[test]
    1324            6 :     fn sharded_range_fragment_fuzz() {
    1325            6 :         // Use a fixed seed: we don't want to explicitly pick values, but we do want
    1326            6 :         // the test to be reproducible.
    1327            6 :         let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
    1328              : 
    1329         6006 :         for _i in 0..1000 {
    1330         6000 :             let shard_identity = if prng.next_u32() % 2 == 0 {
    1331         3114 :                 ShardIdentity::unsharded()
    1332              :             } else {
    1333         2886 :                 let shard_count = prng.next_u32() % 127 + 1;
    1334         2886 :                 ShardIdentity::new(
    1335         2886 :                     ShardNumber((prng.next_u32() % shard_count) as u8),
    1336         2886 :                     ShardCount::new(shard_count as u8),
    1337         2886 :                     ShardParameters::DEFAULT_STRIPE_SIZE,
    1338         2886 :                 )
    1339         2886 :                 .unwrap()
    1340              :             };
    1341              : 
    1342         6000 :             let target_nblocks = prng.next_u32() % 65536 + 1;
    1343         6000 : 
    1344         6000 :             let start_offset = prng.next_u32() % 16384;
    1345         6000 : 
    1346         6000 :             // Try ranges of 1 to 8192 keys in size, so that they are always at least 1
    1347         6000 :             let range_size = prng.next_u32() % 8192 + 1;
    1348         6000 : 
    1349         6000 :             // Build the input range: start at a random offset from a fixed base key, spanning range_size keys
    1350         6000 :             let input_start = Key::from_hex("000000067F00000001000004E10000000000")
    1351         6000 :                 .unwrap()
    1352         6000 :                 .add(start_offset);
    1353         6000 :             let input_end = input_start.add(range_size);
    1354         6000 : 
    1355         6000 :             // This test's main success conditions are the invariants baked into do_fragment
    1356         6000 :             let (_total_size, fragments) =
    1357         6000 :                 do_fragment(input_start, input_end, &shard_identity, target_nblocks);
    1358         6000 : 
    1359         6000 :             // Pick a random key within the range and check it appears in the output
    1360         6000 :             let example_key = input_start.add(prng.next_u32() % range_size);
    1361         6000 : 
    1362         6000 :             // Panic on unwrap if it isn't found
    1363         6000 :             let example_key_frag = fragments
    1364         6000 :                 .iter()
    1365         6438 :                 .find(|f| f.1.contains(&example_key))
    1366         6000 :                 .unwrap();
    1367         6000 : 
    1368         6000 :             // Check that the fragment containing our random key has a nonzero size if
    1369         6000 :             // that key is shard-local
    1370         6000 :             let example_key_local = !shard_identity.is_key_disposable(&example_key);
    1371         6000 :             if example_key_local {
    1372         3252 :                 assert!(example_key_frag.0 > 0);
    1373         2748 :             }
    1374              :         }
    1375            6 :     }
    1376              : }
        

Generated by: LCOV version 2.1-beta