Line data Source code
1 : use std::ops::Range;
2 :
3 : use itertools::Itertools;
4 :
5 : use crate::key::Key;
6 : use crate::shard::{ShardCount, ShardIdentity};
7 :
8 : ///
9 : /// Represents a set of Keys, in a compact form.
10 : ///
11 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
12 : pub struct KeySpace {
13 : /// Contiguous ranges of keys that belong to the key space. In key order,
14 : /// and with no overlap.
15 : pub ranges: Vec<Range<Key>>,
16 : }
17 :
18 : impl std::fmt::Display for KeySpace {
19 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 0 : write!(f, "[")?;
21 0 : for range in &self.ranges {
22 0 : write!(f, "{}..{},", range.start, range.end)?;
23 : }
24 0 : write!(f, "]")
25 0 : }
26 : }
27 :
28 : /// A wrapper type for sparse keyspaces.
29 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
30 : pub struct SparseKeySpace(pub KeySpace);
31 :
32 : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
33 : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
34 : /// shard.
35 : ///
36 : /// When we iterate over keys within this object, we will skip any keys that don't belong
37 : /// to this shard.
38 : ///
39 : /// The start + end keys may not belong to the shard: these specify where layer files should
40 : /// start + end, but we will never actually read/write those keys.
41 : #[derive(Clone, Debug, PartialEq, Eq)]
42 : pub struct ShardedRange<'a> {
43 : pub shard_identity: &'a ShardIdentity,
44 : pub range: Range<Key>,
45 : }
46 :
47 : // Calculate the size of a range within the blocks of the same relation, or spanning only the
48 : // top page in the previous relation's space.
49 3788363 : pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
50 3788363 : debug_assert!(is_contiguous_range(range));
51 3788363 : if range.start.field6 == 0xffffffff {
52 405867 : range.end.field6 + 1
53 : } else {
54 3382496 : range.end.field6 - range.start.field6
55 : }
56 3788363 : }
57 :
58 : /// Return true if this key range includes only keys in the same relation's data blocks, or
59 : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
60 : ///
61 : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
62 : /// be on our shard. Later in ShardedRange we do the extra work to figure out how much
63 : /// of a given contiguous range is present on one shard.
64 : ///
65 : /// This matters, because:
66 : /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
67 : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
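///
/// A doc-style sketch of the distinction, using keys borrowed from the tests below
/// (marked `ignore` since these helpers are crate-internal):
///
/// ```ignore
/// // Two keys in the data-block region of the same relation: contiguous.
/// let a = Key::from_hex("000000067f00000001000004df0000000000").unwrap();
/// let b = Key::from_hex("000000067f00000001000004df0000000080").unwrap();
/// assert!(is_contiguous_range(&(a..b)));
/// assert_eq!(contiguous_range_len(&(a..b)), 0x80);
/// ```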
68 7579901 : pub fn is_contiguous_range(range: &Range<Key>) -> bool {
69 7579901 : range.start.field1 == range.end.field1
70 7574739 : && range.start.field2 == range.end.field2
71 7574735 : && range.start.field3 == range.end.field3
72 7574733 : && range.start.field4 == range.end.field4
73 7574732 : && (range.start.field5 == range.end.field5
74 811738 : || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
75 7579901 : }
76 :
77 : impl<'a> ShardedRange<'a> {
78 2024 : pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
79 2024 : Self {
80 2024 : shard_identity,
81 2024 : range,
82 2024 : }
83 2024 : }
84 :
85 : /// Break up this range into chunks, each of which has at least one local key in it if the
86 : /// total range has at least one local key.
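///
/// A minimal usage sketch (hypothetical `start`/`end` keys within one relation;
/// `ignore`d because these types are crate-internal). Each returned tuple is
/// (shard-local block count, key range):
///
/// ```ignore
/// let shard = ShardIdentity::unsharded();
/// let range = ShardedRange::new(start..end, &shard);
/// for (local_blocks, key_range) in range.fragment(8192) {
///     // On an unsharded tenant every block is local, so local_blocks <= 8192.
/// }
/// ```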
87 2017 : pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
88 : // Optimization for single-key case (e.g. logical size keys)
89 2017 : if self.range.end == self.range.start.add(1) {
90 837 : return vec![(
91 837 : if self.shard_identity.is_key_disposable(&self.range.start) {
92 0 : 0
93 : } else {
94 837 : 1
95 : },
96 837 : self.range,
97 : )];
98 1180 : }
99 :
100 1180 : if !is_contiguous_range(&self.range) {
101 : // Ranges that span relations are not fragmented. We only get these ranges as a result
102 : // of operations that act on existing layers, so we trust that the existing range is
103 : // reasonably small.
104 2 : return vec![(u32::MAX, self.range)];
105 1178 : }
106 :
107 1178 : let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
108 :
109 1178 : let mut cursor = self.range.start;
110 3423 : while cursor < self.range.end {
111 2245 : let advance_by = self.distance_to_next_boundary(cursor);
112 2245 : let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
113 :
114 : // If the previous fragment is undersized, then we seek to consume enough
115 : // blocks to complete it.
116 2245 : let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
117 1067 : Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
118 135 : Some(frag) => {
119 : // Prev fragment is complete, want the full number.
120 : (
121 135 : target_nblocks,
122 135 : if is_fragment_disposable {
123 : // If this current range will be empty (not shard-local data), we will merge into previous
124 0 : Some(frag)
125 : } else {
126 135 : None
127 : },
128 : )
129 : }
130 : None => {
131 : // First iteration, want the full number
132 1178 : (target_nblocks, None)
133 : }
134 : };
135 :
136 2245 : let advance_by = if is_fragment_disposable {
137 1371 : advance_by
138 : } else {
139 874 : std::cmp::min(advance_by, want_blocks)
140 : };
141 :
142 2245 : let next_cursor = cursor.add(advance_by);
143 :
144 2245 : let this_frag = (
145 2245 : if is_fragment_disposable {
146 1371 : 0
147 : } else {
148 874 : advance_by
149 : },
150 2245 : cursor..next_cursor,
151 : );
152 2245 : cursor = next_cursor;
153 :
154 2245 : if let Some(last_fragment) = merge_last_fragment {
155 932 : // Previous fragment was short or this one is empty, merge into it
156 932 : last_fragment.0 += this_frag.0;
157 932 : last_fragment.1.end = this_frag.1.end;
158 1313 : } else {
159 1313 : fragments.push(this_frag);
160 1313 : }
161 : }
162 :
163 1178 : fragments
164 2017 : }
165 :
166 : /// Estimate the physical pages that are within this range, on this shard. This returns
167 : /// u32::MAX if the range spans relations: that return value should be interpreted as "large".
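///
/// For example (assuming a contiguous, single-relation range), on an unsharded
/// tenant the physical and raw sizes coincide, while on a sharded tenant only the
/// stripes owned by this shard are counted:
///
/// ```ignore
/// let unsharded = ShardIdentity::unsharded();
/// let range = ShardedRange::new(start..end, &unsharded);
/// assert_eq!(range.page_count(), ShardedRange::raw_size(&(start..end)));
/// ```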
168 1019 : pub fn page_count(&self) -> u32 {
169 : // Special cases for single keys like logical sizes
170 1019 : if self.range.end == self.range.start.add(1) {
171 6 : return if self.shard_identity.is_key_disposable(&self.range.start) {
172 3 : 0
173 : } else {
174 3 : 1
175 : };
176 1013 : }
177 :
178 : // We can only do an exact calculation for contiguous key ranges
179 1013 : if !is_contiguous_range(&self.range) {
180 4 : return u32::MAX;
181 1009 : }
182 :
183 : // Special case for single-shard (unsharded) tenants: our logical and physical sizes are the same
184 1009 : if self.shard_identity.count < ShardCount::new(2) {
185 524 : return contiguous_range_len(&self.range);
186 485 : }
187 :
188 : // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
189 : // to Self, and add the stripe's block count to our total if so.
190 485 : let mut result: u64 = 0;
191 485 : let mut cursor = self.range.start;
192 1902 : while cursor < self.range.end {
193 : // Count up to the next stripe_size boundary or end of range
194 1417 : let advance_by = self.distance_to_next_boundary(cursor);
195 :
196 : // If the blocks in this stripe belong to us, add them to our count
197 1417 : if !self.shard_identity.is_key_disposable(&cursor) {
198 46 : result += advance_by as u64;
199 1371 : }
200 :
201 1417 : cursor = cursor.add(advance_by);
202 : }
203 :
204 485 : if result > u32::MAX as u64 {
205 0 : u32::MAX
206 : } else {
207 485 : result as u32
208 : }
209 1019 : }
210 :
211 : /// Distance from the cursor to the next potential fragment boundary: this is either
212 : /// a stripe boundary, or the end of the range.
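///
/// For example, with a hypothetical stripe size of 32768 blocks and
/// `cursor.field6 == 40000`, the containing stripe starts at block 32768, so the
/// remainder to the next stripe boundary is 32768 - (40000 - 32768) = 25536 blocks
/// (further capped by the distance to the end of the range).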
213 3662 : fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
214 3662 : let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
215 :
216 3662 : if self.shard_identity.count < ShardCount::new(2) {
217 : // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
218 816 : return distance_to_range_end;
219 2846 : }
220 :
221 2846 : if cursor.field6 == 0xffffffff {
222 : // We are wrapping from one relation's logical size to the next relation's first data block
223 4 : return 1;
224 2842 : }
225 :
226 2842 : let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
227 2842 : let stripe_remainder = self.shard_identity.stripe_size.0
228 2842 : - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
229 :
230 2842 : if cfg!(debug_assertions) {
231 : // We should never overflow field5 and field6 -- our callers check this earlier
232 : // and would have returned their u32::MAX cases if the input range violated this.
233 2842 : let next_cursor = cursor.add(stripe_remainder);
234 2842 : debug_assert!(
235 2842 : next_cursor.field1 == cursor.field1
236 2842 : && next_cursor.field2 == cursor.field2
237 2842 : && next_cursor.field3 == cursor.field3
238 2842 : && next_cursor.field4 == cursor.field4
239 2842 : && next_cursor.field5 == cursor.field5
240 : )
241 0 : }
242 :
243 2842 : std::cmp::min(stripe_remainder, distance_to_range_end)
244 3662 : }
245 :
246 : /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
247 : /// this function simply calculates the number of pages in the space, without accounting for those
248 : /// pages that would not actually be stored on this node.
249 : ///
250 : /// Don't use this function in code that works with physical entities like layer files.
251 3789341 : pub fn raw_size(range: &Range<Key>) -> u32 {
252 3789341 : if is_contiguous_range(range) {
253 3784177 : contiguous_range_len(range)
254 : } else {
255 5164 : u32::MAX
256 : }
257 3789341 : }
258 : }
259 :
260 : impl KeySpace {
261 : /// Create a key space with a single range.
262 315484 : pub fn single(key_range: Range<Key>) -> Self {
263 315484 : Self {
264 315484 : ranges: vec![key_range],
265 315484 : }
266 315484 : }
267 :
268 : /// Partition a key space into chunks of roughly 'target_size' bytes
269 : /// in each partition.
270 : ///
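/// A sketch of how this is typically driven (hypothetical sizes; `target_size` is
/// in bytes and `block_size` is the page size in bytes):
///
/// ```ignore
/// let partitioning = keyspace.partition(&shard_identity, 128 * 1024 * 1024, 8192);
/// for part in &partitioning.parts {
///     // Each part covers roughly 128 MiB of shard-local blocks.
/// }
/// ```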
271 169 : pub fn partition(
272 169 : &self,
273 169 : shard_identity: &ShardIdentity,
274 169 : target_size: u64,
275 169 : block_size: u64,
276 169 : ) -> KeyPartitioning {
277 169 : let target_nblocks = (target_size / block_size) as u32;
278 :
279 169 : let mut parts = Vec::new();
280 169 : let mut current_part = Vec::new();
281 169 : let mut current_part_size: usize = 0;
282 1174 : for range in &self.ranges {
283 : // While doing partitioning, wrap the range in ShardedRange so that our size calculations
284 : // will respect shard striping rather than assuming all keys within a range are present.
285 1005 : let range = ShardedRange::new(range.clone(), shard_identity);
286 :
287 : // Chunk up the range into parts that each contain up to target_size local blocks
288 1007 : for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
289 : // If appending the next contiguous range in the keyspace to the current
290 : // partition would cause it to be too large, and our current partition
291 : // covers at least one block that is physically present in this shard,
292 : // then start a new partition
293 1007 : if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
294 12 : && current_part_size > 0
295 12 : {
296 12 : parts.push(KeySpace {
297 12 : ranges: current_part,
298 12 : });
299 12 : current_part = Vec::new();
300 12 : current_part_size = 0;
301 995 : }
302 1007 : current_part.push(frag_range.start..frag_range.end);
303 1007 : current_part_size += frag_on_shard_size as usize;
304 : }
305 : }
306 :
307 : // add last partition that wasn't full yet.
308 169 : if !current_part.is_empty() {
309 169 : parts.push(KeySpace {
310 169 : ranges: current_part,
311 169 : });
312 169 : }
313 :
314 169 : KeyPartitioning { parts }
315 169 : }
316 :
317 1308051 : pub fn is_empty(&self) -> bool {
318 1308051 : self.total_raw_size() == 0
319 1308051 : }
320 :
321 : /// Merge another keyspace into the current one.
322 : /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
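///
/// E.g. merging two disjoint keyspaces (hypothetical i128 keys, in the style of the
/// tests below):
///
/// ```ignore
/// let mut a = KeySpace::single(Key::from_i128(0)..Key::from_i128(10));
/// let b = KeySpace::single(Key::from_i128(20)..Key::from_i128(30));
/// a.merge(&b);
/// assert_eq!(a.ranges.len(), 2);
/// ```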
323 488385 : pub fn merge(&mut self, other: &KeySpace) {
324 488385 : let all_ranges = self
325 488385 : .ranges
326 488385 : .iter()
327 488385 : .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
328 :
329 488385 : let mut accum = KeySpaceAccum::new();
330 488385 : let mut prev: Option<&Range<Key>> = None;
331 1151530 : for range in all_ranges {
332 663145 : if let Some(prev) = prev {
333 302407 : let overlap =
334 302407 : std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
335 302407 : assert!(
336 302407 : !overlap,
337 0 : "Attempt to merge ovelapping keyspaces: {prev:?} overlaps {range:?}"
338 : );
339 360738 : }
340 :
341 663145 : accum.add_range(range.clone());
342 663145 : prev = Some(range);
343 : }
344 :
345 488385 : self.ranges = accum.to_keyspace().ranges;
346 488385 : }
347 :
348 : /// Remove all keys in `other` from `self`.
349 : /// This can involve splitting or removing existing ranges.
350 : /// Returns the removed keyspace.
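///
/// A small sketch (see `test_remove_partial_overlaps` below for fuller coverage):
///
/// ```ignore
/// let mut ks = KeySpace::single(Key::from_i128(0)..Key::from_i128(10));
/// let other = KeySpace::single(Key::from_i128(5)..Key::from_i128(15));
/// let removed = ks.remove_overlapping_with(&other);
/// assert_eq!(ks.ranges, vec![Key::from_i128(0)..Key::from_i128(5)]);
/// assert_eq!(removed.ranges, vec![Key::from_i128(5)..Key::from_i128(10)]);
/// ```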
351 1559055 : pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
352 1559055 : let (self_start, self_end) = match (self.start(), self.end()) {
353 1130542 : (Some(start), Some(end)) => (start, end),
354 : _ => {
355 : // self is empty
356 428513 : return KeySpace::default();
357 : }
358 : };
359 :
360 : // Key spaces are sorted by definition, so skip ahead to the first
361 : // potentially intersecting range. Similarly, ignore ranges that start
362 : // after the current keyspace ends.
363 1130542 : let other_ranges = other
364 1130542 : .ranges
365 1130542 : .iter()
366 1130542 : .skip_while(|range| self_start >= range.end)
367 1130542 : .take_while(|range| self_end > range.start);
368 :
369 1130542 : let mut removed_accum = KeySpaceRandomAccum::new();
370 1808800 : for range in other_ranges {
371 1339000 : while let Some(overlap_at) = self.overlaps_at(range) {
372 660742 : let overlapped = self.ranges[overlap_at].clone();
373 :
374 660742 : if overlapped.start < range.start && overlapped.end <= range.end {
375 1803 : // Higher part of the range is completely overlapped.
376 1803 : removed_accum.add_range(range.start..self.ranges[overlap_at].end);
377 1803 : self.ranges[overlap_at].end = range.start;
378 658939 : }
379 660742 : if overlapped.start >= range.start && overlapped.end > range.end {
380 1880 : // Lower part of the range is completely overlapped.
381 1880 : removed_accum.add_range(self.ranges[overlap_at].start..range.end);
382 1880 : self.ranges[overlap_at].start = range.end;
383 658862 : }
384 660742 : if overlapped.start < range.start && overlapped.end > range.end {
385 637 : // Middle part of the range is overlapped.
386 637 : removed_accum.add_range(range.clone());
387 637 : self.ranges[overlap_at].end = range.start;
388 637 : self.ranges
389 637 : .insert(overlap_at + 1, range.end..overlapped.end);
390 660105 : }
391 660742 : if overlapped.start >= range.start && overlapped.end <= range.end {
392 656422 : // Whole range is overlapped
393 656422 : removed_accum.add_range(self.ranges[overlap_at].clone());
394 656422 : self.ranges.remove(overlap_at);
395 656422 : }
396 : }
397 : }
398 :
399 1130542 : removed_accum.to_keyspace()
400 1559055 : }
401 :
402 1559060 : pub fn start(&self) -> Option<Key> {
403 1559060 : self.ranges.first().map(|range| range.start)
404 1559060 : }
405 :
406 1559220 : pub fn end(&self) -> Option<Key> {
407 1559220 : self.ranges.last().map(|range| range.end)
408 1559220 : }
409 :
410 : /// The size of the keyspace in pages, before accounting for sharding
411 1318945 : pub fn total_raw_size(&self) -> usize {
412 1318945 : self.ranges
413 1318945 : .iter()
414 1318945 : .map(|range| ShardedRange::raw_size(range) as usize)
415 1318945 : .sum()
416 1318945 : }
417 :
418 1758504 : fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
419 1758504 : match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
420 4361 : Ok(0) => None,
421 820041 : Err(0) => None,
422 2410 : Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
423 931692 : Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
424 199722 : _ => None,
425 : }
426 1758504 : }
427 :
428 : ///
429 : /// Check if the key space overlaps the given range
430 : ///
431 419504 : pub fn overlaps(&self, range: &Range<Key>) -> bool {
432 419504 : self.overlaps_at(range).is_some()
433 419504 : }
434 :
435 : /// Check if the keyspace contains a key
436 418675 : pub fn contains(&self, key: &Key) -> bool {
437 418675 : self.overlaps(&(*key..key.next()))
438 418675 : }
439 : }
440 :
441 : ///
442 : /// Represents a partitioning of the key space.
443 : ///
444 : /// The only kind of partitioning we do is to partition the key space into
445 : /// partitions that are roughly equal in physical size (see KeySpace::partition).
446 : /// But this data structure could represent any partitioning.
447 : ///
448 : #[derive(Clone, Debug, Default)]
449 : pub struct KeyPartitioning {
450 : pub parts: Vec<KeySpace>,
451 : }
452 :
453 : /// Represents a partitioning of the sparse key space.
454 : #[derive(Clone, Debug, Default)]
455 : pub struct SparseKeyPartitioning {
456 : pub parts: Vec<SparseKeySpace>,
457 : }
458 :
459 : impl KeyPartitioning {
460 470 : pub fn new() -> Self {
461 470 : KeyPartitioning { parts: Vec::new() }
462 470 : }
463 :
464 : /// Convert a key partitioning to a sparse partition.
465 235 : pub fn into_sparse(self) -> SparseKeyPartitioning {
466 235 : SparseKeyPartitioning {
467 235 : parts: self.parts.into_iter().map(SparseKeySpace).collect(),
468 235 : }
469 235 : }
470 : }
471 :
472 : impl SparseKeyPartitioning {
473 : /// Note: use this function with caution. Attempting to handle a sparse keyspace in the same way as a dense keyspace
474 : /// can cause long or infinite loops.
475 191 : pub fn into_dense(self) -> KeyPartitioning {
476 : KeyPartitioning {
477 191 : parts: self.parts.into_iter().map(|x| x.0).collect(),
478 : }
479 191 : }
480 : }
481 :
482 : ///
483 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
484 : /// object. This takes care of merging adjacent keys and key ranges into
485 : /// contiguous ranges.
486 : ///
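/// Ranges must be added in key order and must not overlap (enforced by an
/// assertion in `add_range`). A small sketch, in the spirit of the
/// `keyspace_consume` test below:
///
/// ```ignore
/// let mut accum = KeySpaceAccum::new();
/// accum.add_range(Key::from_i128(0)..Key::from_i128(10));
/// accum.add_range(Key::from_i128(10)..Key::from_i128(20)); // adjacent: merged
/// assert_eq!(accum.to_keyspace().ranges, vec![Key::from_i128(0)..Key::from_i128(20)]);
/// ```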
487 : #[derive(Clone, Debug, Default)]
488 : pub struct KeySpaceAccum {
489 : accum: Option<Range<Key>>,
490 :
491 : ranges: Vec<Range<Key>>,
492 : size: u64,
493 : }
494 :
495 : impl KeySpaceAccum {
496 543521 : pub fn new() -> Self {
497 543521 : Self {
498 543521 : accum: None,
499 543521 : ranges: Vec::new(),
500 543521 : size: 0,
501 543521 : }
502 543521 : }
503 :
504 : #[inline(always)]
505 2071653 : pub fn add_key(&mut self, key: Key) {
506 2071653 : self.add_range(singleton_range(key))
507 0 : }
508 :
509 : #[inline(always)]
510 3206345 : pub fn add_range(&mut self, range: Range<Key>) {
511 3206345 : self.size += ShardedRange::raw_size(&range) as u64;
512 :
513 3206345 : match self.accum.as_mut() {
514 2370409 : Some(accum) => {
515 2370409 : if range.start == accum.end {
516 2081662 : accum.end = range.end;
517 2081662 : } else {
518 : // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
519 : // a new range here if the skipped region was all keys that don't belong on this shard.
520 : // (https://github.com/neondatabase/neon/issues/6247)
521 288747 : assert!(range.start > accum.end);
522 288747 : self.ranges.push(accum.clone());
523 288747 : *accum = range;
524 : }
525 : }
526 835936 : None => self.accum = Some(range),
527 : }
528 663151 : }
529 :
530 963578 : pub fn to_keyspace(mut self) -> KeySpace {
531 963578 : if let Some(accum) = self.accum.take() {
532 835929 : self.ranges.push(accum);
533 835929 : }
534 963578 : KeySpace {
535 963578 : ranges: self.ranges,
536 963578 : }
537 963578 : }
538 :
539 702 : pub fn consume_keyspace(&mut self) -> KeySpace {
540 702 : std::mem::take(self).to_keyspace()
541 702 : }
542 :
543 : // The total number of keys in this object, ignoring any sharding effects that might cause some of
544 : // the keys to be omitted in storage on this shard.
545 1520 : pub fn raw_size(&self) -> u64 {
546 1520 : self.size
547 1520 : }
548 : }
549 :
550 : ///
551 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
552 : /// object. Key ranges may be inserted in any order and can overlap.
553 : ///
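/// Unlike `KeySpaceAccum`, ranges are only sorted and coalesced when `to_keyspace`
/// is called. A small sketch (hypothetical i128 keys):
///
/// ```ignore
/// let mut accum = KeySpaceRandomAccum::new();
/// accum.add_range(Key::from_i128(20)..Key::from_i128(30));
/// accum.add_range(Key::from_i128(0)..Key::from_i128(25)); // overlaps: coalesced
/// assert_eq!(accum.to_keyspace().ranges, vec![Key::from_i128(0)..Key::from_i128(30)]);
/// ```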
554 : #[derive(Clone, Debug, Default)]
555 : pub struct KeySpaceRandomAccum {
556 : ranges: Vec<Range<Key>>,
557 : }
558 :
559 : impl KeySpaceRandomAccum {
560 3654185 : pub fn new() -> Self {
561 3654185 : Self { ranges: Vec::new() }
562 3654185 : }
563 :
564 372877 : pub fn add_key(&mut self, key: Key) {
565 372877 : self.add_range(singleton_range(key))
566 372877 : }
567 :
568 1577995 : pub fn add_range(&mut self, range: Range<Key>) {
569 1577995 : self.ranges.push(range);
570 1577995 : }
571 :
572 502771 : pub fn add_keyspace(&mut self, keyspace: KeySpace) {
573 990181 : for range in keyspace.ranges {
574 487410 : self.add_range(range);
575 487410 : }
576 502771 : }
577 :
578 2491326 : pub fn to_keyspace(mut self) -> KeySpace {
579 2491326 : let mut ranges = Vec::new();
580 2491326 : if !self.ranges.is_empty() {
581 1445321 : self.ranges.sort_by_key(|r| r.start);
582 1445321 : let mut start = self.ranges.first().unwrap().start;
583 1445321 : let mut end = self.ranges.first().unwrap().end;
584 3023292 : for r in self.ranges {
585 1577971 : assert!(r.start >= start);
586 1577971 : if r.start > end {
587 58741 : ranges.push(start..end);
588 58741 : start = r.start;
589 58741 : end = r.end;
590 1519230 : } else if r.end > end {
591 31697 : end = r.end;
592 1487533 : }
593 : }
594 1445321 : ranges.push(start..end);
595 1046005 : }
596 2491326 : KeySpace { ranges }
597 2491326 : }
598 :
599 1317718 : pub fn consume_keyspace(&mut self) -> KeySpace {
600 1317718 : let mut prev_accum = KeySpaceRandomAccum::new();
601 1317718 : std::mem::swap(self, &mut prev_accum);
602 :
603 1317718 : prev_accum.to_keyspace()
604 1317718 : }
605 : }
606 :
607 2444530 : pub fn singleton_range(key: Key) -> Range<Key> {
608 2444530 : key..key.next()
609 2444530 : }
610 :
611 : #[cfg(test)]
612 : mod tests {
613 : use std::fmt::Write;
614 :
615 : use rand::{RngCore, SeedableRng};
616 :
617 : use super::*;
618 : use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber, ShardStripeSize};
619 :
620 : // Helper function to create a key range.
621 : //
622 : // Make the tests below less verbose.
623 46 : fn kr(irange: Range<i128>) -> Range<Key> {
624 46 : Key::from_i128(irange.start)..Key::from_i128(irange.end)
625 46 : }
626 :
627 : #[allow(dead_code)]
628 0 : fn dump_keyspace(ks: &KeySpace) {
629 0 : for r in ks.ranges.iter() {
630 0 : println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
631 0 : }
632 0 : }
633 :
634 11 : fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
635 11 : if actual.ranges != expected {
636 0 : let mut msg = String::new();
637 :
638 0 : writeln!(msg, "expected:").unwrap();
639 0 : for r in &expected {
640 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
641 0 : }
642 0 : writeln!(msg, "got:").unwrap();
643 0 : for r in &actual.ranges {
644 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
645 0 : }
646 0 : panic!("{}", msg);
647 11 : }
648 11 : }
649 :
650 : #[test]
651 1 : fn keyspace_consume() {
652 1 : let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
653 :
654 1 : let mut accum = KeySpaceAccum::new();
655 4 : for range in &ranges {
656 3 : accum.add_range(range.clone());
657 3 : }
658 :
659 1 : let expected_size: u64 = ranges
660 1 : .iter()
661 3 : .map(|r| ShardedRange::raw_size(r) as u64)
662 1 : .sum();
663 1 : assert_eq!(accum.raw_size(), expected_size);
664 :
665 1 : assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
666 1 : assert_eq!(accum.raw_size(), 0);
667 :
668 1 : assert_ks_eq(&accum.consume_keyspace(), vec![]);
669 1 : assert_eq!(accum.raw_size(), 0);
670 :
671 4 : for range in &ranges {
672 3 : accum.add_range(range.clone());
673 3 : }
674 1 : assert_ks_eq(&accum.to_keyspace(), ranges);
675 1 : }
676 :
677 : #[test]
678 1 : fn keyspace_add_range() {
679 : // two separate ranges
680 : //
681 : // #####
682 : // #####
683 1 : let mut ks = KeySpaceRandomAccum::default();
684 1 : ks.add_range(kr(0..10));
685 1 : ks.add_range(kr(20..30));
686 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
687 :
688 : // two separate ranges, added in reverse order
689 : //
690 : // #####
691 : // #####
692 1 : let mut ks = KeySpaceRandomAccum::default();
693 1 : ks.add_range(kr(20..30));
694 1 : ks.add_range(kr(0..10));
695 :
696 : // add range that is adjacent to the end of an existing range
697 : //
698 : // #####
699 : // #####
700 1 : ks.add_range(kr(0..10));
701 1 : ks.add_range(kr(10..30));
702 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
703 :
704 : // add range that is adjacent to the start of an existing range
705 : //
706 : // #####
707 : // #####
708 1 : let mut ks = KeySpaceRandomAccum::default();
709 1 : ks.add_range(kr(10..30));
710 1 : ks.add_range(kr(0..10));
711 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
712 :
713 : // add range that overlaps with the end of an existing range
714 : //
715 : // #####
716 : // #####
717 1 : let mut ks = KeySpaceRandomAccum::default();
718 1 : ks.add_range(kr(0..10));
719 1 : ks.add_range(kr(5..30));
720 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
721 :
722 : // add range that overlaps with the start of an existing range
723 : //
724 : // #####
725 : // #####
726 1 : let mut ks = KeySpaceRandomAccum::default();
727 1 : ks.add_range(kr(5..30));
728 1 : ks.add_range(kr(0..10));
729 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
730 :
731 : // add range that is fully covered by an existing range
732 : //
733 : // #########
734 : // #####
735 1 : let mut ks = KeySpaceRandomAccum::default();
736 1 : ks.add_range(kr(0..30));
737 1 : ks.add_range(kr(10..20));
738 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
739 :
740 : // add range that extends an existing range from both ends
741 : //
742 : // #####
743 : // #########
744 1 : let mut ks = KeySpaceRandomAccum::default();
745 1 : ks.add_range(kr(10..20));
746 1 : ks.add_range(kr(0..30));
747 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
748 :
749 : // add a range that overlaps with two existing ranges, joining them
750 : //
751 : // ##### #####
752 : // #######
753 1 : let mut ks = KeySpaceRandomAccum::default();
754 1 : ks.add_range(kr(0..10));
755 1 : ks.add_range(kr(20..30));
756 1 : ks.add_range(kr(5..25));
757 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
758 1 : }
759 :
760 : #[test]
761 1 : fn keyspace_overlaps() {
762 1 : let mut ks = KeySpaceRandomAccum::default();
763 1 : ks.add_range(kr(10..20));
764 1 : ks.add_range(kr(30..40));
765 1 : let ks = ks.to_keyspace();
766 :
767 : // ##### #####
768 : // xxxx
769 1 : assert!(!ks.overlaps(&kr(0..5)));
770 :
771 : // ##### #####
772 : // xxxx
773 1 : assert!(!ks.overlaps(&kr(5..9)));
774 :
775 : // ##### #####
776 : // xxxx
777 1 : assert!(!ks.overlaps(&kr(5..10)));
778 :
779 : // ##### #####
780 : // xxxx
781 1 : assert!(ks.overlaps(&kr(5..11)));
782 :
783 : // ##### #####
784 : // xxxx
785 1 : assert!(ks.overlaps(&kr(10..15)));
786 :
787 : // ##### #####
788 : // xxxx
789 1 : assert!(ks.overlaps(&kr(15..20)));
790 :
791 : // ##### #####
792 : // xxxx
793 1 : assert!(ks.overlaps(&kr(15..25)));
794 :
795 : // ##### #####
796 : // xxxx
797 1 : assert!(!ks.overlaps(&kr(22..28)));
798 :
799 : // ##### #####
800 : // xxxx
801 1 : assert!(!ks.overlaps(&kr(25..30)));
802 :
803 : // ##### #####
804 : // xxxx
805 1 : assert!(ks.overlaps(&kr(35..35)));
806 :
807 : // ##### #####
808 : // xxxx
809 1 : assert!(!ks.overlaps(&kr(40..45)));
810 :
811 : // ##### #####
812 : // xxxx
813 1 : assert!(!ks.overlaps(&kr(45..50)));
814 :
815 : // ##### #####
816 : // xxxxxxxxxxx
817 1 : assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
818 1 : }
819 :
820 : #[test]
821 1 : fn test_remove_full_overlaps() {
822 1 : let mut key_space1 = KeySpace {
823 1 : ranges: vec![
824 1 : Key::from_i128(1)..Key::from_i128(4),
825 1 : Key::from_i128(5)..Key::from_i128(8),
826 1 : Key::from_i128(10)..Key::from_i128(12),
827 1 : ],
828 1 : };
829 1 : let key_space2 = KeySpace {
830 1 : ranges: vec![
831 1 : Key::from_i128(2)..Key::from_i128(3),
832 1 : Key::from_i128(6)..Key::from_i128(7),
833 1 : Key::from_i128(11)..Key::from_i128(13),
834 1 : ],
835 1 : };
836 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
837 1 : let removed_expected = KeySpace {
838 1 : ranges: vec![
839 1 : Key::from_i128(2)..Key::from_i128(3),
840 1 : Key::from_i128(6)..Key::from_i128(7),
841 1 : Key::from_i128(11)..Key::from_i128(12),
842 1 : ],
843 1 : };
844 1 : assert_eq!(removed, removed_expected);
845 :
846 1 : assert_eq!(
847 : key_space1.ranges,
848 1 : vec![
849 1 : Key::from_i128(1)..Key::from_i128(2),
850 1 : Key::from_i128(3)..Key::from_i128(4),
851 1 : Key::from_i128(5)..Key::from_i128(6),
852 1 : Key::from_i128(7)..Key::from_i128(8),
853 1 : Key::from_i128(10)..Key::from_i128(11)
854 : ]
855 : );
856 1 : }
857 :
858 : #[test]
859 1 : fn test_remove_partial_overlaps() {
860 : // Test partial overlaps
861 1 : let mut key_space1 = KeySpace {
862 1 : ranges: vec![
863 1 : Key::from_i128(1)..Key::from_i128(5),
864 1 : Key::from_i128(7)..Key::from_i128(10),
865 1 : Key::from_i128(12)..Key::from_i128(15),
866 1 : ],
867 1 : };
868 1 : let key_space2 = KeySpace {
869 1 : ranges: vec![
870 1 : Key::from_i128(3)..Key::from_i128(6),
871 1 : Key::from_i128(8)..Key::from_i128(11),
872 1 : Key::from_i128(14)..Key::from_i128(17),
873 1 : ],
874 1 : };
875 :
876 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
877 1 : let removed_expected = KeySpace {
878 1 : ranges: vec![
879 1 : Key::from_i128(3)..Key::from_i128(5),
880 1 : Key::from_i128(8)..Key::from_i128(10),
881 1 : Key::from_i128(14)..Key::from_i128(15),
882 1 : ],
883 1 : };
884 1 : assert_eq!(removed, removed_expected);
885 :
886 1 : assert_eq!(
887 : key_space1.ranges,
888 1 : vec![
889 1 : Key::from_i128(1)..Key::from_i128(3),
890 1 : Key::from_i128(7)..Key::from_i128(8),
891 1 : Key::from_i128(12)..Key::from_i128(14),
892 : ]
893 : );
894 1 : }
895 :
896 : #[test]
897 1 : fn test_remove_no_overlaps() {
898 1 : let mut key_space1 = KeySpace {
899 1 : ranges: vec![
900 1 : Key::from_i128(1)..Key::from_i128(5),
901 1 : Key::from_i128(7)..Key::from_i128(10),
902 1 : Key::from_i128(12)..Key::from_i128(15),
903 1 : ],
904 1 : };
905 1 : let key_space2 = KeySpace {
906 1 : ranges: vec![
907 1 : Key::from_i128(6)..Key::from_i128(7),
908 1 : Key::from_i128(11)..Key::from_i128(12),
909 1 : Key::from_i128(15)..Key::from_i128(17),
910 1 : ],
911 1 : };
912 :
913 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
914 1 : let removed_expected = KeySpace::default();
915 1 : assert_eq!(removed, removed_expected);
916 :
917 1 : assert_eq!(
918 : key_space1.ranges,
919 1 : vec![
920 1 : Key::from_i128(1)..Key::from_i128(5),
921 1 : Key::from_i128(7)..Key::from_i128(10),
922 1 : Key::from_i128(12)..Key::from_i128(15),
923 : ]
924 : );
925 1 : }
926 :
927 : #[test]
928 1 : fn test_remove_one_range_overlaps_multiple() {
929 1 : let mut key_space1 = KeySpace {
930 1 : ranges: vec![
931 1 : Key::from_i128(1)..Key::from_i128(3),
932 1 : Key::from_i128(3)..Key::from_i128(6),
933 1 : Key::from_i128(6)..Key::from_i128(10),
934 1 : Key::from_i128(12)..Key::from_i128(15),
935 1 : Key::from_i128(17)..Key::from_i128(20),
936 1 : Key::from_i128(20)..Key::from_i128(30),
937 1 : Key::from_i128(30)..Key::from_i128(40),
938 1 : ],
939 1 : };
940 1 : let key_space2 = KeySpace {
941 1 : ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
942 1 : };
943 :
944 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
945 1 : let removed_expected = KeySpace {
946 1 : ranges: vec![
947 1 : Key::from_i128(9)..Key::from_i128(10),
948 1 : Key::from_i128(12)..Key::from_i128(15),
949 1 : Key::from_i128(17)..Key::from_i128(19),
950 1 : ],
951 1 : };
952 1 : assert_eq!(removed, removed_expected);
953 :
954 1 : assert_eq!(
955 : key_space1.ranges,
956 1 : vec![
957 1 : Key::from_i128(1)..Key::from_i128(3),
958 1 : Key::from_i128(3)..Key::from_i128(6),
959 1 : Key::from_i128(6)..Key::from_i128(9),
960 1 : Key::from_i128(19)..Key::from_i128(20),
961 1 : Key::from_i128(20)..Key::from_i128(30),
962 1 : Key::from_i128(30)..Key::from_i128(40),
963 : ]
964 : );
965 1 : }
966 : #[test]
967 1 : fn sharded_range_relation_gap() {
968 1 : let shard_identity =
969 1 : ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
970 :
971 1 : let range = ShardedRange::new(
972 1 : Range {
973 1 : start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
974 1 : end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
975 1 : },
976 1 : &shard_identity,
977 : );
978 :
979 : // Key range spans relations, expect MAX
980 1 : assert_eq!(range.page_count(), u32::MAX);
981 1 : }
982 :
983 : #[test]
984 1 : fn shard_identity_keyspaces_single_key() {
985 1 : let shard_identity =
986 1 : ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
987 :
988 1 : let range = ShardedRange::new(
989 1 : Range {
990 1 : start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
991 1 : end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
992 1 : },
993 1 : &shard_identity,
994 : );
995 : // Single-key range on logical size key
996 1 : assert_eq!(range.page_count(), 1);
997 1 : }
998 :
999 : /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
1000 : #[test]
1001 1 : fn contiguous_range_check() {
1002 1 : assert!(!is_contiguous_range(
1003 1 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1004 1 : ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
1005 1 : ),);
1006 :
1007 : // The ranges goes all the way up to the 0xffffffff, including it: this is
1008 : // not considered a rel block range because 0xffffffff stores logical sizes,
1009 : // not blocks.
1010 1 : assert!(!is_contiguous_range(
1011 1 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1012 1 : ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
1013 1 : ),);
1014 :
1015 : // Keys within the normal data region of a relation
1016 1 : assert!(is_contiguous_range(
1017 1 : &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
1018 1 : ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
1019 : ),);
1020 :
1021 : // The logical size key of one forkno, then some blocks in the next
1022 1 : assert!(is_contiguous_range(
1023 1 : &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
1024 1 : ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
1025 : ),);
1026 1 : }
1027 :
1028 : #[test]
1029 1 : fn shard_identity_keyspaces_forkno_gap() {
1030 1 : let shard_identity =
1031 1 : ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
1032 :
1033 1 : let range = ShardedRange::new(
1034 1 : Range {
1035 1 : start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
1036 1 : end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
1037 1 : },
1038 1 : &shard_identity,
1039 : );
1040 :
1041 : // Range spanning the end of one forkno and the start of the next: we do not attempt to
1042 : // calculate a valid size, because we have no way to know if the keys between start
1043 : // and end are actually in use.
1044 1 : assert_eq!(range.page_count(), u32::MAX);
1045 1 : }
1046 :
1047 : #[test]
1048 1 : fn shard_identity_keyspaces_one_relation() {
1049 5 : for shard_number in 0..4 {
1050 4 : let shard_identity = ShardIdentity::new(
1051 4 : ShardNumber(shard_number),
1052 4 : ShardCount::new(4),
1053 : DEFAULT_STRIPE_SIZE,
1054 : )
1055 4 : .unwrap();
1056 :
1057 4 : let range = ShardedRange::new(
1058 4 : Range {
1059 4 : start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
1060 4 : end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
1061 4 : },
1062 4 : &shard_identity,
1063 : );
1064 :
1065 : // Very simple case: range covering block zero of one relation, where that block maps to shard zero
1066 4 : if shard_number == 0 {
1067 1 : assert_eq!(range.page_count(), 1);
1068 : } else {
1069 : // Other shards should perceive the range's size as zero
1070 3 : assert_eq!(range.page_count(), 0);
1071 : }
1072 : }
1073 1 : }
1074 :
1075 : /// Test helper: construct a ShardedRange and call fragment() on it, returning
1076 : /// the total page count in the range and the fragments.
1077 1012 : fn do_fragment(
1078 1012 : range_start: Key,
1079 1012 : range_end: Key,
1080 1012 : shard_identity: &ShardIdentity,
1081 1012 : target_nblocks: u32,
1082 1012 : ) -> (u32, Vec<(u32, Range<Key>)>) {
1083 1012 : let range = ShardedRange::new(
1084 1012 : Range {
1085 1012 : start: range_start,
1086 1012 : end: range_end,
1087 1012 : },
1088 1012 : shard_identity,
1089 : );
1090 :
1091 1012 : let page_count = range.page_count();
1092 1012 : let fragments = range.fragment(target_nblocks);
1093 :
1094 : // Invariant: we always get at least one fragment
1095 1012 : assert!(!fragments.is_empty());
1096 :
1097 : // Invariant: the first/last fragment start/end should equal the input start/end
1098 1012 : assert_eq!(fragments.first().unwrap().1.start, range_start);
1099 1012 : assert_eq!(fragments.last().unwrap().1.end, range_end);
1100 :
1101 1012 : if page_count > 0 {
1102 : // Invariant: every fragment must contain at least one shard-local page, if the
1103 : // total range contains at least one shard-local page
1104 702 : let all_nonzero = fragments.iter().all(|f| f.0 > 0);
1105 569 : if !all_nonzero {
1106 0 : eprintln!("Found a zero-length fragment: {fragments:?}");
1107 569 : }
1108 569 : assert!(all_nonzero);
1109 : } else {
1110 : // A range with no shard-local pages should always be returned as a single fragment
1111 443 : assert_eq!(fragments, vec![(0, range_start..range_end)]);
1112 : }
1113 :
1114 : // Invariant: fragments must be ordered and non-overlapping
1115 1012 : let mut last: Option<Range<Key>> = None;
1116 2157 : for frag in &fragments {
1117 1145 : if let Some(last) = last {
1118 133 : assert!(frag.1.start >= last.end);
1119 133 : assert!(frag.1.start > last.start);
1120 1012 : }
1121 1145 : last = Some(frag.1.clone())
1122 : }
1123 :
1124 : // Invariant: fragments respect target_nblocks
1125 2157 : for frag in &fragments {
1126 1145 : assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
1127 : }
1128 :
1129 1012 : (page_count, fragments)
1130 1012 : }
1131 :
1132 : /// Really simple tests for fragment(), on a range that just contains a single stripe
1133 : /// for a single tenant.
1134 : #[test]
1135 1 : fn sharded_range_fragment_simple() {
1136 : const SHARD_COUNT: u8 = 4;
1137 : const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
1138 :
1139 1 : let shard_identity = ShardIdentity::new(
1140 1 : ShardNumber(0),
1141 1 : ShardCount::new(SHARD_COUNT),
1142 1 : ShardStripeSize(STRIPE_SIZE),
1143 : )
1144 1 : .unwrap();
1145 :
1146 : // A range which we happen to know covers exactly one stripe which belongs to this shard
1147 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1148 1 : let mut input_end = input_start;
1149 1 : input_end.field6 += STRIPE_SIZE; // field6 is block number
1150 :
1151 : // Ask for stripe_size blocks, we get the whole stripe
1152 1 : assert_eq!(
1153 1 : do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE),
1154 1 : (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
1155 : );
1156 :
1157 : // Ask for more, we still get the whole stripe
1158 1 : assert_eq!(
1159 1 : do_fragment(input_start, input_end, &shard_identity, 10 * STRIPE_SIZE),
1160 1 : (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
1161 : );
1162 :
1163 : // Ask for target_nblocks of half the stripe size, we get two halves
1164 1 : assert_eq!(
1165 1 : do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE / 2),
1166 1 : (
1167 1 : STRIPE_SIZE,
1168 1 : vec![
1169 1 : (
1170 1 : STRIPE_SIZE / 2,
1171 1 : input_start..input_start.add(STRIPE_SIZE / 2)
1172 1 : ),
1173 1 : (STRIPE_SIZE / 2, input_start.add(STRIPE_SIZE / 2)..input_end)
1174 1 : ]
1175 1 : )
1176 : );
1177 1 : }
1178 :
1179 : #[test]
1180 1 : fn sharded_range_fragment_multi_stripe() {
1181 : const SHARD_COUNT: u8 = 4;
1182 : const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
1183 : const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
1184 :
1185 1 : let shard_identity = ShardIdentity::new(
1186 1 : ShardNumber(0),
1187 1 : ShardCount::new(SHARD_COUNT),
1188 1 : ShardStripeSize(STRIPE_SIZE),
1189 : )
1190 1 : .unwrap();
1191 :
1192 : // A range which covers multiple stripes, exactly one of which belongs to the current shard.
1193 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1194 1 : let mut input_end = input_start;
1195 1 : input_end.field6 += RANGE_SIZE; // field6 is block number
1196 :
1197 : // Ask for all the blocks, get a fragment that covers the whole range but reports
1198 : // its size to be just the blocks belonging to our shard.
1199 1 : assert_eq!(
1200 1 : do_fragment(input_start, input_end, &shard_identity, RANGE_SIZE),
1201 1 : (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
1202 : );
1203 :
1204 : // Ask for a sub-stripe quantity that results in 3 fragments.
1205 1 : let limit = STRIPE_SIZE / 3 + 1;
1206 1 : assert_eq!(
1207 1 : do_fragment(input_start, input_end, &shard_identity, limit),
1208 1 : (
1209 1 : STRIPE_SIZE,
1210 1 : vec![
1211 1 : (limit, input_start..input_start.add(limit)),
1212 1 : (limit, input_start.add(limit)..input_start.add(2 * limit)),
1213 1 : (
1214 1 : STRIPE_SIZE - 2 * limit,
1215 1 : input_start.add(2 * limit)..input_end
1216 1 : ),
1217 1 : ]
1218 1 : )
1219 : );
1220 :
1221 : // Try on a range that starts slightly after our owned stripe
1222 1 : assert_eq!(
1223 1 : do_fragment(input_start.add(1), input_end, &shard_identity, RANGE_SIZE),
1224 1 : (
1225 1 : STRIPE_SIZE - 1,
1226 1 : vec![(STRIPE_SIZE - 1, input_start.add(1)..input_end)]
1227 1 : )
1228 : );
1229 1 : }
1230 :
1231 : /// Test our calculations work correctly when we start a range from the logical size key of
1232 : /// a previous relation.
1233 : #[test]
1234 1 : fn sharded_range_fragment_starting_from_logical_size() {
1235 : const SHARD_COUNT: u8 = 4;
1236 : const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
1237 : const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
1238 :
1239 1 : let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
1240 1 : let mut input_end = Key::from_hex("000000067f00000001000000ae0100000000").unwrap();
1241 1 : input_end.field6 += RANGE_SIZE; // field6 is block number
1242 :
1243 : // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
1244 1 : let shard_identity = ShardIdentity::new(
1245 1 : ShardNumber(0),
1246 1 : ShardCount::new(SHARD_COUNT),
1247 1 : ShardStripeSize(STRIPE_SIZE),
1248 : )
1249 1 : .unwrap();
1250 1 : assert_eq!(
1251 1 : do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
1252 1 : (
1253 1 : STRIPE_SIZE + 1,
1254 1 : vec![(STRIPE_SIZE + 1, input_start..input_end)]
1255 1 : )
1256 : );
1257 :
1258 : // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
1259 : // store all logical sizes)
1260 1 : let shard_identity = ShardIdentity::new(
1261 1 : ShardNumber(1),
1262 1 : ShardCount::new(SHARD_COUNT),
1263 1 : ShardStripeSize(STRIPE_SIZE),
1264 : )
1265 1 : .unwrap();
1266 1 : assert_eq!(
1267 1 : do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
1268 1 : (1, vec![(1, input_start..input_end)])
1269 : );
1270 1 : }
1271 :
1272 : /// Test that ShardedRange behaves properly when used on un-sharded data
1273 : #[test]
1274 1 : fn sharded_range_fragment_unsharded() {
1275 1 : let shard_identity = ShardIdentity::unsharded();
1276 :
1277 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1278 1 : let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
1279 1 : assert_eq!(
1280 1 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1281 1 : (
1282 1 : 0x10000,
1283 1 : vec![
1284 1 : (0x8000, input_start..input_start.add(0x8000)),
1285 1 : (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
1286 1 : ]
1287 1 : )
1288 : );
1289 1 : }
1290 :
1291 : #[test]
1292 1 : fn sharded_range_fragment_cross_relation() {
1293 1 : let shard_identity = ShardIdentity::unsharded();
1294 :
1295 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1296 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1297 1 : let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
1298 1 : assert_eq!(
1299 1 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1300 1 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1301 : );
1302 :
1303 : // Same, but using a sharded identity
1304 1 : let shard_identity =
1305 1 : ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
1306 1 : assert_eq!(
1307 1 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1308 1 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1309 : );
1310 1 : }
1311 :
1312 : #[test]
1313 1 : fn sharded_range_fragment_tiny_nblocks() {
1314 1 : let shard_identity = ShardIdentity::unsharded();
1315 :
1316 : // A small range within a single relation, fragmented with a small target_nblocks
1317 1 : let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
1318 1 : let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
1319 1 : assert_eq!(
1320 1 : do_fragment(input_start, input_end, &shard_identity, 16),
1321 1 : (
1322 1 : 0x38,
1323 1 : vec![
1324 1 : (16, input_start..input_start.add(16)),
1325 1 : (16, input_start.add(16)..input_start.add(32)),
1326 1 : (16, input_start.add(32)..input_start.add(48)),
1327 1 : (8, input_start.add(48)..input_end),
1328 1 : ]
1329 1 : )
1330 : );
1331 1 : }
1332 :
1333 : #[test]
1334 1 : fn sharded_range_fragment_fuzz() {
1335 : // Use a fixed seed: we don't want to explicitly pick values, but we do want
1336 : // the test to be reproducible.
1337 1 : let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
1338 :
1339 1001 : for _i in 0..1000 {
1340 1000 : let shard_identity = if prng.next_u32() % 2 == 0 {
1341 519 : ShardIdentity::unsharded()
1342 : } else {
1343 481 : let shard_count = prng.next_u32() % 127 + 1;
1344 481 : ShardIdentity::new(
1345 481 : ShardNumber((prng.next_u32() % shard_count) as u8),
1346 481 : ShardCount::new(shard_count as u8),
1347 : DEFAULT_STRIPE_SIZE,
1348 : )
1349 481 : .unwrap()
1350 : };
1351 :
1352 1000 : let target_nblocks = prng.next_u32() % 65536 + 1;
1353 :
1354 1000 : let start_offset = prng.next_u32() % 16384;
1355 :
1356 : // Try ranges up to 8192 blocks in size, always at least 1 block
1357 1000 : let range_size = prng.next_u32() % 8192 + 1;
1358 :
1359 : // Build a range within a single relation, starting at a random offset
1360 1000 : let input_start = Key::from_hex("000000067F00000001000004E10000000000")
1361 1000 : .unwrap()
1362 1000 : .add(start_offset);
1363 1000 : let input_end = input_start.add(range_size);
1364 :
1365 : // This test's main success conditions are the invariants baked into do_fragment
1366 1000 : let (_total_size, fragments) =
1367 1000 : do_fragment(input_start, input_end, &shard_identity, target_nblocks);
1368 :
1369 : // Pick a random key within the range and check it appears in the output
1370 1000 : let example_key = input_start.add(prng.next_u32() % range_size);
1371 :
1372 : // Panic on unwrap if it isn't found
1373 1000 : let example_key_frag = fragments
1374 1000 : .iter()
1375 1073 : .find(|f| f.1.contains(&example_key))
1376 1000 : .unwrap();
1377 :
1378 : // Check that the fragment containing our random key has a nonzero size if
1379 : // that key is shard-local
1380 1000 : let example_key_local = !shard_identity.is_key_disposable(&example_key);
1381 1000 : if example_key_local {
1382 536 : assert!(example_key_frag.0 > 0);
1383 464 : }
1384 : }
1385 1 : }
1386 : }
|