LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - keyspace.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 97.6 % 956 933
Test Date: 2024-05-10 13:18:37 Functions: 98.5 % 68 67

            Line data    Source code
       1              : use postgres_ffi::BLCKSZ;
       2              : use std::ops::Range;
       3              : 
       4              : use crate::{
       5              :     key::Key,
       6              :     shard::{ShardCount, ShardIdentity},
       7              : };
       8              : use itertools::Itertools;
       9              : 
      10              : ///
      11              : /// Represents a set of Keys, in a compact form.
      12              : ///
      13              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      14              : pub struct KeySpace {
      15              :     /// Contiguous ranges of keys that belong to the key space. In key order,
      16              :     /// and with no overlap.
      17              :     pub ranges: Vec<Range<Key>>,
      18              : }
      19              : 
      20              : /// A wrapper type for sparse keyspaces.
      21              : #[derive(Clone, Debug, Default, PartialEq, Eq)]
      22              : pub struct SparseKeySpace(pub KeySpace);
      23              : 
      24              : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
      25              : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
      26              : /// shard.
      27              : ///
      28              : /// When we iterate over keys within this object, we will skip any keys that don't belong
      29              : /// to this shard.
      30              : ///
      31              : /// The start + end keys may not belong to the shard: these specify where layer files should
      32              : /// start + end, but we will never actually read/write those keys.
      33              : #[derive(Clone, Debug, PartialEq, Eq)]
      34              : pub struct ShardedRange<'a> {
      35              :     pub shard_identity: &'a ShardIdentity,
      36              :     pub range: Range<Key>,
      37              : }
      38              : 
      39              : // Calculate the size of a range within the blocks of the same relation, or spanning only the
      40              : // top page in the previous relation's space.
      41     33041996 : fn contiguous_range_len(range: &Range<Key>) -> u32 {
      42     33041996 :     debug_assert!(is_contiguous_range(range));
      43     33041996 :     if range.start.field6 == 0xffffffff {
      44            8 :         range.end.field6 + 1
      45              :     } else {
      46     33041988 :         range.end.field6 - range.start.field6
      47              :     }
      48     33041996 : }
      49              : 
      50              : /// Return true if this key range includes only keys in the same relation's data blocks, or
      51              : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
      52              : ///
      53              : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
      54              : /// be on our shard.  Later in ShardedRange we do the extra work to figure out how much
      55              : /// of a given contiguous range is present on one shard.
      56              : ///
      57              : /// This matters, because:
      58              : /// - Within such ranges, keys are used contiguously.  Outside such ranges it is sparse.
      59              : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
      60     66083696 : fn is_contiguous_range(range: &Range<Key>) -> bool {
      61     66083696 :     range.start.field1 == range.end.field1
      62     66083696 :         && range.start.field2 == range.end.field2
      63     66083688 :         && range.start.field3 == range.end.field3
      64     66083688 :         && range.start.field4 == range.end.field4
      65     66083686 :         && (range.start.field5 == range.end.field5
      66           24 :             || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
      67     66083696 : }
      68              : 
      69              : impl<'a> ShardedRange<'a> {
      70         3262 :     pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
      71         3262 :         Self {
      72         3262 :             shard_identity,
      73         3262 :             range,
      74         3262 :         }
      75         3262 :     }
      76              : 
      77              :     /// Break up this range into chunks, each of which has at least one local key in it if the
      78              :     /// total range has at least one local key.
      79         3248 :     pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
      80         3248 :         // Optimization for single-key case (e.g. logical size keys)
      81         3248 :         if self.range.end == self.range.start.add(1) {
      82         1022 :             return vec![(
      83         1022 :                 if self.shard_identity.is_key_disposable(&self.range.start) {
      84            0 :                     0
      85              :                 } else {
      86         1022 :                     1
      87              :                 },
      88         1022 :                 self.range,
      89              :             )];
      90         2226 :         }
      91         2226 : 
      92         2226 :         if !is_contiguous_range(&self.range) {
      93              :             // Ranges that span relations are not fragmented.  We only get these ranges as a result
      94              :             // of operations that act on existing layers, so we trust that the existing range is
      95              :             // reasonably small.
      96            4 :             return vec![(u32::MAX, self.range)];
      97         2222 :         }
      98         2222 : 
      99         2222 :         let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
     100         2222 : 
     101         2222 :         let mut cursor = self.range.start;
     102         4738 :         while cursor < self.range.end {
     103         2516 :             let advance_by = self.distance_to_next_boundary(cursor);
     104         2516 :             let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
     105              : 
     106              :             // If the previous fragment is undersized, then we seek to consume enough
     107              :             // blocks to complete it.
     108         2516 :             let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
     109          294 :                 Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
     110          272 :                 Some(frag) => {
     111          272 :                     // Prev block is complete, want the full number.
     112          272 :                     (
     113          272 :                         target_nblocks,
     114          272 :                         if is_fragment_disposable {
     115              :                             // If this current range will be empty (not shard-local data), we will merge into previous
     116            0 :                             Some(frag)
     117              :                         } else {
     118          272 :                             None
     119              :                         },
     120              :                     )
     121              :                 }
     122              :                 None => {
     123              :                     // First iteration, want the full number
     124         2222 :                     (target_nblocks, None)
     125              :                 }
     126              :             };
     127              : 
     128         2516 :             let advance_by = if is_fragment_disposable {
     129          936 :                 advance_by
     130              :             } else {
     131         1580 :                 std::cmp::min(advance_by, want_blocks)
     132              :             };
     133              : 
     134         2516 :             let next_cursor = cursor.add(advance_by);
     135              : 
     136         2516 :             let this_frag = (
     137         2516 :                 if is_fragment_disposable {
     138          936 :                     0
     139              :                 } else {
     140         1580 :                     advance_by
     141              :                 },
     142         2516 :                 cursor..next_cursor,
     143         2516 :             );
     144         2516 :             cursor = next_cursor;
     145              : 
     146         2516 :             if let Some(last_fragment) = merge_last_fragment {
     147           22 :                 // Previous fragment was short or this one is empty, merge into it
     148           22 :                 last_fragment.0 += this_frag.0;
     149           22 :                 last_fragment.1.end = this_frag.1.end;
     150         2494 :             } else {
     151         2494 :                 fragments.push(this_frag);
     152         2494 :             }
     153              :         }
     154              : 
     155         2222 :         fragments
     156         3248 :     }
     157              : 
     158              :     /// Estimate the physical pages that are within this range, on this shard.  This returns
     159              :     /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
     160         2038 :     pub fn page_count(&self) -> u32 {
     161         2038 :         // Special cases for single keys like logical sizes
     162         2038 :         if self.range.end == self.range.start.add(1) {
     163           12 :             return if self.shard_identity.is_key_disposable(&self.range.start) {
     164            6 :                 0
     165              :             } else {
     166            6 :                 1
     167              :             };
     168         2026 :         }
     169         2026 : 
     170         2026 :         // We can only do an authentic calculation of contiguous key ranges
     171         2026 :         if !is_contiguous_range(&self.range) {
     172            8 :             return u32::MAX;
     173         2018 :         }
     174         2018 : 
     175         2018 :         // Special case for single sharded tenants: our logical and physical sizes are the same
     176         2018 :         if self.shard_identity.count < ShardCount::new(2) {
     177         1048 :             return contiguous_range_len(&self.range);
     178          970 :         }
     179          970 : 
     180          970 :         // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
     181          970 :         // to Self, and add the stripe's block count to our total if so.
     182          970 :         let mut result: u64 = 0;
     183          970 :         let mut cursor = self.range.start;
     184         1962 :         while cursor < self.range.end {
     185              :             // Count up to the next stripe_size boundary or end of range
     186          992 :             let advance_by = self.distance_to_next_boundary(cursor);
     187          992 : 
     188          992 :             // If the blocks in this stripe belong to us, add them to our count
     189          992 :             if !self.shard_identity.is_key_disposable(&cursor) {
     190           56 :                 result += advance_by as u64;
     191          936 :             }
     192              : 
     193          992 :             cursor = cursor.add(advance_by);
     194              :         }
     195              : 
     196          970 :         if result > u32::MAX as u64 {
     197            0 :             u32::MAX
     198              :         } else {
     199          970 :             result as u32
     200              :         }
     201         2038 :     }
     202              : 
     203              :     /// Advance the cursor to the next potential fragment boundary: this is either
     204              :     /// a stripe boundary, or the end of the range.
     205         3508 :     fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
     206         3508 :         let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
     207         3508 : 
     208         3508 :         if self.shard_identity.count < ShardCount::new(2) {
     209              :             // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
     210         1510 :             return distance_to_range_end;
     211         1998 :         }
     212         1998 : 
     213         1998 :         if cursor.field6 == 0xffffffff {
     214              :             // We are wrapping from one relation's logical size to the next relation's first data block
     215            8 :             return 1;
     216         1990 :         }
     217         1990 : 
     218         1990 :         let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
     219         1990 :         let stripe_remainder = self.shard_identity.stripe_size.0
     220         1990 :             - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
     221         1990 : 
     222         1990 :         if cfg!(debug_assertions) {
     223              :             // We should never overflow field5 and field6 -- our callers check this earlier
     224              :             // and would have returned their u32::MAX cases if the input range violated this.
     225         1990 :             let next_cursor = cursor.add(stripe_remainder);
     226         1990 :             debug_assert!(
     227         1990 :                 next_cursor.field1 == cursor.field1
     228         1990 :                     && next_cursor.field2 == cursor.field2
     229         1990 :                     && next_cursor.field3 == cursor.field3
     230         1990 :                     && next_cursor.field4 == cursor.field4
     231         1990 :                     && next_cursor.field5 == cursor.field5
     232              :             )
     233            0 :         }
     234              : 
     235         1990 :         std::cmp::min(stripe_remainder, distance_to_range_end)
     236         3508 :     }
     237              : 
     238              :     /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
     239              :     /// this function simply calculates the number of pages in the space, without accounting for those
     240              :     /// pages that would not actually be stored on this node.
     241              :     ///
     242              :     /// Don't use this function in code that works with physical entities like layer files.
     243     33037440 :     pub fn raw_size(range: &Range<Key>) -> u32 {
     244     33037440 :         if is_contiguous_range(range) {
     245     33037440 :             contiguous_range_len(range)
     246              :         } else {
     247            0 :             u32::MAX
     248              :         }
     249     33037440 :     }
     250              : }
     251              : 
     252              : impl KeySpace {
     253              :     /// Create a key space with a single range.
     254          206 :     pub fn single(key_range: Range<Key>) -> Self {
     255          206 :         Self {
     256          206 :             ranges: vec![key_range],
     257          206 :         }
     258          206 :     }
     259              : 
     260              :     /// Partition a key space into chunks, with roughly 'target_size' bytes
     261              :     /// in each partition.
     262              :     ///
     263          204 :     pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
     264          204 :         // Assume that each value is 8k in size.
     265          204 :         let target_nblocks = (target_size / BLCKSZ as u64) as u32;
     266          204 : 
     267          204 :         let mut parts = Vec::new();
     268          204 :         let mut current_part = Vec::new();
     269          204 :         let mut current_part_size: usize = 0;
     270         1428 :         for range in &self.ranges {
     271              :             // While doing partitioning, wrap the range in ShardedRange so that our size calculations
     272              :             // will respect shard striping rather than assuming all keys within a range are present.
     273         1224 :             let range = ShardedRange::new(range.clone(), shard_identity);
     274              : 
     275              :             // Chunk up the range into parts that each contain up to target_size local blocks
     276         1230 :             for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
     277              :                 // If appending the next contiguous range in the keyspace to the current
     278              :                 // partition would cause it to be too large, and our current partition
     279              :                 // covers at least one block that is physically present in this shard,
     280              :                 // then start a new partition
     281         1230 :                 if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
     282           26 :                     && current_part_size > 0
     283           26 :                 {
     284           26 :                     parts.push(KeySpace {
     285           26 :                         ranges: current_part,
     286           26 :                     });
     287           26 :                     current_part = Vec::new();
     288           26 :                     current_part_size = 0;
     289         1204 :                 }
     290         1230 :                 current_part.push(frag_range.start..frag_range.end);
     291         1230 :                 current_part_size += frag_on_shard_size as usize;
     292              :             }
     293              :         }
     294              : 
     295              :         // add last partition that wasn't full yet.
     296          204 :         if !current_part.is_empty() {
     297          204 :             parts.push(KeySpace {
     298          204 :                 ranges: current_part,
     299          204 :             });
     300          204 :         }
     301              : 
     302          204 :         KeyPartitioning { parts }
     303          204 :     }
     304              : 
     305          248 :     pub fn is_empty(&self) -> bool {
     306          248 :         self.total_raw_size() == 0
     307          248 :     }
     308              : 
     309              :     /// Merge another keyspace into the current one.
     310              :     /// Note: the keyspaces must not overlap (enforced via assertions)
     311        63622 :     pub fn merge(&mut self, other: &KeySpace) {
     312        63622 :         let all_ranges = self
     313        63622 :             .ranges
     314        63622 :             .iter()
     315     28685887 :             .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
     316        63622 : 
     317        63622 :         let mut accum = KeySpaceAccum::new();
     318        63622 :         let mut prev: Option<&Range<Key>> = None;
     319     28841228 :         for range in all_ranges {
     320     28777606 :             if let Some(prev) = prev {
     321     28714096 :                 let overlap =
     322     28714096 :                     std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
     323     28714096 :                 assert!(
     324     28714096 :                     !overlap,
     325            0 :                     "Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
     326              :                     prev, range
     327              :                 );
     328        63510 :             }
     329              : 
     330     28777606 :             accum.add_range(range.clone());
     331     28777606 :             prev = Some(range);
     332              :         }
     333              : 
     334        63622 :         self.ranges = accum.to_keyspace().ranges;
     335        63622 :     }
     336              : 
     337              :     /// Remove all keys in `other` from `self`.
     338              :     /// This can involve splitting or removing of existing ranges.
     339              :     /// Returns the removed keyspace
     340          448 :     pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
     341          448 :         let (self_start, self_end) = match (self.start(), self.end()) {
     342          410 :             (Some(start), Some(end)) => (start, end),
     343              :             _ => {
     344              :                 // self is empty
     345           38 :                 return KeySpace::default();
     346              :             }
     347              :         };
     348              : 
     349              :         // Key spaces are sorted by definition, so skip ahead to the first
     350              :         // potentially intersecting range. Similarly, ignore ranges that start
     351              :         // after the current keyspace ends.
     352          410 :         let other_ranges = other
     353          410 :             .ranges
     354          410 :             .iter()
     355          410 :             .skip_while(|range| self_start >= range.end)
     356        40166 :             .take_while(|range| self_end > range.start);
     357          410 : 
     358          410 :         let mut removed_accum = KeySpaceRandomAccum::new();
     359        40536 :         for range in other_ranges {
     360       100232 :             while let Some(overlap_at) = self.overlaps_at(range) {
     361        60106 :                 let overlapped = self.ranges[overlap_at].clone();
     362        60106 : 
     363        60106 :                 if overlapped.start < range.start && overlapped.end <= range.end {
     364           10 :                     // Higher part of the range is completely overlapped.
     365           10 :                     removed_accum.add_range(range.start..self.ranges[overlap_at].end);
     366           10 :                     self.ranges[overlap_at].end = range.start;
     367        60096 :                 }
     368        60106 :                 if overlapped.start >= range.start && overlapped.end > range.end {
     369           46 :                     // Lower part of the range is completely overlapped.
     370           46 :                     removed_accum.add_range(self.ranges[overlap_at].start..range.end);
     371           46 :                     self.ranges[overlap_at].start = range.end;
     372        60060 :                 }
     373        60106 :                 if overlapped.start < range.start && overlapped.end > range.end {
     374        39964 :                     // Middle part of the range is overlapped.
     375        39964 :                     removed_accum.add_range(range.clone());
     376        39964 :                     self.ranges[overlap_at].end = range.start;
     377        39964 :                     self.ranges
     378        39964 :                         .insert(overlap_at + 1, range.end..overlapped.end);
     379        39964 :                 }
     380        60106 :                 if overlapped.start >= range.start && overlapped.end <= range.end {
     381        20086 :                     // Whole range is overlapped
     382        20086 :                     removed_accum.add_range(self.ranges[overlap_at].clone());
     383        20086 :                     self.ranges.remove(overlap_at);
     384        40020 :                 }
     385              :             }
     386              :         }
     387              : 
     388          410 :         removed_accum.to_keyspace()
     389          448 :     }
     390              : 
     391          456 :     pub fn start(&self) -> Option<Key> {
     392          456 :         self.ranges.first().map(|range| range.start)
     393          456 :     }
     394              : 
     395          448 :     pub fn end(&self) -> Option<Key> {
     396          448 :         self.ranges.last().map(|range| range.end)
     397          448 :     }
     398              : 
     399              :     /// The size of the keyspace in pages, before accounting for sharding
     400         1090 :     pub fn total_raw_size(&self) -> usize {
     401         1090 :         self.ranges
     402         1090 :             .iter()
     403        84286 :             .map(|range| ShardedRange::raw_size(range) as usize)
     404         1090 :             .sum()
     405         1090 :     }
     406              : 
     407       185807 :     fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
     408      1436148 :         match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
     409          108 :             Ok(0) => None,
     410          452 :             Err(0) => None,
     411       103352 :             Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     412        81895 :             Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
     413       103967 :             _ => None,
     414              :         }
     415       185807 :     }
     416              : 
     417              :     ///
     418              :     /// Check if key space contains overlapping range
     419              :     ///
     420        85575 :     pub fn overlaps(&self, range: &Range<Key>) -> bool {
     421        85575 :         self.overlaps_at(range).is_some()
     422        85575 :     }
     423              : 
     424              :     /// Check if the keyspace contains a key
     425        83905 :     pub fn contains(&self, key: &Key) -> bool {
     426        83905 :         self.overlaps(&(*key..key.next()))
     427        83905 :     }
     428              : }
     429              : 
     430              : ///
     431              : /// Represents a partitioning of the key space.
     432              : ///
     433              : /// The only kind of partitioning we do is to partition the key space into
     434              : /// partitions that are roughly equal in physical size (see KeySpace::partition).
     435              : /// But this data structure could represent any partitioning.
     436              : ///
     437              : #[derive(Clone, Debug, Default)]
     438              : pub struct KeyPartitioning {
     439              :     pub parts: Vec<KeySpace>,
     440              : }
     441              : 
     442              : /// Represents a partitioning of the sparse key space.
     443              : #[derive(Clone, Debug, Default)]
     444              : pub struct SparseKeyPartitioning {
     445              :     pub parts: Vec<SparseKeySpace>,
     446              : }
     447              : 
     448              : impl KeyPartitioning {
     449          668 :     pub fn new() -> Self {
     450          668 :         KeyPartitioning { parts: Vec::new() }
     451          668 :     }
     452              : 
     453              :     /// Convert a key partitioning to a sparse partition.
     454          334 :     pub fn into_sparse(self) -> SparseKeyPartitioning {
     455          334 :         SparseKeyPartitioning {
     456          334 :             parts: self.parts.into_iter().map(SparseKeySpace).collect(),
     457          334 :         }
     458          334 :     }
     459              : }
     460              : 
     461              : impl SparseKeyPartitioning {
     462              :     /// Note: use this function with caution. Attempting to handle a sparse keyspace in the same way as a dense keyspace will
     463              :     /// cause very long-running or effectively infinite loops.
     464          330 :     pub fn into_dense(self) -> KeyPartitioning {
     465          330 :         KeyPartitioning {
     466          330 :             parts: self.parts.into_iter().map(|x| x.0).collect(),
     467          330 :         }
     468          330 :     }
     469              : }
     470              : 
     471              : ///
     472              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     473              : /// object. This takes care of merging adjacent keys and key ranges into
     474              : /// contiguous ranges.
     475              : ///
     476              : #[derive(Clone, Debug, Default)]
     477              : pub struct KeySpaceAccum {
     478              :     accum: Option<Range<Key>>,
     479              : 
     480              :     ranges: Vec<Range<Key>>,
     481              :     size: u64,
     482              : }
     483              : 
     484              : impl KeySpaceAccum {
     485       154600 :     pub fn new() -> Self {
     486       154600 :         Self {
     487       154600 :             accum: None,
     488       154600 :             ranges: Vec::new(),
     489       154600 :             size: 0,
     490       154600 :         }
     491       154600 :     }
     492              : 
     493              :     #[inline(always)]
     494      4080326 :     pub fn add_key(&mut self, key: Key) {
     495      4080326 :         self.add_range(singleton_range(key))
     496      4080326 :     }
     497              : 
     498              :     #[inline(always)]
     499     32953148 :     pub fn add_range(&mut self, range: Range<Key>) {
     500     32953148 :         self.size += ShardedRange::raw_size(&range) as u64;
     501     32953148 : 
     502     32953148 :         match self.accum.as_mut() {
     503     32784976 :             Some(accum) => {
     504     32784976 :                 if range.start == accum.end {
     505      4066834 :                     accum.end = range.end;
     506      4066834 :                 } else {
     507              :                     // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
     508              :                     // a new range here if the skipped region was all keys that don't belong on this shard.
     509              :                     // (https://github.com/neondatabase/neon/issues/6247)
     510     28718142 :                     assert!(range.start > accum.end);
     511     28718142 :                     self.ranges.push(accum.clone());
     512     28718142 :                     *accum = range;
     513              :                 }
     514              :             }
     515       168172 :             None => self.accum = Some(range),
     516              :         }
     517     32953148 :     }
     518              : 
     519       151514 :     pub fn to_keyspace(mut self) -> KeySpace {
     520       151514 :         if let Some(accum) = self.accum.take() {
     521       148120 :             self.ranges.push(accum);
     522       148120 :         }
     523       151514 :         KeySpace {
     524       151514 :             ranges: self.ranges,
     525       151514 :         }
     526       151514 :     }
     527              : 
     528          620 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     529          620 :         std::mem::take(self).to_keyspace()
     530          620 :     }
     531              : 
     532              :     // The total number of keys in this object, ignoring any sharding effects that might cause some of
     533              :     // the keys to be omitted in storage on this shard.
     534         1438 :     pub fn raw_size(&self) -> u64 {
     535         1438 :         self.size
     536         1438 :     }
     537              : }
     538              : 
     539              : ///
     540              : /// A helper object, to collect a set of keys and key ranges into a KeySpace
     541              : /// object. Key ranges may be inserted in any order and can overlap.
     542              : ///
     543              : #[derive(Clone, Debug, Default)]
     544              : pub struct KeySpaceRandomAccum {
     545              :     ranges: Vec<Range<Key>>,
     546              : }
     547              : 
     548              : impl KeySpaceRandomAccum {
     549          924 :     pub fn new() -> Self {
     550          924 :         Self { ranges: Vec::new() }
     551          924 :     }
     552              : 
     553        20388 :     pub fn add_key(&mut self, key: Key) {
     554        20388 :         self.add_range(singleton_range(key))
     555        20388 :     }
     556              : 
     557        80536 :     pub fn add_range(&mut self, range: Range<Key>) {
     558        80536 :         self.ranges.push(range);
     559        80536 :     }
     560              : 
     561          676 :     pub fn to_keyspace(mut self) -> KeySpace {
     562          676 :         let mut ranges = Vec::new();
     563          676 :         if !self.ranges.is_empty() {
     564       161918 :             self.ranges.sort_by_key(|r| r.start);
     565          360 :             let mut start = self.ranges.first().unwrap().start;
     566          360 :             let mut end = self.ranges.first().unwrap().end;
     567        80896 :             for r in self.ranges {
     568        80536 :                 assert!(r.start >= start);
     569        80536 :                 if r.start > end {
     570        79812 :                     ranges.push(start..end);
     571        79812 :                     start = r.start;
     572        79812 :                     end = r.end;
     573        79812 :                 } else if r.end > end {
     574          356 :                     end = r.end;
     575          368 :                 }
     576              :             }
     577          360 :             ranges.push(start..end);
     578          316 :         }
     579          676 :         KeySpace { ranges }
     580          676 :     }
     581              : 
     582          248 :     pub fn consume_keyspace(&mut self) -> KeySpace {
     583          248 :         let mut prev_accum = KeySpaceRandomAccum::new();
     584          248 :         std::mem::swap(self, &mut prev_accum);
     585          248 : 
     586          248 :         prev_accum.to_keyspace()
     587          248 :     }
     588              : }
     589              : 
     590      4100714 : pub fn singleton_range(key: Key) -> Range<Key> {
     591      4100714 :     key..key.next()
     592      4100714 : }
     593              : 
     594              : #[cfg(test)]
     595              : mod tests {
     596              :     use rand::{RngCore, SeedableRng};
     597              : 
     598              :     use crate::{
     599              :         models::ShardParameters,
     600              :         shard::{ShardCount, ShardNumber},
     601              :     };
     602              : 
     603              :     use super::*;
     604              :     use std::fmt::Write;
     605              : 
     606              :     // Helper function to create a key range.
     607              :     //
     608              :     // Make the tests below less verbose.
     609           92 :     fn kr(irange: Range<i128>) -> Range<Key> {
     610           92 :         Key::from_i128(irange.start)..Key::from_i128(irange.end)
     611           92 :     }
     612              : 
     613              :     #[allow(dead_code)]
     614            0 :     fn dump_keyspace(ks: &KeySpace) {
     615            0 :         for r in ks.ranges.iter() {
     616            0 :             println!("  {}..{}", r.start.to_i128(), r.end.to_i128());
     617            0 :         }
     618            0 :     }
     619              : 
     620           22 :     fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
     621           22 :         if actual.ranges != expected {
     622            0 :             let mut msg = String::new();
     623            0 : 
     624            0 :             writeln!(msg, "expected:").unwrap();
     625            0 :             for r in &expected {
     626            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     627            0 :             }
     628            0 :             writeln!(msg, "got:").unwrap();
     629            0 :             for r in &actual.ranges {
     630            0 :                 writeln!(msg, "  {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
     631            0 :             }
     632            0 :             panic!("{}", msg);
     633           22 :         }
     634           22 :     }
     635              : 
     636              :     #[test]
     637            2 :     fn keyspace_consume() {
     638            2 :         let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
     639            2 : 
     640            2 :         let mut accum = KeySpaceAccum::new();
     641            8 :         for range in &ranges {
     642            6 :             accum.add_range(range.clone());
     643            6 :         }
     644              : 
     645            2 :         let expected_size: u64 = ranges
     646            2 :             .iter()
     647            6 :             .map(|r| ShardedRange::raw_size(r) as u64)
     648            2 :             .sum();
     649            2 :         assert_eq!(accum.raw_size(), expected_size);
     650              : 
     651            2 :         assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
     652            2 :         assert_eq!(accum.raw_size(), 0);
     653              : 
     654            2 :         assert_ks_eq(&accum.consume_keyspace(), vec![]);
     655            2 :         assert_eq!(accum.raw_size(), 0);
     656              : 
     657            8 :         for range in &ranges {
     658            6 :             accum.add_range(range.clone());
     659            6 :         }
     660            2 :         assert_ks_eq(&accum.to_keyspace(), ranges);
     661            2 :     }
     662              : 
     663              :     #[test]
     664            2 :     fn keyspace_add_range() {
     665            2 :         // two separate ranges
     666            2 :         //
     667            2 :         // #####
     668            2 :         //         #####
     669            2 :         let mut ks = KeySpaceRandomAccum::default();
     670            2 :         ks.add_range(kr(0..10));
     671            2 :         ks.add_range(kr(20..30));
     672            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
     673            2 : 
     674            2 :         // two separate ranges, added in reverse order
     675            2 :         //
     676            2 :         //         #####
     677            2 :         // #####
     678            2 :         let mut ks = KeySpaceRandomAccum::default();
     679            2 :         ks.add_range(kr(20..30));
     680            2 :         ks.add_range(kr(0..10));
     681            2 : 
     682            2 :         // add range that is adjacent to the end of an existing range
     683            2 :         //
     684            2 :         // #####
     685            2 :         //      #####
     686            2 :         ks.add_range(kr(0..10));
     687            2 :         ks.add_range(kr(10..30));
     688            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     689            2 : 
     690            2 :         // add range that is adjacent to the start of an existing range
     691            2 :         //
     692            2 :         //      #####
     693            2 :         // #####
     694            2 :         let mut ks = KeySpaceRandomAccum::default();
     695            2 :         ks.add_range(kr(10..30));
     696            2 :         ks.add_range(kr(0..10));
     697            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     698            2 : 
     699            2 :         // add range that overlaps with the end of an existing range
     700            2 :         //
     701            2 :         // #####
     702            2 :         //    #####
     703            2 :         let mut ks = KeySpaceRandomAccum::default();
     704            2 :         ks.add_range(kr(0..10));
     705            2 :         ks.add_range(kr(5..30));
     706            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     707            2 : 
     708            2 :         // add range that overlaps with the start of an existing range
     709            2 :         //
     710            2 :         //    #####
     711            2 :         // #####
     712            2 :         let mut ks = KeySpaceRandomAccum::default();
     713            2 :         ks.add_range(kr(5..30));
     714            2 :         ks.add_range(kr(0..10));
     715            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     716            2 : 
     717            2 :         // add range that is fully covered by an existing range
     718            2 :         //
     719            2 :         // #########
     720            2 :         //   #####
     721            2 :         let mut ks = KeySpaceRandomAccum::default();
     722            2 :         ks.add_range(kr(0..30));
     723            2 :         ks.add_range(kr(10..20));
     724            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     725            2 : 
     726            2 :         // add range that extends an existing range from both ends
     727            2 :         //
     728            2 :         //   #####
     729            2 :         // #########
     730            2 :         let mut ks = KeySpaceRandomAccum::default();
     731            2 :         ks.add_range(kr(10..20));
     732            2 :         ks.add_range(kr(0..30));
     733            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     734            2 : 
     735            2 :         // add a range that overlaps with two existing ranges, joining them
     736            2 :         //
     737            2 :         // #####   #####
     738            2 :         //    #######
     739            2 :         let mut ks = KeySpaceRandomAccum::default();
     740            2 :         ks.add_range(kr(0..10));
     741            2 :         ks.add_range(kr(20..30));
     742            2 :         ks.add_range(kr(5..25));
     743            2 :         assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
     744            2 :     }
     745              : 
     746              :     #[test]
     747            2 :     fn keyspace_overlaps() {
     748            2 :         let mut ks = KeySpaceRandomAccum::default();
     749            2 :         ks.add_range(kr(10..20));
     750            2 :         ks.add_range(kr(30..40));
     751            2 :         let ks = ks.to_keyspace();
     752            2 : 
     753            2 :         //        #####      #####
     754            2 :         // xxxx
     755            2 :         assert!(!ks.overlaps(&kr(0..5)));
     756              : 
     757              :         //        #####      #####
     758              :         //   xxxx
     759            2 :         assert!(!ks.overlaps(&kr(5..9)));
     760              : 
     761              :         //        #####      #####
     762              :         //    xxxx
     763            2 :         assert!(!ks.overlaps(&kr(5..10)));
     764              : 
     765              :         //        #####      #####
     766              :         //     xxxx
     767            2 :         assert!(ks.overlaps(&kr(5..11)));
     768              : 
     769              :         //        #####      #####
     770              :         //        xxxx
     771            2 :         assert!(ks.overlaps(&kr(10..15)));
     772              : 
     773              :         //        #####      #####
     774              :         //         xxxx
     775            2 :         assert!(ks.overlaps(&kr(15..20)));
     776              : 
     777              :         //        #####      #####
     778              :         //           xxxx
     779            2 :         assert!(ks.overlaps(&kr(15..25)));
     780              : 
     781              :         //        #####      #####
     782              :         //              xxxx
     783            2 :         assert!(!ks.overlaps(&kr(22..28)));
     784              : 
     785              :         //        #####      #####
     786              :         //               xxxx
     787            2 :         assert!(!ks.overlaps(&kr(25..30)));
     788              : 
     789              :         //        #####      #####
     790              :         //                      xxxx
     791            2 :         assert!(ks.overlaps(&kr(35..35)));
     792              : 
     793              :         //        #####      #####
     794              :         //                        xxxx
     795            2 :         assert!(!ks.overlaps(&kr(40..45)));
     796              : 
     797              :         //        #####      #####
     798              :         //                        xxxx
     799            2 :         assert!(!ks.overlaps(&kr(45..50)));
     800              : 
     801              :         //        #####      #####
     802              :         //        xxxxxxxxxxx
     803            2 :         assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
     804            2 :     }
     805              : 
     806              :     #[test]
     807            2 :     fn test_remove_full_overlapps() {
     808            2 :         let mut key_space1 = KeySpace {
     809            2 :             ranges: vec![
     810            2 :                 Key::from_i128(1)..Key::from_i128(4),
     811            2 :                 Key::from_i128(5)..Key::from_i128(8),
     812            2 :                 Key::from_i128(10)..Key::from_i128(12),
     813            2 :             ],
     814            2 :         };
     815            2 :         let key_space2 = KeySpace {
     816            2 :             ranges: vec![
     817            2 :                 Key::from_i128(2)..Key::from_i128(3),
     818            2 :                 Key::from_i128(6)..Key::from_i128(7),
     819            2 :                 Key::from_i128(11)..Key::from_i128(13),
     820            2 :             ],
     821            2 :         };
     822            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     823            2 :         let removed_expected = KeySpace {
     824            2 :             ranges: vec![
     825            2 :                 Key::from_i128(2)..Key::from_i128(3),
     826            2 :                 Key::from_i128(6)..Key::from_i128(7),
     827            2 :                 Key::from_i128(11)..Key::from_i128(12),
     828            2 :             ],
     829            2 :         };
     830            2 :         assert_eq!(removed, removed_expected);
     831              : 
     832            2 :         assert_eq!(
     833            2 :             key_space1.ranges,
     834            2 :             vec![
     835            2 :                 Key::from_i128(1)..Key::from_i128(2),
     836            2 :                 Key::from_i128(3)..Key::from_i128(4),
     837            2 :                 Key::from_i128(5)..Key::from_i128(6),
     838            2 :                 Key::from_i128(7)..Key::from_i128(8),
     839            2 :                 Key::from_i128(10)..Key::from_i128(11)
     840            2 :             ]
     841            2 :         );
     842            2 :     }
     843              : 
     844              :     #[test]
     845            2 :     fn test_remove_partial_overlaps() {
     846            2 :         // Test partial ovelaps
     847            2 :         let mut key_space1 = KeySpace {
     848            2 :             ranges: vec![
     849            2 :                 Key::from_i128(1)..Key::from_i128(5),
     850            2 :                 Key::from_i128(7)..Key::from_i128(10),
     851            2 :                 Key::from_i128(12)..Key::from_i128(15),
     852            2 :             ],
     853            2 :         };
     854            2 :         let key_space2 = KeySpace {
     855            2 :             ranges: vec![
     856            2 :                 Key::from_i128(3)..Key::from_i128(6),
     857            2 :                 Key::from_i128(8)..Key::from_i128(11),
     858            2 :                 Key::from_i128(14)..Key::from_i128(17),
     859            2 :             ],
     860            2 :         };
     861            2 : 
     862            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     863            2 :         let removed_expected = KeySpace {
     864            2 :             ranges: vec![
     865            2 :                 Key::from_i128(3)..Key::from_i128(5),
     866            2 :                 Key::from_i128(8)..Key::from_i128(10),
     867            2 :                 Key::from_i128(14)..Key::from_i128(15),
     868            2 :             ],
     869            2 :         };
     870            2 :         assert_eq!(removed, removed_expected);
     871              : 
     872            2 :         assert_eq!(
     873            2 :             key_space1.ranges,
     874            2 :             vec![
     875            2 :                 Key::from_i128(1)..Key::from_i128(3),
     876            2 :                 Key::from_i128(7)..Key::from_i128(8),
     877            2 :                 Key::from_i128(12)..Key::from_i128(14),
     878            2 :             ]
     879            2 :         );
     880            2 :     }
     881              : 
     882              :     #[test]
     883            2 :     fn test_remove_no_overlaps() {
     884            2 :         let mut key_space1 = KeySpace {
     885            2 :             ranges: vec![
     886            2 :                 Key::from_i128(1)..Key::from_i128(5),
     887            2 :                 Key::from_i128(7)..Key::from_i128(10),
     888            2 :                 Key::from_i128(12)..Key::from_i128(15),
     889            2 :             ],
     890            2 :         };
     891            2 :         let key_space2 = KeySpace {
     892            2 :             ranges: vec![
     893            2 :                 Key::from_i128(6)..Key::from_i128(7),
     894            2 :                 Key::from_i128(11)..Key::from_i128(12),
     895            2 :                 Key::from_i128(15)..Key::from_i128(17),
     896            2 :             ],
     897            2 :         };
     898            2 : 
     899            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     900            2 :         let removed_expected = KeySpace::default();
     901            2 :         assert_eq!(removed, removed_expected);
     902              : 
     903            2 :         assert_eq!(
     904            2 :             key_space1.ranges,
     905            2 :             vec![
     906            2 :                 Key::from_i128(1)..Key::from_i128(5),
     907            2 :                 Key::from_i128(7)..Key::from_i128(10),
     908            2 :                 Key::from_i128(12)..Key::from_i128(15),
     909            2 :             ]
     910            2 :         );
     911            2 :     }
     912              : 
     913              :     #[test]
     914            2 :     fn test_remove_one_range_overlaps_multiple() {
     915            2 :         let mut key_space1 = KeySpace {
     916            2 :             ranges: vec![
     917            2 :                 Key::from_i128(1)..Key::from_i128(3),
     918            2 :                 Key::from_i128(3)..Key::from_i128(6),
     919            2 :                 Key::from_i128(6)..Key::from_i128(10),
     920            2 :                 Key::from_i128(12)..Key::from_i128(15),
     921            2 :                 Key::from_i128(17)..Key::from_i128(20),
     922            2 :                 Key::from_i128(20)..Key::from_i128(30),
     923            2 :                 Key::from_i128(30)..Key::from_i128(40),
     924            2 :             ],
     925            2 :         };
     926            2 :         let key_space2 = KeySpace {
     927            2 :             ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
     928            2 :         };
     929            2 : 
     930            2 :         let removed = key_space1.remove_overlapping_with(&key_space2);
     931            2 :         let removed_expected = KeySpace {
     932            2 :             ranges: vec![
     933            2 :                 Key::from_i128(9)..Key::from_i128(10),
     934            2 :                 Key::from_i128(12)..Key::from_i128(15),
     935            2 :                 Key::from_i128(17)..Key::from_i128(19),
     936            2 :             ],
     937            2 :         };
     938            2 :         assert_eq!(removed, removed_expected);
     939              : 
     940            2 :         assert_eq!(
     941            2 :             key_space1.ranges,
     942            2 :             vec![
     943            2 :                 Key::from_i128(1)..Key::from_i128(3),
     944            2 :                 Key::from_i128(3)..Key::from_i128(6),
     945            2 :                 Key::from_i128(6)..Key::from_i128(9),
     946            2 :                 Key::from_i128(19)..Key::from_i128(20),
     947            2 :                 Key::from_i128(20)..Key::from_i128(30),
     948            2 :                 Key::from_i128(30)..Key::from_i128(40),
     949            2 :             ]
     950            2 :         );
     951            2 :     }
     952              :     #[test]
     953            2 :     fn sharded_range_relation_gap() {
     954            2 :         let shard_identity = ShardIdentity::new(
     955            2 :             ShardNumber(0),
     956            2 :             ShardCount::new(4),
     957            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     958            2 :         )
     959            2 :         .unwrap();
     960            2 : 
     961            2 :         let range = ShardedRange::new(
     962            2 :             Range {
     963            2 :                 start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
     964            2 :                 end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
     965            2 :             },
     966            2 :             &shard_identity,
     967            2 :         );
     968            2 : 
     969            2 :         // Key range spans relations, expect MAX
     970            2 :         assert_eq!(range.page_count(), u32::MAX);
     971            2 :     }
     972              : 
     973              :     #[test]
     974            2 :     fn shard_identity_keyspaces_single_key() {
     975            2 :         let shard_identity = ShardIdentity::new(
     976            2 :             ShardNumber(1),
     977            2 :             ShardCount::new(4),
     978            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
     979            2 :         )
     980            2 :         .unwrap();
     981            2 : 
     982            2 :         let range = ShardedRange::new(
     983            2 :             Range {
     984            2 :                 start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
     985            2 :                 end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
     986            2 :             },
     987            2 :             &shard_identity,
     988            2 :         );
     989            2 :         // Single-key range on logical size key
     990            2 :         assert_eq!(range.page_count(), 1);
     991            2 :     }
     992              : 
     993              :     /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
     994              :     #[test]
     995            2 :     fn contiguous_range_check() {
     996            2 :         assert!(!is_contiguous_range(
     997            2 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
     998            2 :                 ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
     999            2 :         ),);
    1000              : 
    1001              :         // The ranges goes all the way up to the 0xffffffff, including it: this is
    1002              :         // not considered a rel block range because 0xffffffff stores logical sizes,
    1003              :         // not blocks.
    1004            2 :         assert!(!is_contiguous_range(
    1005            2 :             &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
    1006            2 :                 ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
    1007            2 :         ),);
    1008              : 
    1009              :         // Keys within the normal data region of a relation
    1010            2 :         assert!(is_contiguous_range(
    1011            2 :             &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
    1012            2 :                 ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
    1013            2 :         ),);
    1014              : 
    1015              :         // The logical size key of one forkno, then some blocks in the next
    1016            2 :         assert!(is_contiguous_range(
    1017            2 :             &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
    1018            2 :                 ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
    1019            2 :         ),);
    1020            2 :     }
    1021              : 
    /// A range that crosses from one forkno into the next reports `u32::MAX` pages.
    #[test]
    fn shard_identity_keyspaces_forkno_gap() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let start = Key::from_hex("000000067f00000001000004df00fffffffe").unwrap();
        let end = Key::from_hex("000000067f00000001000004df0100000003").unwrap();
        let range = ShardedRange::new(start..end, &shard_identity);

        // Range spanning the end of one forkno and the start of the next: we do not attempt to
        // calculate a valid size, because we have no way to know if the keys between start
        // and end are actually in use.
        assert_eq!(range.page_count(), u32::MAX);
    }
    1044              : 
    /// Very simple case: a range covering block zero of one relation, checked from
    /// the point of view of each of four shards.
    #[test]
    fn shard_identity_keyspaces_one_relation() {
        for shard_number in 0..4 {
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                ShardCount::new(4),
                ShardParameters::DEFAULT_STRIPE_SIZE,
            )
            .unwrap();

            let start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
            let end = Key::from_hex("000000067f00000001000000ae0000000001").unwrap();
            let range = ShardedRange::new(start..end, &shard_identity);

            // The block maps to shard zero, which sees one page; other shards
            // should perceive the range's size as zero.
            let expected_pages = if shard_number == 0 { 1 } else { 0 };
            assert_eq!(range.page_count(), expected_pages);
        }
    }
    1072              : 
    1073              :     /// Test helper: construct a ShardedRange and call fragment() on it, returning
    1074              :     /// the total page count in the range and the fragments.
    1075         2024 :     fn do_fragment(
    1076         2024 :         range_start: Key,
    1077         2024 :         range_end: Key,
    1078         2024 :         shard_identity: &ShardIdentity,
    1079         2024 :         target_nblocks: u32,
    1080         2024 :     ) -> (u32, Vec<(u32, Range<Key>)>) {
    1081         2024 :         let range = ShardedRange::new(
    1082         2024 :             Range {
    1083         2024 :                 start: range_start,
    1084         2024 :                 end: range_end,
    1085         2024 :             },
    1086         2024 :             shard_identity,
    1087         2024 :         );
    1088         2024 : 
    1089         2024 :         let page_count = range.page_count();
    1090         2024 :         let fragments = range.fragment(target_nblocks);
    1091         2024 : 
    1092         2024 :         // Invariant: we always get at least one fragment
    1093         2024 :         assert!(!fragments.is_empty());
    1094              : 
    1095              :         // Invariant: the first/last fragment start/end should equal the input start/end
    1096         2024 :         assert_eq!(fragments.first().unwrap().1.start, range_start);
    1097         2024 :         assert_eq!(fragments.last().unwrap().1.end, range_end);
    1098              : 
    1099         2024 :         if page_count > 0 {
    1100              :             // Invariant: every fragment must contain at least one shard-local page, if the
    1101              :             // total range contains at least one shard-local page
    1102         1374 :             let all_nonzero = fragments.iter().all(|f| f.0 > 0);
    1103         1108 :             if !all_nonzero {
    1104            0 :                 eprintln!("Found a zero-length fragment: {:?}", fragments);
    1105         1108 :             }
    1106         1108 :             assert!(all_nonzero);
    1107              :         } else {
    1108              :             // A range with no shard-local pages should always be returned as a single fragment
    1109          916 :             assert_eq!(fragments, vec![(0, range_start..range_end)]);
    1110              :         }
    1111              : 
    1112              :         // Invariant: fragments must be ordered and non-overlapping
    1113         2024 :         let mut last: Option<Range<Key>> = None;
    1114         4314 :         for frag in &fragments {
    1115         2290 :             if let Some(last) = last {
    1116          266 :                 assert!(frag.1.start >= last.end);
    1117          266 :                 assert!(frag.1.start > last.start);
    1118         2024 :             }
    1119         2290 :             last = Some(frag.1.clone())
    1120              :         }
    1121              : 
    1122              :         // Invariant: fragments respect target_nblocks, unless their size is reported as u32::MAX
    1123         4314 :         for frag in &fragments {
    1124         2290 :             assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
    1125              :         }
    1126              : 
    1127         2024 :         (page_count, fragments)
    1128         2024 :     }
    1129              : 
    1130              :     /// Really simple tests for fragment(), on a range that just contains a single stripe
    1131              :     /// for a single tenant.
    1132              :     #[test]
    1133            2 :     fn sharded_range_fragment_simple() {
    1134            2 :         let shard_identity = ShardIdentity::new(
    1135            2 :             ShardNumber(0),
    1136            2 :             ShardCount::new(4),
    1137            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1138            2 :         )
    1139            2 :         .unwrap();
    1140            2 : 
    1141            2 :         // A range which we happen to know covers exactly one stripe which belongs to this shard
    1142            2 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1143            2 :         let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
    1144            2 : 
    1145            2 :         // Ask for stripe_size blocks, we get the whole stripe
    1146            2 :         assert_eq!(
    1147            2 :             do_fragment(input_start, input_end, &shard_identity, 32768),
    1148            2 :             (32768, vec![(32768, input_start..input_end)])
    1149            2 :         );
    1150              : 
    1151              :         // Ask for more, we still get the whole stripe
    1152            2 :         assert_eq!(
    1153            2 :             do_fragment(input_start, input_end, &shard_identity, 10000000),
    1154            2 :             (32768, vec![(32768, input_start..input_end)])
    1155            2 :         );
    1156              : 
    1157              :         // Ask for target_nblocks of half the stripe size, we get two halves
    1158            2 :         assert_eq!(
    1159            2 :             do_fragment(input_start, input_end, &shard_identity, 16384),
    1160            2 :             (
    1161            2 :                 32768,
    1162            2 :                 vec![
    1163            2 :                     (16384, input_start..input_start.add(16384)),
    1164            2 :                     (16384, input_start.add(16384)..input_end)
    1165            2 :                 ]
    1166            2 :             )
    1167            2 :         );
    1168            2 :     }
    1169              : 
    1170              :     #[test]
    1171            2 :     fn sharded_range_fragment_multi_stripe() {
    1172            2 :         let shard_identity = ShardIdentity::new(
    1173            2 :             ShardNumber(0),
    1174            2 :             ShardCount::new(4),
    1175            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1176            2 :         )
    1177            2 :         .unwrap();
    1178            2 : 
    1179            2 :         // A range which covers multiple stripes, exactly one of which belongs to the current shard.
    1180            2 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1181            2 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    1182            2 :         // Ask for all the blocks, get a fragment that covers the whole range but reports
    1183            2 :         // its size to be just the blocks belonging to our shard.
    1184            2 :         assert_eq!(
    1185            2 :             do_fragment(input_start, input_end, &shard_identity, 131072),
    1186            2 :             (32768, vec![(32768, input_start..input_end)])
    1187            2 :         );
    1188              : 
    1189              :         // Ask for a sub-stripe quantity
    1190            2 :         assert_eq!(
    1191            2 :             do_fragment(input_start, input_end, &shard_identity, 16000),
    1192            2 :             (
    1193            2 :                 32768,
    1194            2 :                 vec![
    1195            2 :                     (16000, input_start..input_start.add(16000)),
    1196            2 :                     (16000, input_start.add(16000)..input_start.add(32000)),
    1197            2 :                     (768, input_start.add(32000)..input_end),
    1198            2 :                 ]
    1199            2 :             )
    1200            2 :         );
    1201              : 
    1202              :         // Try on a range that starts slightly after our owned stripe
    1203            2 :         assert_eq!(
    1204            2 :             do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
    1205            2 :             (32767, vec![(32767, input_start.add(1)..input_end)])
    1206            2 :         );
    1207            2 :     }
    1208              : 
    1209              :     /// Test our calculations work correctly when we start a range from the logical size key of
    1210              :     /// a previous relation.
    1211              :     #[test]
    1212            2 :     fn sharded_range_fragment_starting_from_logical_size() {
    1213            2 :         let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
    1214            2 :         let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
    1215            2 : 
    1216            2 :         // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
    1217            2 :         let shard_identity = ShardIdentity::new(
    1218            2 :             ShardNumber(0),
    1219            2 :             ShardCount::new(4),
    1220            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1221            2 :         )
    1222            2 :         .unwrap();
    1223            2 :         assert_eq!(
    1224            2 :             do_fragment(input_start, input_end, &shard_identity, 0x10000),
    1225            2 :             (0x8001, vec![(0x8001, input_start..input_end)])
    1226            2 :         );
    1227              : 
    1228              :         // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
    1229              :         // store all logical sizes)
    1230            2 :         let shard_identity = ShardIdentity::new(
    1231            2 :             ShardNumber(1),
    1232            2 :             ShardCount::new(4),
    1233            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1234            2 :         )
    1235            2 :         .unwrap();
    1236            2 :         assert_eq!(
    1237            2 :             do_fragment(input_start, input_end, &shard_identity, 0x10000),
    1238            2 :             (0x1, vec![(0x1, input_start..input_end)])
    1239            2 :         );
    1240            2 :     }
    1241              : 
    1242              :     /// Test that ShardedRange behaves properly when used on un-sharded data
    1243              :     #[test]
    1244            2 :     fn sharded_range_fragment_unsharded() {
    1245            2 :         let shard_identity = ShardIdentity::unsharded();
    1246            2 : 
    1247            2 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1248            2 :         let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
    1249            2 :         assert_eq!(
    1250            2 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1251            2 :             (
    1252            2 :                 0x10000,
    1253            2 :                 vec![
    1254            2 :                     (0x8000, input_start..input_start.add(0x8000)),
    1255            2 :                     (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
    1256            2 :                 ]
    1257            2 :             )
    1258            2 :         );
    1259            2 :     }
    1260              : 
    1261              :     #[test]
    1262            2 :     fn sharded_range_fragment_cross_relation() {
    1263            2 :         let shard_identity = ShardIdentity::unsharded();
    1264            2 : 
    1265            2 :         // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
    1266            2 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1267            2 :         let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
    1268            2 :         assert_eq!(
    1269            2 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1270            2 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1271            2 :         );
    1272              : 
    1273              :         // Same, but using a sharded identity
    1274            2 :         let shard_identity = ShardIdentity::new(
    1275            2 :             ShardNumber(0),
    1276            2 :             ShardCount::new(4),
    1277            2 :             ShardParameters::DEFAULT_STRIPE_SIZE,
    1278            2 :         )
    1279            2 :         .unwrap();
    1280            2 :         assert_eq!(
    1281            2 :             do_fragment(input_start, input_end, &shard_identity, 0x8000),
    1282            2 :             (u32::MAX, vec![(u32::MAX, input_start..input_end),])
    1283            2 :         );
    1284            2 :     }
    1285              : 
    1286              :     #[test]
    1287            2 :     fn sharded_range_fragment_tiny_nblocks() {
    1288            2 :         let shard_identity = ShardIdentity::unsharded();
    1289            2 : 
    1290            2 :         // A small range of 0x38 blocks: with a target of 16 blocks, expect three full fragments and a shorter remainder
    1291            2 :         let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
    1292            2 :         let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
    1293            2 :         assert_eq!(
    1294            2 :             do_fragment(input_start, input_end, &shard_identity, 16),
    1295            2 :             (
    1296            2 :                 0x38,
    1297            2 :                 vec![
    1298            2 :                     (16, input_start..input_start.add(16)),
    1299            2 :                     (16, input_start.add(16)..input_start.add(32)),
    1300            2 :                     (16, input_start.add(32)..input_start.add(48)),
    1301            2 :                     (8, input_start.add(48)..input_end),
    1302            2 :                 ]
    1303            2 :             )
    1304            2 :         );
    1305            2 :     }
    1306              : 
    1307              :     #[test]
    1308            2 :     fn sharded_range_fragment_fuzz() {
    1309            2 :         // Use a fixed seed: we don't want to explicitly pick values, but we do want
    1310            2 :         // the test to be reproducible.
    1311            2 :         let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
    1312              : 
    1313         2002 :         for _i in 0..1000 {
    1314         2000 :             let shard_identity = if prng.next_u32() % 2 == 0 {
    1315         1038 :                 ShardIdentity::unsharded()
    1316              :             } else {
    1317          962 :                 let shard_count = prng.next_u32() % 127 + 1;
    1318          962 :                 ShardIdentity::new(
    1319          962 :                     ShardNumber((prng.next_u32() % shard_count) as u8),
    1320          962 :                     ShardCount::new(shard_count as u8),
    1321          962 :                     ShardParameters::DEFAULT_STRIPE_SIZE,
    1322          962 :                 )
    1323          962 :                 .unwrap()
    1324              :             };
    1325              : 
    1326         2000 :             let target_nblocks = prng.next_u32() % 65536 + 1;
    1327         2000 : 
    1328         2000 :             let start_offset = prng.next_u32() % 16384;
    1329         2000 : 
    1330         2000 :             // Try ranges of 1 to 8192 keys, so that the range is never empty
    1331         2000 :             let range_size = prng.next_u32() % 8192 + 1;
    1332         2000 : 
    1333         2000 :             // Build the range from a fixed base key: start at a random offset, then extend by range_size
    1334         2000 :             let input_start = Key::from_hex("000000067F00000001000004E10000000000")
    1335         2000 :                 .unwrap()
    1336         2000 :                 .add(start_offset);
    1337         2000 :             let input_end = input_start.add(range_size);
    1338         2000 : 
    1339         2000 :             // This test's main success conditions are the invariants baked into do_fragment
    1340         2000 :             let (_total_size, fragments) =
    1341         2000 :                 do_fragment(input_start, input_end, &shard_identity, target_nblocks);
    1342         2000 : 
    1343         2000 :             // Pick a random key within the range and check it appears in the output
    1344         2000 :             let example_key = input_start.add(prng.next_u32() % range_size);
    1345         2000 : 
    1346         2000 :             // Panic on unwrap if it isn't found
    1347         2000 :             let example_key_frag = fragments
    1348         2000 :                 .iter()
    1349         2146 :                 .find(|f| f.1.contains(&example_key))
    1350         2000 :                 .unwrap();
    1351         2000 : 
    1352         2000 :             // Check that the fragment containing our random key has a nonzero size if
    1353         2000 :             // that key is shard-local
    1354         2000 :             let example_key_local = !shard_identity.is_key_disposable(&example_key);
    1355         2000 :             if example_key_local {
    1356         1084 :                 assert!(example_key_frag.0 > 0);
    1357          916 :             }
    1358              :         }
    1359            2 :     }
    1360              : }
        

Generated by: LCOV version 2.1-beta