Line data Source code
1 : use postgres_ffi::BLCKSZ;
2 : use std::ops::Range;
3 :
4 : use crate::{
5 : key::Key,
6 : shard::{ShardCount, ShardIdentity},
7 : };
8 : use itertools::Itertools;
9 :
///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct KeySpace {
    /// Contiguous ranges of keys that belong to the key space. In key order,
    /// and with no overlap. Each range is half-open (`start..end`).
    pub ranges: Vec<Range<Key>>,
}

/// A wrapper type for sparse keyspaces.
///
/// It wraps a plain [`KeySpace`]. Being a distinct type, a sparse keyspace cannot be passed where
/// a dense `KeySpace` is expected without explicitly unwrapping the `.0` field.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);

/// Represents a contiguous half-open range of the keyspace, masked according to a particular
/// ShardNumber's stripes: within this range of keys, only some "belong" to the current
/// shard.
///
/// When we iterate over keys within this object, we will skip any keys that don't belong
/// to this shard.
///
/// The start + end keys may not belong to the shard: these specify where layer files should
/// start + end, but we will never actually read/write those keys.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShardedRange<'a> {
    /// The shard whose striping decides (via `is_key_disposable`) which keys in `range` are local.
    pub shard_identity: &'a ShardIdentity,
    /// The half-open key range being described.
    pub range: Range<Key>,
}
38 :
39 : // Calculate the size of a range within the blocks of the same relation, or spanning only the
40 : // top page in the previous relation's space.
41 4349383 : fn contiguous_range_len(range: &Range<Key>) -> u32 {
42 4349383 : debug_assert!(is_contiguous_range(range));
43 4349383 : if range.start.field6 == 0xffffffff {
44 8 : range.end.field6 + 1
45 : } else {
46 4349375 : range.end.field6 - range.start.field6
47 : }
48 4349383 : }
49 :
50 : /// Return true if this key range includes only keys in the same relation's data blocks, or
51 : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
52 : ///
53 : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
54 : /// be on our shard. Later in ShardedRange we do the extra work to figure out how much
55 : /// of a given contiguous range is present on one shard.
56 : ///
57 : /// This matters, because:
58 : /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
59 : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
60 8698716 : fn is_contiguous_range(range: &Range<Key>) -> bool {
61 8698716 : range.start.field1 == range.end.field1
62 8698560 : && range.start.field2 == range.end.field2
63 8698532 : && range.start.field3 == range.end.field3
64 8698462 : && range.start.field4 == range.end.field4
65 8698460 : && (range.start.field5 == range.end.field5
66 24 : || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
67 8698716 : }
68 :
69 : impl<'a> ShardedRange<'a> {
70 3580 : pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
71 3580 : Self {
72 3580 : shard_identity,
73 3580 : range,
74 3580 : }
75 3580 : }
76 :
77 : /// Break up this range into chunks, each of which has at least one local key in it if the
78 : /// total range has at least one local key.
79 3566 : pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
80 3566 : // Optimization for single-key case (e.g. logical size keys)
81 3566 : if self.range.end == self.range.start.add(1) {
82 1287 : return vec![(
83 1287 : if self.shard_identity.is_key_disposable(&self.range.start) {
84 0 : 0
85 : } else {
86 1287 : 1
87 : },
88 1287 : self.range,
89 : )];
90 2279 : }
91 2279 :
92 2279 : if !is_contiguous_range(&self.range) {
93 : // Ranges that span relations are not fragmented. We only get these ranges as a result
94 : // of operations that act on existing layers, so we trust that the existing range is
95 : // reasonably small.
96 4 : return vec![(u32::MAX, self.range)];
97 2275 : }
98 2275 :
99 2275 : let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
100 2275 :
101 2275 : let mut cursor = self.range.start;
102 4844 : while cursor < self.range.end {
103 2569 : let advance_by = self.distance_to_next_boundary(cursor);
104 2569 : let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
105 :
106 : // If the previous fragment is undersized, then we seek to consume enough
107 : // blocks to complete it.
108 2569 : let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
109 294 : Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
110 272 : Some(frag) => {
111 272 : // Prev block is complete, want the full number.
112 272 : (
113 272 : target_nblocks,
114 272 : if is_fragment_disposable {
115 : // If this current range will be empty (not shard-local data), we will merge into previous
116 0 : Some(frag)
117 : } else {
118 272 : None
119 : },
120 : )
121 : }
122 : None => {
123 : // First iteration, want the full number
124 2275 : (target_nblocks, None)
125 : }
126 : };
127 :
128 2569 : let advance_by = if is_fragment_disposable {
129 936 : advance_by
130 : } else {
131 1633 : std::cmp::min(advance_by, want_blocks)
132 : };
133 :
134 2569 : let next_cursor = cursor.add(advance_by);
135 :
136 2569 : let this_frag = (
137 2569 : if is_fragment_disposable {
138 936 : 0
139 : } else {
140 1633 : advance_by
141 : },
142 2569 : cursor..next_cursor,
143 2569 : );
144 2569 : cursor = next_cursor;
145 :
146 2569 : if let Some(last_fragment) = merge_last_fragment {
147 22 : // Previous fragment was short or this one is empty, merge into it
148 22 : last_fragment.0 += this_frag.0;
149 22 : last_fragment.1.end = this_frag.1.end;
150 2547 : } else {
151 2547 : fragments.push(this_frag);
152 2547 : }
153 : }
154 :
155 2275 : fragments
156 3566 : }
157 :
158 : /// Estimate the physical pages that are within this range, on this shard. This returns
159 : /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
160 2038 : pub fn page_count(&self) -> u32 {
161 2038 : // Special cases for single keys like logical sizes
162 2038 : if self.range.end == self.range.start.add(1) {
163 12 : return if self.shard_identity.is_key_disposable(&self.range.start) {
164 6 : 0
165 : } else {
166 6 : 1
167 : };
168 2026 : }
169 2026 :
170 2026 : // We can only do an authentic calculation of contiguous key ranges
171 2026 : if !is_contiguous_range(&self.range) {
172 8 : return u32::MAX;
173 2018 : }
174 2018 :
175 2018 : // Special case for single sharded tenants: our logical and physical sizes are the same
176 2018 : if self.shard_identity.count < ShardCount::new(2) {
177 1048 : return contiguous_range_len(&self.range);
178 970 : }
179 970 :
180 970 : // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
181 970 : // to Self, and add the stripe's block count to our total if so.
182 970 : let mut result: u64 = 0;
183 970 : let mut cursor = self.range.start;
184 1962 : while cursor < self.range.end {
185 : // Count up to the next stripe_size boundary or end of range
186 992 : let advance_by = self.distance_to_next_boundary(cursor);
187 992 :
188 992 : // If this blocks in this stripe belong to us, add them to our count
189 992 : if !self.shard_identity.is_key_disposable(&cursor) {
190 56 : result += advance_by as u64;
191 936 : }
192 :
193 992 : cursor = cursor.add(advance_by);
194 : }
195 :
196 970 : if result > u32::MAX as u64 {
197 0 : u32::MAX
198 : } else {
199 970 : result as u32
200 : }
201 2038 : }
202 :
203 : /// Advance the cursor to the next potential fragment boundary: this is either
204 : /// a stripe boundary, or the end of the range.
205 3561 : fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
206 3561 : let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
207 3561 :
208 3561 : if self.shard_identity.count < ShardCount::new(2) {
209 : // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
210 1563 : return distance_to_range_end;
211 1998 : }
212 1998 :
213 1998 : if cursor.field6 == 0xffffffff {
214 : // We are wrapping from one relation's logical size to the next relation's first data block
215 8 : return 1;
216 1990 : }
217 1990 :
218 1990 : let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
219 1990 : let stripe_remainder = self.shard_identity.stripe_size.0
220 1990 : - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
221 1990 :
222 1990 : if cfg!(debug_assertions) {
223 : // We should never overflow field5 and field6 -- our callers check this earlier
224 : // and would have returned their u32::MAX cases if the input range violated this.
225 1990 : let next_cursor = cursor.add(stripe_remainder);
226 1990 : debug_assert!(
227 1990 : next_cursor.field1 == cursor.field1
228 1990 : && next_cursor.field2 == cursor.field2
229 1990 : && next_cursor.field3 == cursor.field3
230 1990 : && next_cursor.field4 == cursor.field4
231 1990 : && next_cursor.field5 == cursor.field5
232 : )
233 0 : }
234 :
235 1990 : std::cmp::min(stripe_remainder, distance_to_range_end)
236 3561 : }
237 :
238 : /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
239 : /// this function simply calculates the number of pages in the space, without accounting for those
240 : /// pages that would not actually be stored on this node.
241 : ///
242 : /// Don't use this function in code that works with physical entities like layer files.
243 4345020 : pub fn raw_size(range: &Range<Key>) -> u32 {
244 4345020 : if is_contiguous_range(range) {
245 4344774 : contiguous_range_len(range)
246 : } else {
247 246 : u32::MAX
248 : }
249 4345020 : }
250 : }
251 :
252 : impl KeySpace {
253 : /// Create a key space with a single range.
254 132 : pub fn single(key_range: Range<Key>) -> Self {
255 132 : Self {
256 132 : ranges: vec![key_range],
257 132 : }
258 132 : }
259 :
260 : /// Partition a key space into roughly chunks of roughly 'target_size' bytes
261 : /// in each partition.
262 : ///
263 257 : pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
264 257 : // Assume that each value is 8k in size.
265 257 : let target_nblocks = (target_size / BLCKSZ as u64) as u32;
266 257 :
267 257 : let mut parts = Vec::new();
268 257 : let mut current_part = Vec::new();
269 257 : let mut current_part_size: usize = 0;
270 1799 : for range in &self.ranges {
271 : // While doing partitioning, wrap the range in ShardedRange so that our size calculations
272 : // will respect shard striping rather than assuming all keys within a range are present.
273 1542 : let range = ShardedRange::new(range.clone(), shard_identity);
274 :
275 : // Chunk up the range into parts that each contain up to target_size local blocks
276 1548 : for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
277 : // If appending the next contiguous range in the keyspace to the current
278 : // partition would cause it to be too large, and our current partition
279 : // covers at least one block that is physically present in this shard,
280 : // then start a new partition
281 1548 : if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
282 26 : && current_part_size > 0
283 26 : {
284 26 : parts.push(KeySpace {
285 26 : ranges: current_part,
286 26 : });
287 26 : current_part = Vec::new();
288 26 : current_part_size = 0;
289 1522 : }
290 1548 : current_part.push(frag_range.start..frag_range.end);
291 1548 : current_part_size += frag_on_shard_size as usize;
292 : }
293 : }
294 :
295 : // add last partition that wasn't full yet.
296 257 : if !current_part.is_empty() {
297 257 : parts.push(KeySpace {
298 257 : ranges: current_part,
299 257 : });
300 257 : }
301 :
302 257 : KeyPartitioning { parts }
303 257 : }
304 :
305 618 : pub fn is_empty(&self) -> bool {
306 618 : self.total_raw_size() == 0
307 618 : }
308 :
309 : /// Merge another keyspace into the current one.
310 : /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
311 414 : pub fn merge(&mut self, other: &KeySpace) {
312 414 : let all_ranges = self
313 414 : .ranges
314 414 : .iter()
315 70435 : .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
316 414 :
317 414 : let mut accum = KeySpaceAccum::new();
318 414 : let mut prev: Option<&Range<Key>> = None;
319 107481 : for range in all_ranges {
320 107067 : if let Some(prev) = prev {
321 106859 : let overlap =
322 106859 : std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
323 106859 : assert!(
324 106859 : !overlap,
325 0 : "Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
326 : prev, range
327 : );
328 208 : }
329 :
330 107067 : accum.add_range(range.clone());
331 107067 : prev = Some(range);
332 : }
333 :
334 414 : self.ranges = accum.to_keyspace().ranges;
335 414 : }
336 :
337 : /// Remove all keys in `other` from `self`.
338 : /// This can involve splitting or removing of existing ranges.
339 : /// Returns the removed keyspace
340 866 : pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
341 866 : let (self_start, self_end) = match (self.start(), self.end()) {
342 796 : (Some(start), Some(end)) => (start, end),
343 : _ => {
344 : // self is empty
345 70 : return KeySpace::default();
346 : }
347 : };
348 :
349 : // Key spaces are sorted by definition, so skip ahead to the first
350 : // potentially intersecting range. Similarly, ignore ranges that start
351 : // after the current keyspace ends.
352 796 : let other_ranges = other
353 796 : .ranges
354 796 : .iter()
355 10548 : .skip_while(|range| self_start >= range.end)
356 70362 : .take_while(|range| self_end > range.start);
357 796 :
358 796 : let mut removed_accum = KeySpaceRandomAccum::new();
359 71108 : for range in other_ranges {
360 202596 : while let Some(overlap_at) = self.overlaps_at(range) {
361 132284 : let overlapped = self.ranges[overlap_at].clone();
362 132284 :
363 132284 : if overlapped.start < range.start && overlapped.end <= range.end {
364 16 : // Higher part of the range is completely overlapped.
365 16 : removed_accum.add_range(range.start..self.ranges[overlap_at].end);
366 16 : self.ranges[overlap_at].end = range.start;
367 132268 : }
368 132284 : if overlapped.start >= range.start && overlapped.end > range.end {
369 71 : // Lower part of the range is completely overlapped.
370 71 : removed_accum.add_range(self.ranges[overlap_at].start..range.end);
371 71 : self.ranges[overlap_at].start = range.end;
372 132213 : }
373 132284 : if overlapped.start < range.start && overlapped.end > range.end {
374 69973 : // Middle part of the range is overlapped.
375 69973 : removed_accum.add_range(range.clone());
376 69973 : self.ranges[overlap_at].end = range.start;
377 69973 : self.ranges
378 69973 : .insert(overlap_at + 1, range.end..overlapped.end);
379 69973 : }
380 132284 : if overlapped.start >= range.start && overlapped.end <= range.end {
381 62224 : // Whole range is overlapped
382 62224 : removed_accum.add_range(self.ranges[overlap_at].clone());
383 62224 : self.ranges.remove(overlap_at);
384 70060 : }
385 : }
386 : }
387 :
388 796 : removed_accum.to_keyspace()
389 866 : }
390 :
391 886 : pub fn start(&self) -> Option<Key> {
392 886 : self.ranges.first().map(|range| range.start)
393 886 : }
394 :
395 866 : pub fn end(&self) -> Option<Key> {
396 866 : self.ranges.last().map(|range| range.end)
397 866 : }
398 :
399 : /// The size of the keyspace in pages, before accounting for sharding
400 1704 : pub fn total_raw_size(&self) -> usize {
401 1704 : self.ranges
402 1704 : .iter()
403 73124 : .map(|range| ShardedRange::raw_size(range) as usize)
404 1704 : .sum()
405 1704 : }
406 :
407 278593 : fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
408 2077263 : match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
409 114 : Ok(0) => None,
410 2584 : Err(0) => None,
411 112075 : Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
412 163820 : Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
413 112778 : _ => None,
414 : }
415 278593 : }
416 :
417 : ///
418 : /// Check if key space contains overlapping range
419 : ///
420 75997 : pub fn overlaps(&self, range: &Range<Key>) -> bool {
421 75997 : self.overlaps_at(range).is_some()
422 75997 : }
423 :
424 : /// Check if the keyspace contains a key
425 74220 : pub fn contains(&self, key: &Key) -> bool {
426 74220 : self.overlaps(&(*key..key.next()))
427 74220 : }
428 : }
429 :
430 : ///
431 : /// Represents a partitioning of the key space.
432 : ///
433 : /// The only kind of partitioning we do is to partition the key space into
434 : /// partitions that are roughly equal in physical size (see KeySpace::partition).
435 : /// But this data structure could represent any partitioning.
436 : ///
437 : #[derive(Clone, Debug, Default)]
438 : pub struct KeyPartitioning {
439 : pub parts: Vec<KeySpace>,
440 : }
441 :
442 : /// Represents a partitioning of the sparse key space.
443 : #[derive(Clone, Debug, Default)]
444 : pub struct SparseKeyPartitioning {
445 : pub parts: Vec<SparseKeySpace>,
446 : }
447 :
448 : impl KeyPartitioning {
449 758 : pub fn new() -> Self {
450 758 : KeyPartitioning { parts: Vec::new() }
451 758 : }
452 :
453 : /// Convert a key partitioning to a sparse partition.
454 379 : pub fn into_sparse(self) -> SparseKeyPartitioning {
455 379 : SparseKeyPartitioning {
456 379 : parts: self.parts.into_iter().map(SparseKeySpace).collect(),
457 379 : }
458 379 : }
459 : }
460 :
461 : impl SparseKeyPartitioning {
462 : /// Note: use this function with caution. Attempt to handle a sparse keyspace in the same way as a dense keyspace will
463 : /// cause long/dead loops.
464 364 : pub fn into_dense(self) -> KeyPartitioning {
465 364 : KeyPartitioning {
466 364 : parts: self.parts.into_iter().map(|x| x.0).collect(),
467 364 : }
468 364 : }
469 : }
470 :
471 : ///
472 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
473 : /// object. This takes care of merging adjacent keys and key ranges into
474 : /// contiguous ranges.
475 : ///
476 : #[derive(Clone, Debug, Default)]
477 : pub struct KeySpaceAccum {
478 : accum: Option<Range<Key>>,
479 :
480 : ranges: Vec<Range<Key>>,
481 : size: u64,
482 : }
483 :
484 : impl KeySpaceAccum {
485 79898 : pub fn new() -> Self {
486 79898 : Self {
487 79898 : accum: None,
488 79898 : ranges: Vec::new(),
489 79898 : size: 0,
490 79898 : }
491 79898 : }
492 :
493 : #[inline(always)]
494 4081170 : pub fn add_key(&mut self, key: Key) {
495 4081170 : self.add_range(singleton_range(key))
496 4081170 : }
497 :
498 : #[inline(always)]
499 4271890 : pub fn add_range(&mut self, range: Range<Key>) {
500 4271890 : self.size += ShardedRange::raw_size(&range) as u64;
501 4271890 :
502 4271890 : match self.accum.as_mut() {
503 4178232 : Some(accum) => {
504 4178232 : if range.start == accum.end {
505 4067064 : accum.end = range.end;
506 4067064 : } else {
507 : // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
508 : // a new range here if the skipped region was all keys that don't belong on this shard.
509 : // (https://github.com/neondatabase/neon/issues/6247)
510 111168 : assert!(range.start > accum.end);
511 111168 : self.ranges.push(accum.clone());
512 111168 : *accum = range;
513 : }
514 : }
515 93658 : None => self.accum = Some(range),
516 : }
517 4271890 : }
518 :
519 89034 : pub fn to_keyspace(mut self) -> KeySpace {
520 89034 : if let Some(accum) = self.accum.take() {
521 85546 : self.ranges.push(accum);
522 85546 : }
523 89034 : KeySpace {
524 89034 : ranges: self.ranges,
525 89034 : }
526 89034 : }
527 :
528 938 : pub fn consume_keyspace(&mut self) -> KeySpace {
529 938 : std::mem::take(self).to_keyspace()
530 938 : }
531 :
532 : // The total number of keys in this object, ignoring any sharding effects that might cause some of
533 : // the keys to be omitted in storage on this shard.
534 2178 : pub fn raw_size(&self) -> u64 {
535 2178 : self.size
536 2178 : }
537 : }
538 :
539 : ///
540 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
541 : /// object. Key ranges may be inserted in any order and can overlap.
542 : ///
543 : #[derive(Clone, Debug, Default)]
544 : pub struct KeySpaceRandomAccum {
545 : ranges: Vec<Range<Key>>,
546 : }
547 :
548 : impl KeySpaceRandomAccum {
549 2368 : pub fn new() -> Self {
550 2368 : Self { ranges: Vec::new() }
551 2368 : }
552 :
553 40508 : pub fn add_key(&mut self, key: Key) {
554 40508 : self.add_range(singleton_range(key))
555 40508 : }
556 :
557 236797 : pub fn add_range(&mut self, range: Range<Key>) {
558 236797 : self.ranges.push(range);
559 236797 : }
560 :
561 63895 : pub fn add_keyspace(&mut self, keyspace: KeySpace) {
562 127792 : for range in keyspace.ranges {
563 63897 : self.add_range(range);
564 63897 : }
565 63895 : }
566 :
567 1630 : pub fn to_keyspace(mut self) -> KeySpace {
568 1630 : let mut ranges = Vec::new();
569 1630 : if !self.ranges.is_empty() {
570 472978 : self.ranges.sort_by_key(|r| r.start);
571 936 : let mut start = self.ranges.first().unwrap().start;
572 936 : let mut end = self.ranges.first().unwrap().end;
573 237661 : for r in self.ranges {
574 236725 : assert!(r.start >= start);
575 236725 : if r.start > end {
576 235395 : ranges.push(start..end);
577 235395 : start = r.start;
578 235395 : end = r.end;
579 235395 : } else if r.end > end {
580 380 : end = r.end;
581 950 : }
582 : }
583 936 : ranges.push(start..end);
584 694 : }
585 1630 : KeySpace { ranges }
586 1630 : }
587 :
588 816 : pub fn consume_keyspace(&mut self) -> KeySpace {
589 816 : let mut prev_accum = KeySpaceRandomAccum::new();
590 816 : std::mem::swap(self, &mut prev_accum);
591 816 :
592 816 : prev_accum.to_keyspace()
593 816 : }
594 : }
595 :
596 4121678 : pub fn singleton_range(key: Key) -> Range<Key> {
597 4121678 : key..key.next()
598 4121678 : }
599 :
600 : #[cfg(test)]
601 : mod tests {
602 : use rand::{RngCore, SeedableRng};
603 :
604 : use crate::{
605 : models::ShardParameters,
606 : shard::{ShardCount, ShardNumber},
607 : };
608 :
609 : use super::*;
610 : use std::fmt::Write;
611 :
612 : // Helper function to create a key range.
613 : //
614 : // Make the tests below less verbose.
615 92 : fn kr(irange: Range<i128>) -> Range<Key> {
616 92 : Key::from_i128(irange.start)..Key::from_i128(irange.end)
617 92 : }
618 :
619 : #[allow(dead_code)]
620 0 : fn dump_keyspace(ks: &KeySpace) {
621 0 : for r in ks.ranges.iter() {
622 0 : println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
623 0 : }
624 0 : }
625 :
626 22 : fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
627 22 : if actual.ranges != expected {
628 0 : let mut msg = String::new();
629 0 :
630 0 : writeln!(msg, "expected:").unwrap();
631 0 : for r in &expected {
632 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
633 0 : }
634 0 : writeln!(msg, "got:").unwrap();
635 0 : for r in &actual.ranges {
636 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
637 0 : }
638 0 : panic!("{}", msg);
639 22 : }
640 22 : }
641 :
#[test]
fn keyspace_consume() {
    let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];

    // Feed every range into a fresh accumulator.
    let mut accum = KeySpaceAccum::new();
    ranges.iter().cloned().for_each(|r| accum.add_range(r));

    // The accumulator tracks the unsharded size of everything added so far.
    let expected_size = ranges
        .iter()
        .map(|r| u64::from(ShardedRange::raw_size(r)))
        .sum::<u64>();
    assert_eq!(accum.raw_size(), expected_size);

    // Consuming hands back the ranges and resets the accumulator.
    assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
    assert_eq!(accum.raw_size(), 0);

    // A second consume finds nothing left.
    assert_ks_eq(&accum.consume_keyspace(), vec![]);
    assert_eq!(accum.raw_size(), 0);

    // The drained accumulator can be reused.
    for r in ranges.iter().cloned() {
        accum.add_range(r);
    }
    assert_ks_eq(&accum.to_keyspace(), ranges);
}
668 :
#[test]
fn keyspace_add_range() {
    // In the diagrams below, one column is two keys starting at key 0. The first row is what
    // is added first, the second row what is added next.

    // two separate ranges
    //
    // #####
    //           #####
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(0..10));
    ks.add_range(kr(20..30));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);

    // two separate ranges, added in reverse order
    //
    //           #####
    // #####
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(20..30));
    ks.add_range(kr(0..10));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);

    // add range that is adjacent to the end of an existing range
    //
    // #####
    //      ##########
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(0..10));
    ks.add_range(kr(10..30));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

    // add range that is adjacent to the start of an existing range
    //
    //      ##########
    // #####
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(10..30));
    ks.add_range(kr(0..10));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

    // add range that overlaps with the end of an existing range
    //
    // #####
    //   #############
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(0..10));
    ks.add_range(kr(5..30));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

    // add range that overlaps with the start of an existing range
    //
    //   #############
    // #####
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(5..30));
    ks.add_range(kr(0..10));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

    // add range that is fully covered by an existing range
    //
    // ###############
    //      #####
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(0..30));
    ks.add_range(kr(10..20));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

    // add range that extends an existing range from both ends
    //
    //      #####
    // ###############
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(10..20));
    ks.add_range(kr(0..30));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

    // add a range that overlaps with two existing ranges, joining them
    //
    // #####     #####
    //   ##########
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(0..10));
    ks.add_range(kr(20..30));
    ks.add_range(kr(5..25));
    assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
}
751 :
#[test]
fn keyspace_overlaps() {
    let mut ks = KeySpaceRandomAccum::default();
    ks.add_range(kr(10..20));
    ks.add_range(kr(30..40));
    let ks = ks.to_keyspace();

    // In the diagrams below, one column is one key starting at key 0. `#` is the key space
    // (10..20 and 30..40), `x` is the queried range and `|` marks an empty query range.

    //           ##########          ##########
    // xxxxx
    assert!(!ks.overlaps(&kr(0..5)));

    //           ##########          ##########
    //      xxxx
    assert!(!ks.overlaps(&kr(5..9)));

    //           ##########          ##########
    //      xxxxx
    assert!(!ks.overlaps(&kr(5..10)));

    //           ##########          ##########
    //      xxxxxx
    assert!(ks.overlaps(&kr(5..11)));

    //           ##########          ##########
    //           xxxxx
    assert!(ks.overlaps(&kr(10..15)));

    //           ##########          ##########
    //                xxxxx
    assert!(ks.overlaps(&kr(15..20)));

    //           ##########          ##########
    //                xxxxxxxxxx
    assert!(ks.overlaps(&kr(15..25)));

    //           ##########          ##########
    //                       xxxxxx
    assert!(!ks.overlaps(&kr(22..28)));

    //           ##########          ##########
    //                          xxxxx
    assert!(!ks.overlaps(&kr(25..30)));

    // An empty range strictly inside an existing range is reported as overlapping.
    //           ##########          ##########
    //                                    |
    assert!(ks.overlaps(&kr(35..35)));

    //           ##########          ##########
    //                                         xxxxx
    assert!(!ks.overlaps(&kr(40..45)));

    //           ##########          ##########
    //                                              xxxxx
    assert!(!ks.overlaps(&kr(45..50)));

    // A query whose end coincides with the start of a later range still detects the earlier
    // range it covers.
    //           ##########          ##########
    // xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    assert!(ks.overlaps(&kr(0..30)));
}
811 :
#[test]
fn test_remove_full_overlapps() {
    // Each range of the second key space lies strictly inside, or runs off the end of, a range
    // of the first one.
    let mut key_space1 = KeySpace {
        ranges: vec![kr(1..4), kr(5..8), kr(10..12)],
    };
    let key_space2 = KeySpace {
        ranges: vec![kr(2..3), kr(6..7), kr(11..13)],
    };

    let removed = key_space1.remove_overlapping_with(&key_space2);
    assert_eq!(
        removed,
        KeySpace {
            ranges: vec![kr(2..3), kr(6..7), kr(11..12)],
        }
    );

    // Interior removals split the original ranges in two.
    assert_eq!(
        key_space1.ranges,
        vec![kr(1..2), kr(3..4), kr(5..6), kr(7..8), kr(10..11)]
    );
}
849 :
#[test]
fn test_remove_partial_overlaps() {
    // Test partial overlaps: every range of the second key space covers the tail of a range in
    // the first one and runs past it.
    let mut key_space1 = KeySpace {
        ranges: vec![kr(1..5), kr(7..10), kr(12..15)],
    };
    let key_space2 = KeySpace {
        ranges: vec![kr(3..6), kr(8..11), kr(14..17)],
    };

    let removed = key_space1.remove_overlapping_with(&key_space2);
    assert_eq!(
        removed,
        KeySpace {
            ranges: vec![kr(3..5), kr(8..10), kr(14..15)],
        }
    );

    assert_eq!(key_space1.ranges, vec![kr(1..3), kr(7..8), kr(12..14)]);
}
887 :
#[test]
fn test_remove_no_overlaps() {
    // The second key space only touches the first one at range boundaries.
    let original = vec![kr(1..5), kr(7..10), kr(12..15)];
    let mut key_space1 = KeySpace {
        ranges: original.clone(),
    };
    let key_space2 = KeySpace {
        ranges: vec![kr(6..7), kr(11..12), kr(15..17)],
    };

    let removed = key_space1.remove_overlapping_with(&key_space2);
    assert_eq!(removed, KeySpace::default());

    // Nothing was removed, so the first key space is unchanged.
    assert_eq!(key_space1.ranges, original);
}
918 :
#[test]
fn test_remove_one_range_overlaps_multiple() {
    let mut key_space1 = KeySpace {
        ranges: vec![
            kr(1..3),
            kr(3..6),
            kr(6..10),
            kr(12..15),
            kr(17..20),
            kr(20..30),
            kr(30..40),
        ],
    };
    // A single range that clips 6..10, swallows 12..15 and clips 17..20.
    let key_space2 = KeySpace {
        ranges: vec![kr(9..19)],
    };

    let removed = key_space1.remove_overlapping_with(&key_space2);
    assert_eq!(
        removed,
        KeySpace {
            ranges: vec![kr(9..10), kr(12..15), kr(17..19)],
        }
    );

    assert_eq!(
        key_space1.ranges,
        vec![
            kr(1..3),
            kr(3..6),
            kr(6..9),
            kr(19..20),
            kr(20..30),
            kr(30..40),
        ]
    );
}
958 : #[test]
959 2 : fn sharded_range_relation_gap() {
960 2 : let shard_identity = ShardIdentity::new(
961 2 : ShardNumber(0),
962 2 : ShardCount::new(4),
963 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
964 2 : )
965 2 : .unwrap();
966 2 :
967 2 : let range = ShardedRange::new(
968 2 : Range {
969 2 : start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
970 2 : end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
971 2 : },
972 2 : &shard_identity,
973 2 : );
974 2 :
975 2 : // Key range spans relations, expect MAX
976 2 : assert_eq!(range.page_count(), u32::MAX);
977 2 : }
978 :
979 : #[test]
980 2 : fn shard_identity_keyspaces_single_key() {
981 2 : let shard_identity = ShardIdentity::new(
982 2 : ShardNumber(1),
983 2 : ShardCount::new(4),
984 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
985 2 : )
986 2 : .unwrap();
987 2 :
988 2 : let range = ShardedRange::new(
989 2 : Range {
990 2 : start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
991 2 : end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
992 2 : },
993 2 : &shard_identity,
994 2 : );
995 2 : // Single-key range on logical size key
996 2 : assert_eq!(range.page_count(), 1);
997 2 : }
998 :
999 : /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
1000 : #[test]
1001 2 : fn contiguous_range_check() {
1002 2 : assert!(!is_contiguous_range(
1003 2 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1004 2 : ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
1005 2 : ),);
1006 :
1007 : // The ranges goes all the way up to the 0xffffffff, including it: this is
1008 : // not considered a rel block range because 0xffffffff stores logical sizes,
1009 : // not blocks.
1010 2 : assert!(!is_contiguous_range(
1011 2 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1012 2 : ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
1013 2 : ),);
1014 :
1015 : // Keys within the normal data region of a relation
1016 2 : assert!(is_contiguous_range(
1017 2 : &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
1018 2 : ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
1019 2 : ),);
1020 :
1021 : // The logical size key of one forkno, then some blocks in the next
1022 2 : assert!(is_contiguous_range(
1023 2 : &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
1024 2 : ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
1025 2 : ),);
1026 2 : }
1027 :
1028 : #[test]
1029 2 : fn shard_identity_keyspaces_forkno_gap() {
1030 2 : let shard_identity = ShardIdentity::new(
1031 2 : ShardNumber(1),
1032 2 : ShardCount::new(4),
1033 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
1034 2 : )
1035 2 : .unwrap();
1036 2 :
1037 2 : let range = ShardedRange::new(
1038 2 : Range {
1039 2 : start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
1040 2 : end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
1041 2 : },
1042 2 : &shard_identity,
1043 2 : );
1044 2 :
1045 2 : // Range spanning the end of one forkno and the start of the next: we do not attempt to
1046 2 : // calculate a valid size, because we have no way to know if they keys between start
1047 2 : // and end are actually in use.
1048 2 : assert_eq!(range.page_count(), u32::MAX);
1049 2 : }
1050 :
1051 : #[test]
1052 2 : fn shard_identity_keyspaces_one_relation() {
1053 10 : for shard_number in 0..4 {
1054 8 : let shard_identity = ShardIdentity::new(
1055 8 : ShardNumber(shard_number),
1056 8 : ShardCount::new(4),
1057 8 : ShardParameters::DEFAULT_STRIPE_SIZE,
1058 8 : )
1059 8 : .unwrap();
1060 8 :
1061 8 : let range = ShardedRange::new(
1062 8 : Range {
1063 8 : start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
1064 8 : end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
1065 8 : },
1066 8 : &shard_identity,
1067 8 : );
1068 8 :
1069 8 : // Very simple case: range covering block zero of one relation, where that block maps to shard zero
1070 8 : if shard_number == 0 {
1071 2 : assert_eq!(range.page_count(), 1);
1072 : } else {
1073 : // Other shards should perceive the range's size as zero
1074 6 : assert_eq!(range.page_count(), 0);
1075 : }
1076 : }
1077 2 : }
1078 :
1079 : /// Test helper: construct a ShardedRange and call fragment() on it, returning
1080 : /// the total page count in the range and the fragments.
1081 2024 : fn do_fragment(
1082 2024 : range_start: Key,
1083 2024 : range_end: Key,
1084 2024 : shard_identity: &ShardIdentity,
1085 2024 : target_nblocks: u32,
1086 2024 : ) -> (u32, Vec<(u32, Range<Key>)>) {
1087 2024 : let range = ShardedRange::new(
1088 2024 : Range {
1089 2024 : start: range_start,
1090 2024 : end: range_end,
1091 2024 : },
1092 2024 : shard_identity,
1093 2024 : );
1094 2024 :
1095 2024 : let page_count = range.page_count();
1096 2024 : let fragments = range.fragment(target_nblocks);
1097 2024 :
1098 2024 : // Invariant: we always get at least one fragment
1099 2024 : assert!(!fragments.is_empty());
1100 :
1101 : // Invariant: the first/last fragment start/end should equal the input start/end
1102 2024 : assert_eq!(fragments.first().unwrap().1.start, range_start);
1103 2024 : assert_eq!(fragments.last().unwrap().1.end, range_end);
1104 :
1105 2024 : if page_count > 0 {
1106 : // Invariant: every fragment must contain at least one shard-local page, if the
1107 : // total range contains at least one shard-local page
1108 1374 : let all_nonzero = fragments.iter().all(|f| f.0 > 0);
1109 1108 : if !all_nonzero {
1110 0 : eprintln!("Found a zero-length fragment: {:?}", fragments);
1111 1108 : }
1112 1108 : assert!(all_nonzero);
1113 : } else {
1114 : // A range with no shard-local pages should always be returned as a single fragment
1115 916 : assert_eq!(fragments, vec![(0, range_start..range_end)]);
1116 : }
1117 :
1118 : // Invariant: fragments must be ordered and non-overlapping
1119 2024 : let mut last: Option<Range<Key>> = None;
1120 4314 : for frag in &fragments {
1121 2290 : if let Some(last) = last {
1122 266 : assert!(frag.1.start >= last.end);
1123 266 : assert!(frag.1.start > last.start);
1124 2024 : }
1125 2290 : last = Some(frag.1.clone())
1126 : }
1127 :
1128 : // Invariant: fragments respect target_nblocks
1129 4314 : for frag in &fragments {
1130 2290 : assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
1131 : }
1132 :
1133 2024 : (page_count, fragments)
1134 2024 : }
1135 :
1136 : /// Really simple tests for fragment(), on a range that just contains a single stripe
1137 : /// for a single tenant.
1138 : #[test]
1139 2 : fn sharded_range_fragment_simple() {
1140 2 : let shard_identity = ShardIdentity::new(
1141 2 : ShardNumber(0),
1142 2 : ShardCount::new(4),
1143 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
1144 2 : )
1145 2 : .unwrap();
1146 2 :
1147 2 : // A range which we happen to know covers exactly one stripe which belongs to this shard
1148 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1149 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
1150 2 :
1151 2 : // Ask for stripe_size blocks, we get the whole stripe
1152 2 : assert_eq!(
1153 2 : do_fragment(input_start, input_end, &shard_identity, 32768),
1154 2 : (32768, vec![(32768, input_start..input_end)])
1155 2 : );
1156 :
1157 : // Ask for more, we still get the whole stripe
1158 2 : assert_eq!(
1159 2 : do_fragment(input_start, input_end, &shard_identity, 10000000),
1160 2 : (32768, vec![(32768, input_start..input_end)])
1161 2 : );
1162 :
1163 : // Ask for target_nblocks of half the stripe size, we get two halves
1164 2 : assert_eq!(
1165 2 : do_fragment(input_start, input_end, &shard_identity, 16384),
1166 2 : (
1167 2 : 32768,
1168 2 : vec![
1169 2 : (16384, input_start..input_start.add(16384)),
1170 2 : (16384, input_start.add(16384)..input_end)
1171 2 : ]
1172 2 : )
1173 2 : );
1174 2 : }
1175 :
1176 : #[test]
1177 2 : fn sharded_range_fragment_multi_stripe() {
1178 2 : let shard_identity = ShardIdentity::new(
1179 2 : ShardNumber(0),
1180 2 : ShardCount::new(4),
1181 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
1182 2 : )
1183 2 : .unwrap();
1184 2 :
1185 2 : // A range which covers multiple stripes, exactly one of which belongs to the current shard.
1186 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1187 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1188 2 : // Ask for all the blocks, get a fragment that covers the whole range but reports
1189 2 : // its size to be just the blocks belonging to our shard.
1190 2 : assert_eq!(
1191 2 : do_fragment(input_start, input_end, &shard_identity, 131072),
1192 2 : (32768, vec![(32768, input_start..input_end)])
1193 2 : );
1194 :
1195 : // Ask for a sub-stripe quantity
1196 2 : assert_eq!(
1197 2 : do_fragment(input_start, input_end, &shard_identity, 16000),
1198 2 : (
1199 2 : 32768,
1200 2 : vec![
1201 2 : (16000, input_start..input_start.add(16000)),
1202 2 : (16000, input_start.add(16000)..input_start.add(32000)),
1203 2 : (768, input_start.add(32000)..input_end),
1204 2 : ]
1205 2 : )
1206 2 : );
1207 :
1208 : // Try on a range that starts slightly after our owned stripe
1209 2 : assert_eq!(
1210 2 : do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
1211 2 : (32767, vec![(32767, input_start.add(1)..input_end)])
1212 2 : );
1213 2 : }
1214 :
1215 : /// Test our calculations work correctly when we start a range from the logical size key of
1216 : /// a previous relation.
1217 : #[test]
1218 2 : fn sharded_range_fragment_starting_from_logical_size() {
1219 2 : let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
1220 2 : let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
1221 2 :
1222 2 : // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
1223 2 : let shard_identity = ShardIdentity::new(
1224 2 : ShardNumber(0),
1225 2 : ShardCount::new(4),
1226 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
1227 2 : )
1228 2 : .unwrap();
1229 2 : assert_eq!(
1230 2 : do_fragment(input_start, input_end, &shard_identity, 0x10000),
1231 2 : (0x8001, vec![(0x8001, input_start..input_end)])
1232 2 : );
1233 :
1234 : // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
1235 : // store all logical sizes)
1236 2 : let shard_identity = ShardIdentity::new(
1237 2 : ShardNumber(1),
1238 2 : ShardCount::new(4),
1239 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
1240 2 : )
1241 2 : .unwrap();
1242 2 : assert_eq!(
1243 2 : do_fragment(input_start, input_end, &shard_identity, 0x10000),
1244 2 : (0x1, vec![(0x1, input_start..input_end)])
1245 2 : );
1246 2 : }
1247 :
1248 : /// Test that ShardedRange behaves properly when used on un-sharded data
1249 : #[test]
1250 2 : fn sharded_range_fragment_unsharded() {
1251 2 : let shard_identity = ShardIdentity::unsharded();
1252 2 :
1253 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1254 2 : let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
1255 2 : assert_eq!(
1256 2 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1257 2 : (
1258 2 : 0x10000,
1259 2 : vec![
1260 2 : (0x8000, input_start..input_start.add(0x8000)),
1261 2 : (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
1262 2 : ]
1263 2 : )
1264 2 : );
1265 2 : }
1266 :
1267 : #[test]
1268 2 : fn sharded_range_fragment_cross_relation() {
1269 2 : let shard_identity = ShardIdentity::unsharded();
1270 2 :
1271 2 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1272 2 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1273 2 : let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
1274 2 : assert_eq!(
1275 2 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1276 2 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1277 2 : );
1278 :
1279 : // Same, but using a sharded identity
1280 2 : let shard_identity = ShardIdentity::new(
1281 2 : ShardNumber(0),
1282 2 : ShardCount::new(4),
1283 2 : ShardParameters::DEFAULT_STRIPE_SIZE,
1284 2 : )
1285 2 : .unwrap();
1286 2 : assert_eq!(
1287 2 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1288 2 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1289 2 : );
1290 2 : }
1291 :
1292 : #[test]
1293 2 : fn sharded_range_fragment_tiny_nblocks() {
1294 2 : let shard_identity = ShardIdentity::unsharded();
1295 2 :
1296 2 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1297 2 : let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
1298 2 : let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
1299 2 : assert_eq!(
1300 2 : do_fragment(input_start, input_end, &shard_identity, 16),
1301 2 : (
1302 2 : 0x38,
1303 2 : vec![
1304 2 : (16, input_start..input_start.add(16)),
1305 2 : (16, input_start.add(16)..input_start.add(32)),
1306 2 : (16, input_start.add(32)..input_start.add(48)),
1307 2 : (8, input_start.add(48)..input_end),
1308 2 : ]
1309 2 : )
1310 2 : );
1311 2 : }
1312 :
1313 : #[test]
1314 2 : fn sharded_range_fragment_fuzz() {
1315 2 : // Use a fixed seed: we don't want to explicitly pick values, but we do want
1316 2 : // the test to be reproducible.
1317 2 : let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
1318 :
1319 2002 : for _i in 0..1000 {
1320 2000 : let shard_identity = if prng.next_u32() % 2 == 0 {
1321 1038 : ShardIdentity::unsharded()
1322 : } else {
1323 962 : let shard_count = prng.next_u32() % 127 + 1;
1324 962 : ShardIdentity::new(
1325 962 : ShardNumber((prng.next_u32() % shard_count) as u8),
1326 962 : ShardCount::new(shard_count as u8),
1327 962 : ShardParameters::DEFAULT_STRIPE_SIZE,
1328 962 : )
1329 962 : .unwrap()
1330 : };
1331 :
1332 2000 : let target_nblocks = prng.next_u32() % 65536 + 1;
1333 2000 :
1334 2000 : let start_offset = prng.next_u32() % 16384;
1335 2000 :
1336 2000 : // Try ranges up to 4GiB in size, that are always at least 1
1337 2000 : let range_size = prng.next_u32() % 8192 + 1;
1338 2000 :
1339 2000 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1340 2000 : let input_start = Key::from_hex("000000067F00000001000004E10000000000")
1341 2000 : .unwrap()
1342 2000 : .add(start_offset);
1343 2000 : let input_end = input_start.add(range_size);
1344 2000 :
1345 2000 : // This test's main success conditions are the invariants baked into do_fragment
1346 2000 : let (_total_size, fragments) =
1347 2000 : do_fragment(input_start, input_end, &shard_identity, target_nblocks);
1348 2000 :
1349 2000 : // Pick a random key within the range and check it appears in the output
1350 2000 : let example_key = input_start.add(prng.next_u32() % range_size);
1351 2000 :
1352 2000 : // Panic on unwrap if it isn't found
1353 2000 : let example_key_frag = fragments
1354 2000 : .iter()
1355 2146 : .find(|f| f.1.contains(&example_key))
1356 2000 : .unwrap();
1357 2000 :
1358 2000 : // Check that the fragment containing our random key has a nonzero size if
1359 2000 : // that key is shard-local
1360 2000 : let example_key_local = !shard_identity.is_key_disposable(&example_key);
1361 2000 : if example_key_local {
1362 1084 : assert!(example_key_frag.0 > 0);
1363 916 : }
1364 : }
1365 2 : }
1366 : }
|