use postgres_ffi::BLCKSZ;
use std::ops::Range;

use crate::{
    key::Key,
    shard::{ShardCount, ShardIdentity},
};
use itertools::Itertools;

///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct KeySpace {
    /// Contiguous ranges of keys that belong to the key space. In key order,
    /// and with no overlap.
    pub ranges: Vec<Range<Key>>,
}

/// A wrapper type for sparse keyspaces.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);

/// Represents a contiguous half-open range of the keyspace, masked according to a particular
/// ShardNumber's stripes: within this range of keys, only some "belong" to the current
/// shard.
///
/// When we iterate over keys within this object, we will skip any keys that don't belong
/// to this shard.
///
/// The start + end keys may not belong to the shard: these specify where layer files should
/// start + end, but we will never actually read/write those keys.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShardedRange<'a> {
    pub shard_identity: &'a ShardIdentity,
    pub range: Range<Key>,
}

// Calculate the size of a range within the blocks of the same relation, or spanning only the
// top page in the previous relation's space.
fn contiguous_range_len(range: &Range<Key>) -> u32 {
    debug_assert!(is_contiguous_range(range));
    if range.start.field6 == 0xffffffff {
        range.end.field6 + 1
    } else {
        range.end.field6 - range.start.field6
    }
}
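
// Sketch of the two cases above (hypothetical field values): for a range within
// one relation with start.field6 = 8 and end.field6 = 40, this returns 32, the
// number of blocks covered. For a range starting at a logical size key
// (start.field6 == 0xffffffff) and ending at block 3 of the next relation, it
// returns 3 + 1 = 4: the logical size key itself plus blocks 0, 1 and 2.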

/// Return true if this key range includes only keys in the same relation's data blocks, or
/// spans just one relation and the logical size (0xffffffff) block of the relation before it.
///
/// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
/// be on our shard. Later in ShardedRange we do the extra work to figure out how much
/// of a given contiguous range is present on one shard.
///
/// This matters, because:
/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
/// - Within such ranges, we may calculate distances using simple subtraction of field6.
fn is_contiguous_range(range: &Range<Key>) -> bool {
    range.start.field1 == range.end.field1
        && range.start.field2 == range.end.field2
        && range.start.field3 == range.end.field3
        && range.start.field4 == range.end.field4
        && (range.start.field5 == range.end.field5
            || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
}

impl<'a> ShardedRange<'a> {
    pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
        Self {
            shard_identity,
            range,
        }
    }

    /// Break up this range into chunks, each of which has at least one local key in it if the
    /// total range has at least one local key.
    pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
        // Optimization for single-key case (e.g. logical size keys)
        if self.range.end == self.range.start.add(1) {
            return vec![(
                if self.shard_identity.is_key_disposable(&self.range.start) {
                    0
                } else {
                    1
                },
                self.range,
            )];
        }

        if !is_contiguous_range(&self.range) {
            // Ranges that span relations are not fragmented. We only get these ranges as a result
            // of operations that act on existing layers, so we trust that the existing range is
            // reasonably small.
            return vec![(u32::MAX, self.range)];
        }

        let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();

        let mut cursor = self.range.start;
        while cursor < self.range.end {
            let advance_by = self.distance_to_next_boundary(cursor);
            let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);

            // If the previous fragment is undersized, then we seek to consume enough
            // blocks to complete it.
            let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
                Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
                Some(frag) => {
                    // Previous fragment is complete, want the full number.
                    (
                        target_nblocks,
                        if is_fragment_disposable {
                            // If this current range will be empty (not shard-local data), we will merge into previous
                            Some(frag)
                        } else {
                            None
                        },
                    )
                }
                None => {
                    // First iteration, want the full number
                    (target_nblocks, None)
                }
            };

            let advance_by = if is_fragment_disposable {
                advance_by
            } else {
                std::cmp::min(advance_by, want_blocks)
            };

            let next_cursor = cursor.add(advance_by);

            let this_frag = (
                if is_fragment_disposable {
                    0
                } else {
                    advance_by
                },
                cursor..next_cursor,
            );
            cursor = next_cursor;

            if let Some(last_fragment) = merge_last_fragment {
                // Previous fragment was short or this one is empty, merge into it
                last_fragment.0 += this_frag.0;
                last_fragment.1.end = this_frag.1.end;
            } else {
                fragments.push(this_frag);
            }
        }

        fragments
    }
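
    // Sketch of fragment() on an unsharded tenant (hypothetical numbers): a
    // 40-block contiguous range with target_nblocks = 16 yields the fragments
    // (16, 0..16), (16, 16..32) and (8, 32..40). The trailing fragment may be
    // short; an undersized *earlier* fragment is instead topped up on the next
    // iteration via merge_last_fragment.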

    /// Estimate the physical pages that are within this range, on this shard. This returns
    /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
    pub fn page_count(&self) -> u32 {
        // Special cases for single keys like logical sizes
        if self.range.end == self.range.start.add(1) {
            return if self.shard_identity.is_key_disposable(&self.range.start) {
                0
            } else {
                1
            };
        }

        // We can only do an exact calculation for contiguous key ranges
        if !is_contiguous_range(&self.range) {
            return u32::MAX;
        }

        // Special case for single-shard tenants: our logical and physical sizes are the same
        if self.shard_identity.count < ShardCount::new(2) {
            return contiguous_range_len(&self.range);
        }

        // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
        // to Self, and add the stripe's block count to our total if so.
        let mut result: u64 = 0;
        let mut cursor = self.range.start;
        while cursor < self.range.end {
            // Count up to the next stripe_size boundary or end of range
            let advance_by = self.distance_to_next_boundary(cursor);

            // If the blocks in this stripe belong to us, add them to our count
            if !self.shard_identity.is_key_disposable(&cursor) {
                result += advance_by as u64;
            }

            cursor = cursor.add(advance_by);
        }

        if result > u32::MAX as u64 {
            u32::MAX
        } else {
            result as u32
        }
    }
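
    // Sketch (assuming shard 0 of 4 with stripe_size = 32768): a contiguous
    // range covering blocks 0..131072 of one relation spans four stripes, of
    // which only the first (blocks 0..32768) maps to shard 0, so page_count()
    // returns 32768.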

    /// Advance the cursor to the next potential fragment boundary: this is either
    /// a stripe boundary, or the end of the range.
    fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
        let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));

        if self.shard_identity.count < ShardCount::new(2) {
            // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
            return distance_to_range_end;
        }

        if cursor.field6 == 0xffffffff {
            // We are wrapping from one relation's logical size to the next relation's first data block
            return 1;
        }

        let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
        let stripe_remainder = self.shard_identity.stripe_size.0
            - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);

        if cfg!(debug_assertions) {
            // We should never overflow field5 and field6 -- our callers check this earlier
            // and would have returned their u32::MAX cases if the input range violated this.
            let next_cursor = cursor.add(stripe_remainder);
            debug_assert!(
                next_cursor.field1 == cursor.field1
                    && next_cursor.field2 == cursor.field2
                    && next_cursor.field3 == cursor.field3
                    && next_cursor.field4 == cursor.field4
                    && next_cursor.field5 == cursor.field5
            )
        }

        std::cmp::min(stripe_remainder, distance_to_range_end)
    }
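
    // Sketch of the stripe arithmetic above (assuming stripe_size = 32768):
    // a cursor at field6 = 40000 sits in stripe 1 (blocks 32768..65536), so
    // stripe_remainder = 32768 - (40000 - 32768) = 25536 blocks remain until
    // the next stripe boundary.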

    /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
    /// this function simply calculates the number of pages in the space, without accounting for those
    /// pages that would not actually be stored on this node.
    ///
    /// Don't use this function in code that works with physical entities like layer files.
    pub fn raw_size(range: &Range<Key>) -> u32 {
        if is_contiguous_range(range) {
            contiguous_range_len(range)
        } else {
            u32::MAX
        }
    }
}

impl KeySpace {
    /// Create a key space with a single range.
    pub fn single(key_range: Range<Key>) -> Self {
        Self {
            ranges: vec![key_range],
        }
    }

    /// Partition a key space into chunks of roughly 'target_size' bytes each.
    pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
        // Assume that each value is 8k in size.
        let target_nblocks = (target_size / BLCKSZ as u64) as u32;

        let mut parts = Vec::new();
        let mut current_part = Vec::new();
        let mut current_part_size: usize = 0;
        for range in &self.ranges {
            // While doing partitioning, wrap the range in ShardedRange so that our size calculations
            // will respect shard striping rather than assuming all keys within a range are present.
            let range = ShardedRange::new(range.clone(), shard_identity);

            // Chunk up the range into parts that each contain up to target_size local blocks
            for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
                // If appending the next contiguous range in the keyspace to the current
                // partition would cause it to be too large, and our current partition
                // covers at least one block that is physically present in this shard,
                // then start a new partition
                if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
                    && current_part_size > 0
                {
                    parts.push(KeySpace {
                        ranges: current_part,
                    });
                    current_part = Vec::new();
                    current_part_size = 0;
                }
                current_part.push(frag_range.start..frag_range.end);
                current_part_size += frag_on_shard_size as usize;
            }
        }

        // Add the last partition that wasn't full yet.
        if !current_part.is_empty() {
            parts.push(KeySpace {
                ranges: current_part,
            });
        }

        KeyPartitioning { parts }
    }

    pub fn is_empty(&self) -> bool {
        self.total_raw_size() == 0
    }

    /// Merge another keyspace into the current one.
    /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
    pub fn merge(&mut self, other: &KeySpace) {
        let all_ranges = self
            .ranges
            .iter()
            .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);

        let mut accum = KeySpaceAccum::new();
        let mut prev: Option<&Range<Key>> = None;
        for range in all_ranges {
            if let Some(prev) = prev {
                let overlap =
                    std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
                assert!(
                    !overlap,
                    "Attempt to merge overlapping keyspaces: {:?} overlaps {:?}",
                    prev, range
                );
            }

            accum.add_range(range.clone());
            prev = Some(range);
        }

        self.ranges = accum.to_keyspace().ranges;
    }

    /// Remove all keys in `other` from `self`.
    /// This can involve splitting or removing of existing ranges.
    /// Returns the removed keyspace.
    pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
        let (self_start, self_end) = match (self.start(), self.end()) {
            (Some(start), Some(end)) => (start, end),
            _ => {
                // self is empty
                return KeySpace::default();
            }
        };

        // Key spaces are sorted by definition, so skip ahead to the first
        // potentially intersecting range. Similarly, ignore ranges that start
        // after the current keyspace ends.
        let other_ranges = other
            .ranges
            .iter()
            .skip_while(|range| self_start >= range.end)
            .take_while(|range| self_end > range.start);

        let mut removed_accum = KeySpaceRandomAccum::new();
        for range in other_ranges {
            while let Some(overlap_at) = self.overlaps_at(range) {
                let overlapped = self.ranges[overlap_at].clone();

                if overlapped.start < range.start && overlapped.end <= range.end {
                    // Higher part of the range is completely overlapped.
                    removed_accum.add_range(range.start..self.ranges[overlap_at].end);
                    self.ranges[overlap_at].end = range.start;
                }
                if overlapped.start >= range.start && overlapped.end > range.end {
                    // Lower part of the range is completely overlapped.
                    removed_accum.add_range(self.ranges[overlap_at].start..range.end);
                    self.ranges[overlap_at].start = range.end;
                }
                if overlapped.start < range.start && overlapped.end > range.end {
                    // Middle part of the range is overlapped.
                    removed_accum.add_range(range.clone());
                    self.ranges[overlap_at].end = range.start;
                    self.ranges
                        .insert(overlap_at + 1, range.end..overlapped.end);
                }
                if overlapped.start >= range.start && overlapped.end <= range.end {
                    // Whole range is overlapped
                    removed_accum.add_range(self.ranges[overlap_at].clone());
                    self.ranges.remove(overlap_at);
                }
            }
        }

        removed_accum.to_keyspace()
    }
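
    // Sketch of the four cases above: removing 5..15 from a keyspace holding
    // [0..10, 12..20] trims the high end of the first range (keeping 0..5) and
    // the low end of the second (keeping 15..20); the returned keyspace is
    // [5..10, 12..15].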

    pub fn start(&self) -> Option<Key> {
        self.ranges.first().map(|range| range.start)
    }

    pub fn end(&self) -> Option<Key> {
        self.ranges.last().map(|range| range.end)
    }

    /// The size of the keyspace in pages, before accounting for sharding
    pub fn total_raw_size(&self) -> usize {
        self.ranges
            .iter()
            .map(|range| ShardedRange::raw_size(range) as usize)
            .sum()
    }

    fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
        match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
            Ok(0) => None,
            Err(0) => None,
            Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
            Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
            _ => None,
        }
    }
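
    // Sketch of the search above: with ranges [10..20, 30..40] and query
    // 15..35, binary-searching the range starts for the query's end (35)
    // yields Err(2); ranges[1].end == 40 > 15, so Some(1) is returned. In
    // general this finds the last range that starts before the query's end,
    // provided that range also reaches past the query's start.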

    ///
    /// Check if the key space overlaps the given range.
    ///
    pub fn overlaps(&self, range: &Range<Key>) -> bool {
        self.overlaps_at(range).is_some()
    }

    /// Check if the keyspace contains a key
    pub fn contains(&self, key: &Key) -> bool {
        self.overlaps(&(*key..key.next()))
    }
}

///
/// Represents a partitioning of the key space.
///
/// The only kind of partitioning we do is to partition the key space into
/// partitions that are roughly equal in physical size (see KeySpace::partition).
/// But this data structure could represent any partitioning.
///
#[derive(Clone, Debug, Default)]
pub struct KeyPartitioning {
    pub parts: Vec<KeySpace>,
}

/// Represents a partitioning of the sparse key space.
#[derive(Clone, Debug, Default)]
pub struct SparseKeyPartitioning {
    pub parts: Vec<SparseKeySpace>,
}

impl KeyPartitioning {
    pub fn new() -> Self {
        KeyPartitioning { parts: Vec::new() }
    }

    /// Convert a key partitioning to a sparse partition.
    pub fn into_sparse(self) -> SparseKeyPartitioning {
        SparseKeyPartitioning {
            parts: self.parts.into_iter().map(SparseKeySpace).collect(),
        }
    }
}

impl SparseKeyPartitioning {
    /// Note: use this function with caution. Attempting to handle a sparse keyspace in the
    /// same way as a dense keyspace can cause long or infinite loops.
    pub fn into_dense(self) -> KeyPartitioning {
        KeyPartitioning {
            parts: self.parts.into_iter().map(|x| x.0).collect(),
        }
    }
}

///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. This takes care of merging adjacent keys and key ranges into
/// contiguous ranges.
///
#[derive(Clone, Debug, Default)]
pub struct KeySpaceAccum {
    accum: Option<Range<Key>>,

    ranges: Vec<Range<Key>>,
    size: u64,
}

impl KeySpaceAccum {
    pub fn new() -> Self {
        Self {
            accum: None,
            ranges: Vec::new(),
            size: 0,
        }
    }

    #[inline(always)]
    pub fn add_key(&mut self, key: Key) {
        self.add_range(singleton_range(key))
    }

    #[inline(always)]
    pub fn add_range(&mut self, range: Range<Key>) {
        self.size += ShardedRange::raw_size(&range) as u64;

        match self.accum.as_mut() {
            Some(accum) => {
                if range.start == accum.end {
                    accum.end = range.end;
                } else {
                    // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
                    // a new range here if the skipped region was all keys that don't belong on this shard.
                    // (https://github.com/neondatabase/neon/issues/6247)
                    assert!(range.start > accum.end);
                    self.ranges.push(accum.clone());
                    *accum = range;
                }
            }
            None => self.accum = Some(range),
        }
    }

    pub fn to_keyspace(mut self) -> KeySpace {
        if let Some(accum) = self.accum.take() {
            self.ranges.push(accum);
        }
        KeySpace {
            ranges: self.ranges,
        }
    }

    pub fn consume_keyspace(&mut self) -> KeySpace {
        std::mem::take(self).to_keyspace()
    }

    /// The total number of keys in this object, ignoring any sharding effects that might cause some of
    /// the keys to be omitted in storage on this shard.
    pub fn raw_size(&self) -> u64 {
        self.size
    }
}

///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. Key ranges may be inserted in any order and can overlap.
///
#[derive(Clone, Debug, Default)]
pub struct KeySpaceRandomAccum {
    ranges: Vec<Range<Key>>,
}

impl KeySpaceRandomAccum {
    pub fn new() -> Self {
        Self { ranges: Vec::new() }
    }

    pub fn add_key(&mut self, key: Key) {
        self.add_range(singleton_range(key))
    }

    pub fn add_range(&mut self, range: Range<Key>) {
        self.ranges.push(range);
    }

    pub fn to_keyspace(mut self) -> KeySpace {
        let mut ranges = Vec::new();
        if !self.ranges.is_empty() {
            self.ranges.sort_by_key(|r| r.start);
            let mut start = self.ranges.first().unwrap().start;
            let mut end = self.ranges.first().unwrap().end;
            for r in self.ranges {
                assert!(r.start >= start);
                if r.start > end {
                    ranges.push(start..end);
                    start = r.start;
                    end = r.end;
                } else if r.end > end {
                    end = r.end;
                }
            }
            ranges.push(start..end);
        }
        KeySpace { ranges }
    }

    pub fn consume_keyspace(&mut self) -> KeySpace {
        let mut prev_accum = KeySpaceRandomAccum::new();
        std::mem::swap(self, &mut prev_accum);

        prev_accum.to_keyspace()
    }
}

pub fn singleton_range(key: Key) -> Range<Key> {
    key..key.next()
}

#[cfg(test)]
mod tests {
    use rand::{RngCore, SeedableRng};

    use crate::{
        models::ShardParameters,
        shard::{ShardCount, ShardNumber},
    };

    use super::*;
    use std::fmt::Write;

    // Helper function to create a key range.
    //
    // Make the tests below less verbose.
    fn kr(irange: Range<i128>) -> Range<Key> {
        Key::from_i128(irange.start)..Key::from_i128(irange.end)
    }

    #[allow(dead_code)]
    fn dump_keyspace(ks: &KeySpace) {
        for r in ks.ranges.iter() {
            println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
        }
    }

    fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
        if actual.ranges != expected {
            let mut msg = String::new();

            writeln!(msg, "expected:").unwrap();
            for r in &expected {
                writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
            }
            writeln!(msg, "got:").unwrap();
            for r in &actual.ranges {
                writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
            }
            panic!("{}", msg);
        }
    }

    #[test]
    fn keyspace_consume() {
        let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];

        let mut accum = KeySpaceAccum::new();
        for range in &ranges {
            accum.add_range(range.clone());
        }

        let expected_size: u64 = ranges
            .iter()
            .map(|r| ShardedRange::raw_size(r) as u64)
            .sum();
        assert_eq!(accum.raw_size(), expected_size);

        assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
        assert_eq!(accum.raw_size(), 0);

        assert_ks_eq(&accum.consume_keyspace(), vec![]);
        assert_eq!(accum.raw_size(), 0);

        for range in &ranges {
            accum.add_range(range.clone());
        }
        assert_ks_eq(&accum.to_keyspace(), ranges);
    }
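
    // An illustrative sketch of KeySpaceAccum's coalescing behaviour: ranges
    // added in key order are merged when they abut exactly, and a gap starts
    // a new range.
    #[test]
    fn keyspace_accum_adjacent() {
        let mut accum = KeySpaceAccum::new();
        accum.add_range(kr(0..10));
        accum.add_range(kr(10..20)); // adjacent: extends the pending range
        accum.add_key(Key::from_i128(25)); // gap: starts a new range
        assert_ks_eq(&accum.to_keyspace(), vec![kr(0..20), kr(25..26)]);
    }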

    #[test]
    fn keyspace_add_range() {
        // two separate ranges
        //
        // #####
        //           #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(20..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);

        // two separate ranges, added in reverse order
        //
        //           #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(20..30));
        ks.add_range(kr(0..10));

        // add range that is adjacent to the end of an existing range
        //
        // #####
        //      #####
        ks.add_range(kr(0..10));
        ks.add_range(kr(10..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that is adjacent to the start of an existing range
        //
        //      #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..30));
        ks.add_range(kr(0..10));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that overlaps with the end of an existing range
        //
        // #####
        //    #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(5..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that overlaps with the start of an existing range
        //
        //    #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(5..30));
        ks.add_range(kr(0..10));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that is fully covered by an existing range
        //
        // #########
        //   #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..30));
        ks.add_range(kr(10..20));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that extends an existing range from both ends
        //
        //   #####
        // #########
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..20));
        ks.add_range(kr(0..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add a range that overlaps with two existing ranges, joining them
        //
        // #####   #####
        //    #######
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(20..30));
        ks.add_range(kr(5..25));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
    }
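
    // An illustrative sketch of KeySpace::merge: two non-overlapping keyspaces
    // are interleaved in key order and abutting ranges are coalesced.
    #[test]
    fn keyspace_merge() {
        let mut a = KeySpace {
            ranges: vec![kr(0..10), kr(30..40)],
        };
        let b = KeySpace {
            ranges: vec![kr(10..20), kr(50..60)],
        };
        a.merge(&b);
        assert_ks_eq(&a, vec![kr(0..20), kr(30..40), kr(50..60)]);
    }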

    #[test]
    fn keyspace_overlaps() {
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..20));
        ks.add_range(kr(30..40));
        let ks = ks.to_keyspace();

        //      #####     #####
        // xxx
        assert!(!ks.overlaps(&kr(0..5)));

        //      #####     #####
        //   xx
        assert!(!ks.overlaps(&kr(5..9)));

        //      #####     #####
        //   xxx
        assert!(!ks.overlaps(&kr(5..10)));

        //      #####     #####
        //   xxxx
        assert!(ks.overlaps(&kr(5..11)));

        //      #####     #####
        //      xxx
        assert!(ks.overlaps(&kr(10..15)));

        //      #####     #####
        //        xxx
        assert!(ks.overlaps(&kr(15..20)));

        //      #####     #####
        //        xxxxx
        assert!(ks.overlaps(&kr(15..25)));

        //      #####     #####
        //            xxx
        assert!(!ks.overlaps(&kr(22..28)));

        //      #####     #####
        //             xxx
        assert!(!ks.overlaps(&kr(25..30)));

        //      #####     #####
        //                  x
        assert!(ks.overlaps(&kr(35..35)));

        //      #####     #####
        //                     xxx
        assert!(!ks.overlaps(&kr(40..45)));

        //      #####     #####
        //                       xxx
        assert!(!ks.overlaps(&kr(45..50)));

        //      #####     #####
        // xxxxxxxxxxxxxxx
        assert!(ks.overlaps(&kr(0..30)));
    }
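
    // An illustrative sketch of KeySpace::partition on an unsharded tenant:
    // a 64-block contiguous keyspace partitioned with a target_size of 32
    // blocks splits into two equal partitions.
    #[test]
    fn keyspace_partition_unsharded() {
        let shard_identity = ShardIdentity::unsharded();
        let ks = KeySpace::single(kr(0..64));
        let parts = ks.partition(&shard_identity, 32 * BLCKSZ as u64);
        assert_eq!(parts.parts.len(), 2);
        assert_ks_eq(&parts.parts[0], vec![kr(0..32)]);
        assert_ks_eq(&parts.parts[1], vec![kr(32..64)]);
    }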

    #[test]
    fn test_remove_full_overlaps() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(4),
                Key::from_i128(5)..Key::from_i128(8),
                Key::from_i128(10)..Key::from_i128(12),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(2)..Key::from_i128(3),
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(13),
            ],
        };
        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(2)..Key::from_i128(3),
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(12),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(2),
                Key::from_i128(3)..Key::from_i128(4),
                Key::from_i128(5)..Key::from_i128(6),
                Key::from_i128(7)..Key::from_i128(8),
                Key::from_i128(10)..Key::from_i128(11)
            ]
        );
    }

    #[test]
    fn test_remove_partial_overlaps() {
        // Test partial overlaps
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(8)..Key::from_i128(11),
                Key::from_i128(14)..Key::from_i128(17),
            ],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(3)..Key::from_i128(5),
                Key::from_i128(8)..Key::from_i128(10),
                Key::from_i128(14)..Key::from_i128(15),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(7)..Key::from_i128(8),
                Key::from_i128(12)..Key::from_i128(14),
            ]
        );
    }

    #[test]
    fn test_remove_no_overlaps() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(12),
                Key::from_i128(15)..Key::from_i128(17),
            ],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace::default();
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ]
        );
    }

    #[test]
    fn test_remove_one_range_overlaps_multiple() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(6)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
                Key::from_i128(17)..Key::from_i128(20),
                Key::from_i128(20)..Key::from_i128(30),
                Key::from_i128(30)..Key::from_i128(40),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(9)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
                Key::from_i128(17)..Key::from_i128(19),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(6)..Key::from_i128(9),
                Key::from_i128(19)..Key::from_i128(20),
                Key::from_i128(20)..Key::from_i128(30),
                Key::from_i128(30)..Key::from_i128(40),
            ]
        );
    }
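
    // An illustrative sketch of `contains`: it is overlap with the singleton
    // range key..key.next(), so range ends are exclusive.
    #[test]
    fn keyspace_contains() {
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..20));
        let ks = ks.to_keyspace();
        assert!(ks.contains(&Key::from_i128(10)));
        assert!(ks.contains(&Key::from_i128(19)));
        assert!(!ks.contains(&Key::from_i128(20)));
    }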

    #[test]
    fn sharded_range_relation_gap() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
                end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
            },
            &shard_identity,
        );

        // Key range spans relations, expect MAX
        assert_eq!(range.page_count(), u32::MAX);
    }

    #[test]
    fn shard_identity_keyspaces_single_key() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
                end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
            },
            &shard_identity,
        );
        // Single-key range on logical size key
        assert_eq!(range.page_count(), 1);
    }

    /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
    #[test]
    fn contiguous_range_check() {
        assert!(!is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
        ),);

        // The range goes all the way up to 0xffffffff, including it: this is
        // not considered a rel block range because 0xffffffff stores logical sizes,
        // not blocks.
        assert!(!is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
        ),);

        // Keys within the normal data region of a relation
        assert!(is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
                ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
        ),);

        // The logical size key of one forkno, then some blocks in the next
        assert!(is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
        ),);
    }

    #[test]
    fn shard_identity_keyspaces_forkno_gap() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
                end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
            },
            &shard_identity,
        );

        // Range spanning the end of one forkno and the start of the next: we do not attempt to
        // calculate a valid size, because we have no way to know if the keys between start
        // and end are actually in use.
        assert_eq!(range.page_count(), u32::MAX);
    }

    #[test]
    fn shard_identity_keyspaces_one_relation() {
        for shard_number in 0..4 {
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                ShardCount::new(4),
                ShardParameters::DEFAULT_STRIPE_SIZE,
            )
            .unwrap();

            let range = ShardedRange::new(
                Range {
                    start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
                    end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
                },
                &shard_identity,
            );

            // Very simple case: range covering block zero of one relation, where that block maps to shard zero
            if shard_number == 0 {
                assert_eq!(range.page_count(), 1);
            } else {
                // Other shards should perceive the range's size as zero
                assert_eq!(range.page_count(), 0);
            }
        }
    }

    /// Test helper: construct a ShardedRange and call fragment() on it, returning
    /// the total page count in the range and the fragments.
    fn do_fragment(
        range_start: Key,
        range_end: Key,
        shard_identity: &ShardIdentity,
        target_nblocks: u32,
    ) -> (u32, Vec<(u32, Range<Key>)>) {
        let range = ShardedRange::new(
            Range {
                start: range_start,
                end: range_end,
            },
            shard_identity,
        );

        let page_count = range.page_count();
        let fragments = range.fragment(target_nblocks);

        // Invariant: we always get at least one fragment
        assert!(!fragments.is_empty());

        // Invariant: the first/last fragment start/end should equal the input start/end
        assert_eq!(fragments.first().unwrap().1.start, range_start);
        assert_eq!(fragments.last().unwrap().1.end, range_end);

        if page_count > 0 {
            // Invariant: every fragment must contain at least one shard-local page, if the
            // total range contains at least one shard-local page
            let all_nonzero = fragments.iter().all(|f| f.0 > 0);
            if !all_nonzero {
                eprintln!("Found a zero-length fragment: {:?}", fragments);
            }
            assert!(all_nonzero);
        } else {
            // A range with no shard-local pages should always be returned as a single fragment
            assert_eq!(fragments, vec![(0, range_start..range_end)]);
        }

        // Invariant: fragments must be ordered and non-overlapping
        let mut last: Option<Range<Key>> = None;
        for frag in &fragments {
            if let Some(last) = last {
                assert!(frag.1.start >= last.end);
                assert!(frag.1.start > last.start);
            }
            last = Some(frag.1.clone())
        }

        // Invariant: fragments respect target_nblocks
        for frag in &fragments {
            assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
        }

        (page_count, fragments)
    }

    /// Really simple tests for fragment(), on a range that just contains a single stripe
    /// for a single tenant.
    #[test]
    fn sharded_range_fragment_simple() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        // A range which we happen to know covers exactly one stripe which belongs to this shard
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();

        // Ask for stripe_size blocks, we get the whole stripe
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 32768),
            (32768, vec![(32768, input_start..input_end)])
        );

        // Ask for more, we still get the whole stripe
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 10000000),
            (32768, vec![(32768, input_start..input_end)])
        );

        // Ask for target_nblocks of half the stripe size, we get two halves
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16384),
            (
                32768,
                vec![
                    (16384, input_start..input_start.add(16384)),
                    (16384, input_start.add(16384)..input_end)
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_multi_stripe() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        // A range which covers multiple stripes, exactly one of which belongs to the current shard.
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
        // Ask for all the blocks, get a fragment that covers the whole range but reports
        // its size to be just the blocks belonging to our shard.
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 131072),
            (32768, vec![(32768, input_start..input_end)])
        );

        // Ask for a sub-stripe quantity
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16000),
            (
                32768,
                vec![
                    (16000, input_start..input_start.add(16000)),
                    (16000, input_start.add(16000)..input_start.add(32000)),
                    (768, input_start.add(32000)..input_end),
                ]
            )
        );

        // Try on a range that starts slightly after our owned stripe
        assert_eq!(
            do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
            (32767, vec![(32767, input_start.add(1)..input_end)])
        );
    }

    /// Test our calculations work correctly when we start a range from the logical size key of
    /// a previous relation.
    #[test]
    fn sharded_range_fragment_starting_from_logical_size() {
        let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();

        // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x10000),
            (0x8001, vec![(0x8001, input_start..input_end)])
        );

        // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
        // store all logical sizes)
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x10000),
            (0x1, vec![(0x1, input_start..input_end)])
        );
    }

    /// Test that ShardedRange behaves properly when used on un-sharded data
    #[test]
    fn sharded_range_fragment_unsharded() {
        let shard_identity = ShardIdentity::unsharded();

        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (
                0x10000,
                vec![
                    (0x8000, input_start..input_start.add(0x8000)),
                    (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_cross_relation() {
        let shard_identity = ShardIdentity::unsharded();

        // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (u32::MAX, vec![(u32::MAX, input_start..input_end),])
        );

        // Same, but using a sharded identity
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (u32::MAX, vec![(u32::MAX, input_start..input_end),])
        );
    }

    #[test]
    fn sharded_range_fragment_tiny_nblocks() {
        let shard_identity = ShardIdentity::unsharded();

        // A small range within a single relation, fragmented with a tiny target_nblocks
        let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
        let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16),
            (
                0x38,
                vec![
                    (16, input_start..input_start.add(16)),
                    (16, input_start.add(16)..input_start.add(32)),
                    (16, input_start.add(32)..input_start.add(48)),
                    (8, input_start.add(48)..input_end),
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_fuzz() {
        // Use a fixed seed: we don't want to explicitly pick values, but we do want
        // the test to be reproducible.
        let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);

        for _i in 0..1000 {
            let shard_identity = if prng.next_u32() % 2 == 0 {
                ShardIdentity::unsharded()
            } else {
                let shard_count = prng.next_u32() % 127 + 1;
                ShardIdentity::new(
                    ShardNumber((prng.next_u32() % shard_count) as u8),
                    ShardCount::new(shard_count as u8),
                    ShardParameters::DEFAULT_STRIPE_SIZE,
                )
                .unwrap()
            };

            let target_nblocks = prng.next_u32() % 65536 + 1;

            let start_offset = prng.next_u32() % 16384;

            // Try ranges up to 8192 keys in size, and always at least 1 key
            let range_size = prng.next_u32() % 8192 + 1;

            // Pick a starting key within a single relation's block space
            let input_start = Key::from_hex("000000067F00000001000004E10000000000")
                .unwrap()
                .add(start_offset);
            let input_end = input_start.add(range_size);

            // This test's main success conditions are the invariants baked into do_fragment
            let (_total_size, fragments) =
                do_fragment(input_start, input_end, &shard_identity, target_nblocks);

            // Pick a random key within the range and check it appears in the output
            let example_key = input_start.add(prng.next_u32() % range_size);

            // Panic on unwrap if it isn't found
            let example_key_frag = fragments
                .iter()
                .find(|f| f.1.contains(&example_key))
                .unwrap();

            // Check that the fragment containing our random key has a nonzero size if
            // that key is shard-local
            let example_key_local = !shard_identity.is_key_disposable(&example_key);
            if example_key_local {
                assert!(example_key_frag.0 > 0);
            }
        }
    }
}
|