Line data Source code
1 : use postgres_ffi::BLCKSZ;
2 : use std::ops::Range;
3 :
4 : use crate::{
5 : key::Key,
6 : shard::{ShardCount, ShardIdentity},
7 : };
8 : use itertools::Itertools;
9 :
10 : ///
11 : /// Represents a set of Keys, in a compact form.
12 : ///
13 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
14 : pub struct KeySpace {
15 : /// Contiguous ranges of keys that belong to the key space. In key order,
16 : /// and with no overlap.
17 : pub ranges: Vec<Range<Key>>,
18 : }
19 :
20 : impl std::fmt::Display for KeySpace {
21 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 0 : write!(f, "[")?;
23 0 : for range in &self.ranges {
24 0 : write!(f, "{}..{},", range.start, range.end)?;
25 : }
26 0 : write!(f, "]")
27 0 : }
28 : }
29 :
30 : /// A wrapper type for sparse keyspaces.
31 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
32 : pub struct SparseKeySpace(pub KeySpace);
33 :
34 : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
35 : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
36 : /// shard.
37 : ///
38 : /// When we iterate over keys within this object, we will skip any keys that don't belong
39 : /// to this shard.
40 : ///
41 : /// The start + end keys may not belong to the shard: these specify where layer files should
42 : /// start + end, but we will never actually read/write those keys.
43 : #[derive(Clone, Debug, PartialEq, Eq)]
44 : pub struct ShardedRange<'a> {
45 : pub shard_identity: &'a ShardIdentity,
46 : pub range: Range<Key>,
47 : }
48 :
49 : // Calculate the size of a range within the blocks of the same relation, or spanning only the
50 : // top page in the previous relation's space.
51 7307452 : fn contiguous_range_len(range: &Range<Key>) -> u32 {
52 7307452 : debug_assert!(is_contiguous_range(range));
53 7307452 : if range.start.field6 == 0xffffffff {
54 827104 : range.end.field6 + 1
55 : } else {
56 6480348 : range.end.field6 - range.start.field6
57 : }
58 7307452 : }
59 :
60 : /// Return true if this key range includes only keys in the same relation's data blocks, or
61 : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
62 : ///
63 : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
64 : /// be on our shard. Later in ShardedRange we do the extra work to figure out how much
65 : /// of a given contiguous range is present on one shard.
66 : ///
67 : /// This matters, because:
68 : /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
69 : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
70 14615859 : fn is_contiguous_range(range: &Range<Key>) -> bool {
71 14615859 : range.start.field1 == range.end.field1
72 14614845 : && range.start.field2 == range.end.field2
73 14614821 : && range.start.field3 == range.end.field3
74 14614751 : && range.start.field4 == range.end.field4
75 14614750 : && (range.start.field5 == range.end.field5
76 1654212 : || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
77 14615859 : }
78 :
79 : impl<'a> ShardedRange<'a> {
80 2735 : pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
81 2735 : Self {
82 2735 : shard_identity,
83 2735 : range,
84 2735 : }
85 2735 : }
86 :
87 : /// Break up this range into chunks, each of which has at least one local key in it if the
88 : /// total range has at least one local key.
89 2728 : pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
90 2728 : // Optimization for single-key case (e.g. logical size keys)
91 2728 : if self.range.end == self.range.start.add(1) {
92 1431 : return vec![(
93 1431 : if self.shard_identity.is_key_disposable(&self.range.start) {
94 0 : 0
95 : } else {
96 1431 : 1
97 : },
98 1431 : self.range,
99 : )];
100 1297 : }
101 1297 :
102 1297 : if !is_contiguous_range(&self.range) {
103 : // Ranges that span relations are not fragmented. We only get these ranges as a result
104 : // of operations that act on existing layers, so we trust that the existing range is
105 : // reasonably small.
106 2 : return vec![(u32::MAX, self.range)];
107 1295 : }
108 1295 :
109 1295 : let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
110 1295 :
111 1295 : let mut cursor = self.range.start;
112 2738 : while cursor < self.range.end {
113 1443 : let advance_by = self.distance_to_next_boundary(cursor);
114 1443 : let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
115 :
116 : // If the previous fragment is undersized, then we seek to consume enough
117 : // blocks to complete it.
118 1443 : let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
119 148 : Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
120 137 : Some(frag) => {
121 137 : // Prev block is complete, want the full number.
122 137 : (
123 137 : target_nblocks,
124 137 : if is_fragment_disposable {
125 : // If this current range will be empty (not shard-local data), we will merge into previous
126 0 : Some(frag)
127 : } else {
128 137 : None
129 : },
130 : )
131 : }
132 : None => {
133 : // First iteration, want the full number
134 1295 : (target_nblocks, None)
135 : }
136 : };
137 :
138 1443 : let advance_by = if is_fragment_disposable {
139 468 : advance_by
140 : } else {
141 975 : std::cmp::min(advance_by, want_blocks)
142 : };
143 :
144 1443 : let next_cursor = cursor.add(advance_by);
145 :
146 1443 : let this_frag = (
147 1443 : if is_fragment_disposable {
148 468 : 0
149 : } else {
150 975 : advance_by
151 : },
152 1443 : cursor..next_cursor,
153 1443 : );
154 1443 : cursor = next_cursor;
155 :
156 1443 : if let Some(last_fragment) = merge_last_fragment {
157 11 : // Previous fragment was short or this one is empty, merge into it
158 11 : last_fragment.0 += this_frag.0;
159 11 : last_fragment.1.end = this_frag.1.end;
160 1432 : } else {
161 1432 : fragments.push(this_frag);
162 1432 : }
163 : }
164 :
165 1295 : fragments
166 2728 : }
167 :
168 : /// Estimate the physical pages that are within this range, on this shard. This returns
169 : /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
170 1019 : pub fn page_count(&self) -> u32 {
171 1019 : // Special cases for single keys like logical sizes
172 1019 : if self.range.end == self.range.start.add(1) {
173 6 : return if self.shard_identity.is_key_disposable(&self.range.start) {
174 3 : 0
175 : } else {
176 3 : 1
177 : };
178 1013 : }
179 1013 :
180 1013 : // We can only do an authentic calculation of contiguous key ranges
181 1013 : if !is_contiguous_range(&self.range) {
182 4 : return u32::MAX;
183 1009 : }
184 1009 :
185 1009 : // Special case for single sharded tenants: our logical and physical sizes are the same
186 1009 : if self.shard_identity.count < ShardCount::new(2) {
187 524 : return contiguous_range_len(&self.range);
188 485 : }
189 485 :
190 485 : // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
191 485 : // to Self, and add the stripe's block count to our total if so.
192 485 : let mut result: u64 = 0;
193 485 : let mut cursor = self.range.start;
194 981 : while cursor < self.range.end {
195 : // Count up to the next stripe_size boundary or end of range
196 496 : let advance_by = self.distance_to_next_boundary(cursor);
197 496 :
198 496 : // If this blocks in this stripe belong to us, add them to our count
199 496 : if !self.shard_identity.is_key_disposable(&cursor) {
200 28 : result += advance_by as u64;
201 468 : }
202 :
203 496 : cursor = cursor.add(advance_by);
204 : }
205 :
206 485 : if result > u32::MAX as u64 {
207 0 : u32::MAX
208 : } else {
209 485 : result as u32
210 : }
211 1019 : }
212 :
213 : /// Advance the cursor to the next potential fragment boundary: this is either
214 : /// a stripe boundary, or the end of the range.
215 1939 : fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
216 1939 : let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
217 1939 :
218 1939 : if self.shard_identity.count < ShardCount::new(2) {
219 : // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
220 940 : return distance_to_range_end;
221 999 : }
222 999 :
223 999 : if cursor.field6 == 0xffffffff {
224 : // We are wrapping from one relation's logical size to the next relation's first data block
225 4 : return 1;
226 995 : }
227 995 :
228 995 : let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
229 995 : let stripe_remainder = self.shard_identity.stripe_size.0
230 995 : - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
231 995 :
232 995 : if cfg!(debug_assertions) {
233 : // We should never overflow field5 and field6 -- our callers check this earlier
234 : // and would have returned their u32::MAX cases if the input range violated this.
235 995 : let next_cursor = cursor.add(stripe_remainder);
236 995 : debug_assert!(
237 995 : next_cursor.field1 == cursor.field1
238 995 : && next_cursor.field2 == cursor.field2
239 995 : && next_cursor.field3 == cursor.field3
240 995 : && next_cursor.field4 == cursor.field4
241 995 : && next_cursor.field5 == cursor.field5
242 : )
243 0 : }
244 :
245 995 : std::cmp::min(stripe_remainder, distance_to_range_end)
246 1939 : }
247 :
248 : /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
249 : /// this function simply calculates the number of pages in the space, without accounting for those
250 : /// pages that would not actually be stored on this node.
251 : ///
252 : /// Don't use this function in code that works with physical entities like layer files.
253 7306093 : pub fn raw_size(range: &Range<Key>) -> u32 {
254 7306093 : if is_contiguous_range(range) {
255 7304989 : contiguous_range_len(range)
256 : } else {
257 1104 : u32::MAX
258 : }
259 7306093 : }
260 : }
261 :
262 : impl KeySpace {
263 : /// Create a key space with a single range.
264 8055 : pub fn single(key_range: Range<Key>) -> Self {
265 8055 : Self {
266 8055 : ranges: vec![key_range],
267 8055 : }
268 8055 : }
269 :
270 : /// Partition a key space into roughly chunks of roughly 'target_size' bytes
271 : /// in each partition.
272 : ///
273 286 : pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
274 286 : // Assume that each value is 8k in size.
275 286 : let target_nblocks = (target_size / BLCKSZ as u64) as u32;
276 286 :
277 286 : let mut parts = Vec::new();
278 286 : let mut current_part = Vec::new();
279 286 : let mut current_part_size: usize = 0;
280 2002 : for range in &self.ranges {
281 : // While doing partitioning, wrap the range in ShardedRange so that our size calculations
282 : // will respect shard striping rather than assuming all keys within a range are present.
283 1716 : let range = ShardedRange::new(range.clone(), shard_identity);
284 :
285 : // Chunk up the range into parts that each contain up to target_size local blocks
286 1720 : for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
287 : // If appending the next contiguous range in the keyspace to the current
288 : // partition would cause it to be too large, and our current partition
289 : // covers at least one block that is physically present in this shard,
290 : // then start a new partition
291 1720 : if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
292 24 : && current_part_size > 0
293 24 : {
294 24 : parts.push(KeySpace {
295 24 : ranges: current_part,
296 24 : });
297 24 : current_part = Vec::new();
298 24 : current_part_size = 0;
299 1696 : }
300 1720 : current_part.push(frag_range.start..frag_range.end);
301 1720 : current_part_size += frag_on_shard_size as usize;
302 : }
303 : }
304 :
305 : // add last partition that wasn't full yet.
306 286 : if !current_part.is_empty() {
307 286 : parts.push(KeySpace {
308 286 : ranges: current_part,
309 286 : });
310 286 : }
311 :
312 286 : KeyPartitioning { parts }
313 286 : }
314 :
315 2750630 : pub fn is_empty(&self) -> bool {
316 2750630 : self.total_raw_size() == 0
317 2750630 : }
318 :
319 : /// Merge another keyspace into the current one.
320 : /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
321 1671254 : pub fn merge(&mut self, other: &KeySpace) {
322 1671254 : let all_ranges = self
323 1671254 : .ranges
324 1671254 : .iter()
325 1671254 : .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
326 1671254 :
327 1671254 : let mut accum = KeySpaceAccum::new();
328 1671254 : let mut prev: Option<&Range<Key>> = None;
329 2402622 : for range in all_ranges {
330 731368 : if let Some(prev) = prev {
331 104972 : let overlap =
332 104972 : std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
333 104972 : assert!(
334 104972 : !overlap,
335 0 : "Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
336 : prev, range
337 : );
338 626396 : }
339 :
340 731368 : accum.add_range(range.clone());
341 731368 : prev = Some(range);
342 : }
343 :
344 1671254 : self.ranges = accum.to_keyspace().ranges;
345 1671254 : }
346 :
347 : /// Remove all keys in `other` from `self`.
348 : /// This can involve splitting or removing of existing ranges.
349 : /// Returns the removed keyspace
350 3611536 : pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
351 3611536 : let (self_start, self_end) = match (self.start(), self.end()) {
352 2977399 : (Some(start), Some(end)) => (start, end),
353 : _ => {
354 : // self is empty
355 634137 : return KeySpace::default();
356 : }
357 : };
358 :
359 : // Key spaces are sorted by definition, so skip ahead to the first
360 : // potentially intersecting range. Similarly, ignore ranges that start
361 : // after the current keyspace ends.
362 2977399 : let other_ranges = other
363 2977399 : .ranges
364 2977399 : .iter()
365 2977399 : .skip_while(|range| self_start >= range.end)
366 2977399 : .take_while(|range| self_end > range.start);
367 2977399 :
368 2977399 : let mut removed_accum = KeySpaceRandomAccum::new();
369 4310262 : for range in other_ranges {
370 2737872 : while let Some(overlap_at) = self.overlaps_at(range) {
371 1405009 : let overlapped = self.ranges[overlap_at].clone();
372 1405009 :
373 1405009 : if overlapped.start < range.start && overlapped.end <= range.end {
374 11 : // Higher part of the range is completely overlapped.
375 11 : removed_accum.add_range(range.start..self.ranges[overlap_at].end);
376 11 : self.ranges[overlap_at].end = range.start;
377 1404998 : }
378 1405009 : if overlapped.start >= range.start && overlapped.end > range.end {
379 70 : // Lower part of the range is completely overlapped.
380 70 : removed_accum.add_range(self.ranges[overlap_at].start..range.end);
381 70 : self.ranges[overlap_at].start = range.end;
382 1404939 : }
383 1405009 : if overlapped.start < range.start && overlapped.end > range.end {
384 79979 : // Middle part of the range is overlapped.
385 79979 : removed_accum.add_range(range.clone());
386 79979 : self.ranges[overlap_at].end = range.start;
387 79979 : self.ranges
388 79979 : .insert(overlap_at + 1, range.end..overlapped.end);
389 1325030 : }
390 1405009 : if overlapped.start >= range.start && overlapped.end <= range.end {
391 1324949 : // Whole range is overlapped
392 1324949 : removed_accum.add_range(self.ranges[overlap_at].clone());
393 1324949 : self.ranges.remove(overlap_at);
394 1324949 : }
395 : }
396 : }
397 :
398 2977399 : removed_accum.to_keyspace()
399 3611536 : }
400 :
401 3611574 : pub fn start(&self) -> Option<Key> {
402 3611574 : self.ranges.first().map(|range| range.start)
403 3611574 : }
404 :
405 3611866 : pub fn end(&self) -> Option<Key> {
406 3611866 : self.ranges.last().map(|range| range.end)
407 3611866 : }
408 :
409 : /// The size of the keyspace in pages, before accounting for sharding
410 3378244 : pub fn total_raw_size(&self) -> usize {
411 3378244 : self.ranges
412 3378244 : .iter()
413 3378244 : .map(|range| ShardedRange::raw_size(range) as usize)
414 3378244 : .sum()
415 3378244 : }
416 :
417 3431712 : fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
418 4750418 : match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
419 109 : Ok(0) => None,
420 1253872 : Err(0) => None,
421 121987 : Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
422 2055744 : Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
423 122727 : _ => None,
424 : }
425 3431712 : }
426 :
427 : ///
428 : /// Check if key space contains overlapping range
429 : ///
430 693840 : pub fn overlaps(&self, range: &Range<Key>) -> bool {
431 693840 : self.overlaps_at(range).is_some()
432 693840 : }
433 :
434 : /// Check if the keyspace contains a key
435 691891 : pub fn contains(&self, key: &Key) -> bool {
436 691891 : self.overlaps(&(*key..key.next()))
437 691891 : }
438 : }
439 :
440 : ///
441 : /// Represents a partitioning of the key space.
442 : ///
443 : /// The only kind of partitioning we do is to partition the key space into
444 : /// partitions that are roughly equal in physical size (see KeySpace::partition).
445 : /// But this data structure could represent any partitioning.
446 : ///
447 : #[derive(Clone, Debug, Default)]
448 : pub struct KeyPartitioning {
449 : pub parts: Vec<KeySpace>,
450 : }
451 :
452 : /// Represents a partitioning of the sparse key space.
453 : #[derive(Clone, Debug, Default)]
454 : pub struct SparseKeyPartitioning {
455 : pub parts: Vec<SparseKeySpace>,
456 : }
457 :
458 : impl KeyPartitioning {
459 820 : pub fn new() -> Self {
460 820 : KeyPartitioning { parts: Vec::new() }
461 820 : }
462 :
463 : /// Convert a key partitioning to a sparse partition.
464 410 : pub fn into_sparse(self) -> SparseKeyPartitioning {
465 410 : SparseKeyPartitioning {
466 410 : parts: self.parts.into_iter().map(SparseKeySpace).collect(),
467 410 : }
468 410 : }
469 : }
470 :
471 : impl SparseKeyPartitioning {
472 : /// Note: use this function with caution. Attempt to handle a sparse keyspace in the same way as a dense keyspace will
473 : /// cause long/dead loops.
474 534 : pub fn into_dense(self) -> KeyPartitioning {
475 534 : KeyPartitioning {
476 534 : parts: self.parts.into_iter().map(|x| x.0).collect(),
477 534 : }
478 534 : }
479 : }
480 :
481 : ///
482 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
483 : /// object. This takes care of merging adjacent keys and key ranges into
484 : /// contiguous ranges.
485 : ///
486 : #[derive(Clone, Debug, Default)]
487 : pub struct KeySpaceAccum {
488 : accum: Option<Range<Key>>,
489 :
490 : ranges: Vec<Range<Key>>,
491 : size: u64,
492 : }
493 :
494 : impl KeySpaceAccum {
495 2189479 : pub fn new() -> Self {
496 2189479 : Self {
497 2189479 : accum: None,
498 2189479 : ranges: Vec::new(),
499 2189479 : size: 0,
500 2189479 : }
501 2189479 : }
502 :
503 : #[inline(always)]
504 4081270 : pub fn add_key(&mut self, key: Key) {
505 4081270 : self.add_range(singleton_range(key))
506 4081270 : }
507 :
508 : #[inline(always)]
509 5334963 : pub fn add_range(&mut self, range: Range<Key>) {
510 5334963 : self.size += ShardedRange::raw_size(&range) as u64;
511 5334963 :
512 5334963 : match self.accum.as_mut() {
513 4176236 : Some(accum) => {
514 4176236 : if range.start == accum.end {
515 4066820 : accum.end = range.end;
516 4066820 : } else {
517 : // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
518 : // a new range here if the skipped region was all keys that don't belong on this shard.
519 : // (https://github.com/neondatabase/neon/issues/6247)
520 109416 : assert!(range.start > accum.end);
521 109416 : self.ranges.push(accum.clone());
522 109416 : *accum = range;
523 : }
524 : }
525 1158727 : None => self.accum = Some(range),
526 : }
527 5334963 : }
528 :
529 1972175 : pub fn to_keyspace(mut self) -> KeySpace {
530 1972175 : if let Some(accum) = self.accum.take() {
531 924034 : self.ranges.push(accum);
532 1048141 : }
533 1972175 : KeySpace {
534 1972175 : ranges: self.ranges,
535 1972175 : }
536 1972175 : }
537 :
538 1108 : pub fn consume_keyspace(&mut self) -> KeySpace {
539 1108 : std::mem::take(self).to_keyspace()
540 1108 : }
541 :
542 : // The total number of keys in this object, ignoring any sharding effects that might cause some of
543 : // the keys to be omitted in storage on this shard.
544 2397 : pub fn raw_size(&self) -> u64 {
545 2397 : self.size
546 2397 : }
547 : }
548 :
549 : ///
550 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
551 : /// object. Key ranges may be inserted in any order and can overlap.
552 : ///
553 : #[derive(Clone, Debug, Default)]
554 : pub struct KeySpaceRandomAccum {
555 : ranges: Vec<Range<Key>>,
556 : }
557 :
558 : impl KeySpaceRandomAccum {
559 8620979 : pub fn new() -> Self {
560 8620979 : Self { ranges: Vec::new() }
561 8620979 : }
562 :
563 666886 : pub fn add_key(&mut self, key: Key) {
564 666886 : self.add_range(singleton_range(key))
565 666886 : }
566 :
567 3047729 : pub fn add_range(&mut self, range: Range<Key>) {
568 3047729 : self.ranges.push(range);
569 3047729 : }
570 :
571 882038 : pub fn add_keyspace(&mut self, keyspace: KeySpace) {
572 1764248 : for range in keyspace.ranges {
573 882210 : self.add_range(range);
574 882210 : }
575 882038 : }
576 :
577 6323024 : pub fn to_keyspace(mut self) -> KeySpace {
578 6323024 : let mut ranges = Vec::new();
579 6323024 : if !self.ranges.is_empty() {
580 2707678 : self.ranges.sort_by_key(|r| r.start);
581 2707678 : let mut start = self.ranges.first().unwrap().start;
582 2707678 : let mut end = self.ranges.first().unwrap().end;
583 5755337 : for r in self.ranges {
584 3047659 : assert!(r.start >= start);
585 3047659 : if r.start > end {
586 256761 : ranges.push(start..end);
587 256761 : start = r.start;
588 256761 : end = r.end;
589 2790898 : } else if r.end > end {
590 4482 : end = r.end;
591 2786416 : }
592 : }
593 2707678 : ranges.push(start..end);
594 3615346 : }
595 6323024 : KeySpace { ranges }
596 6323024 : }
597 :
598 3342488 : pub fn consume_keyspace(&mut self) -> KeySpace {
599 3342488 : let mut prev_accum = KeySpaceRandomAccum::new();
600 3342488 : std::mem::swap(self, &mut prev_accum);
601 3342488 :
602 3342488 : prev_accum.to_keyspace()
603 3342488 : }
604 : }
605 :
606 4748156 : pub fn singleton_range(key: Key) -> Range<Key> {
607 4748156 : key..key.next()
608 4748156 : }
609 :
610 : #[cfg(test)]
611 : mod tests {
612 : use rand::{RngCore, SeedableRng};
613 :
614 : use crate::{
615 : models::ShardParameters,
616 : shard::{ShardCount, ShardNumber},
617 : };
618 :
619 : use super::*;
620 : use std::fmt::Write;
621 :
622 : // Helper function to create a key range.
623 : //
624 : // Make the tests below less verbose.
625 46 : fn kr(irange: Range<i128>) -> Range<Key> {
626 46 : Key::from_i128(irange.start)..Key::from_i128(irange.end)
627 46 : }
628 :
629 : #[allow(dead_code)]
630 0 : fn dump_keyspace(ks: &KeySpace) {
631 0 : for r in ks.ranges.iter() {
632 0 : println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
633 0 : }
634 0 : }
635 :
636 11 : fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
637 11 : if actual.ranges != expected {
638 0 : let mut msg = String::new();
639 0 :
640 0 : writeln!(msg, "expected:").unwrap();
641 0 : for r in &expected {
642 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
643 0 : }
644 0 : writeln!(msg, "got:").unwrap();
645 0 : for r in &actual.ranges {
646 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
647 0 : }
648 0 : panic!("{}", msg);
649 11 : }
650 11 : }
651 :
652 : #[test]
653 1 : fn keyspace_consume() {
654 1 : let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
655 1 :
656 1 : let mut accum = KeySpaceAccum::new();
657 4 : for range in &ranges {
658 3 : accum.add_range(range.clone());
659 3 : }
660 :
661 1 : let expected_size: u64 = ranges
662 1 : .iter()
663 3 : .map(|r| ShardedRange::raw_size(r) as u64)
664 1 : .sum();
665 1 : assert_eq!(accum.raw_size(), expected_size);
666 :
667 1 : assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
668 1 : assert_eq!(accum.raw_size(), 0);
669 :
670 1 : assert_ks_eq(&accum.consume_keyspace(), vec![]);
671 1 : assert_eq!(accum.raw_size(), 0);
672 :
673 4 : for range in &ranges {
674 3 : accum.add_range(range.clone());
675 3 : }
676 1 : assert_ks_eq(&accum.to_keyspace(), ranges);
677 1 : }
678 :
679 : #[test]
680 1 : fn keyspace_add_range() {
681 1 : // two separate ranges
682 1 : //
683 1 : // #####
684 1 : // #####
685 1 : let mut ks = KeySpaceRandomAccum::default();
686 1 : ks.add_range(kr(0..10));
687 1 : ks.add_range(kr(20..30));
688 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
689 1 :
690 1 : // two separate ranges, added in reverse order
691 1 : //
692 1 : // #####
693 1 : // #####
694 1 : let mut ks = KeySpaceRandomAccum::default();
695 1 : ks.add_range(kr(20..30));
696 1 : ks.add_range(kr(0..10));
697 1 :
698 1 : // add range that is adjacent to the end of an existing range
699 1 : //
700 1 : // #####
701 1 : // #####
702 1 : ks.add_range(kr(0..10));
703 1 : ks.add_range(kr(10..30));
704 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
705 1 :
706 1 : // add range that is adjacent to the start of an existing range
707 1 : //
708 1 : // #####
709 1 : // #####
710 1 : let mut ks = KeySpaceRandomAccum::default();
711 1 : ks.add_range(kr(10..30));
712 1 : ks.add_range(kr(0..10));
713 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
714 1 :
715 1 : // add range that overlaps with the end of an existing range
716 1 : //
717 1 : // #####
718 1 : // #####
719 1 : let mut ks = KeySpaceRandomAccum::default();
720 1 : ks.add_range(kr(0..10));
721 1 : ks.add_range(kr(5..30));
722 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
723 1 :
724 1 : // add range that overlaps with the start of an existing range
725 1 : //
726 1 : // #####
727 1 : // #####
728 1 : let mut ks = KeySpaceRandomAccum::default();
729 1 : ks.add_range(kr(5..30));
730 1 : ks.add_range(kr(0..10));
731 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
732 1 :
733 1 : // add range that is fully covered by an existing range
734 1 : //
735 1 : // #########
736 1 : // #####
737 1 : let mut ks = KeySpaceRandomAccum::default();
738 1 : ks.add_range(kr(0..30));
739 1 : ks.add_range(kr(10..20));
740 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
741 1 :
742 1 : // add range that extends an existing range from both ends
743 1 : //
744 1 : // #####
745 1 : // #########
746 1 : let mut ks = KeySpaceRandomAccum::default();
747 1 : ks.add_range(kr(10..20));
748 1 : ks.add_range(kr(0..30));
749 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
750 1 :
751 1 : // add a range that overlaps with two existing ranges, joining them
752 1 : //
753 1 : // ##### #####
754 1 : // #######
755 1 : let mut ks = KeySpaceRandomAccum::default();
756 1 : ks.add_range(kr(0..10));
757 1 : ks.add_range(kr(20..30));
758 1 : ks.add_range(kr(5..25));
759 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
760 1 : }
761 :
762 : #[test]
763 1 : fn keyspace_overlaps() {
764 1 : let mut ks = KeySpaceRandomAccum::default();
765 1 : ks.add_range(kr(10..20));
766 1 : ks.add_range(kr(30..40));
767 1 : let ks = ks.to_keyspace();
768 1 :
769 1 : // ##### #####
770 1 : // xxxx
771 1 : assert!(!ks.overlaps(&kr(0..5)));
772 :
773 : // ##### #####
774 : // xxxx
775 1 : assert!(!ks.overlaps(&kr(5..9)));
776 :
777 : // ##### #####
778 : // xxxx
779 1 : assert!(!ks.overlaps(&kr(5..10)));
780 :
781 : // ##### #####
782 : // xxxx
783 1 : assert!(ks.overlaps(&kr(5..11)));
784 :
785 : // ##### #####
786 : // xxxx
787 1 : assert!(ks.overlaps(&kr(10..15)));
788 :
789 : // ##### #####
790 : // xxxx
791 1 : assert!(ks.overlaps(&kr(15..20)));
792 :
793 : // ##### #####
794 : // xxxx
795 1 : assert!(ks.overlaps(&kr(15..25)));
796 :
797 : // ##### #####
798 : // xxxx
799 1 : assert!(!ks.overlaps(&kr(22..28)));
800 :
801 : // ##### #####
802 : // xxxx
803 1 : assert!(!ks.overlaps(&kr(25..30)));
804 :
805 : // ##### #####
806 : // xxxx
807 1 : assert!(ks.overlaps(&kr(35..35)));
808 :
809 : // ##### #####
810 : // xxxx
811 1 : assert!(!ks.overlaps(&kr(40..45)));
812 :
813 : // ##### #####
814 : // xxxx
815 1 : assert!(!ks.overlaps(&kr(45..50)));
816 :
817 : // ##### #####
818 : // xxxxxxxxxxx
819 1 : assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
820 1 : }
821 :
822 : #[test]
823 1 : fn test_remove_full_overlapps() {
824 1 : let mut key_space1 = KeySpace {
825 1 : ranges: vec![
826 1 : Key::from_i128(1)..Key::from_i128(4),
827 1 : Key::from_i128(5)..Key::from_i128(8),
828 1 : Key::from_i128(10)..Key::from_i128(12),
829 1 : ],
830 1 : };
831 1 : let key_space2 = KeySpace {
832 1 : ranges: vec![
833 1 : Key::from_i128(2)..Key::from_i128(3),
834 1 : Key::from_i128(6)..Key::from_i128(7),
835 1 : Key::from_i128(11)..Key::from_i128(13),
836 1 : ],
837 1 : };
838 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
839 1 : let removed_expected = KeySpace {
840 1 : ranges: vec![
841 1 : Key::from_i128(2)..Key::from_i128(3),
842 1 : Key::from_i128(6)..Key::from_i128(7),
843 1 : Key::from_i128(11)..Key::from_i128(12),
844 1 : ],
845 1 : };
846 1 : assert_eq!(removed, removed_expected);
847 :
848 1 : assert_eq!(
849 1 : key_space1.ranges,
850 1 : vec![
851 1 : Key::from_i128(1)..Key::from_i128(2),
852 1 : Key::from_i128(3)..Key::from_i128(4),
853 1 : Key::from_i128(5)..Key::from_i128(6),
854 1 : Key::from_i128(7)..Key::from_i128(8),
855 1 : Key::from_i128(10)..Key::from_i128(11)
856 1 : ]
857 1 : );
858 1 : }
859 :
860 : #[test]
861 1 : fn test_remove_partial_overlaps() {
862 1 : // Test partial ovelaps
863 1 : let mut key_space1 = KeySpace {
864 1 : ranges: vec![
865 1 : Key::from_i128(1)..Key::from_i128(5),
866 1 : Key::from_i128(7)..Key::from_i128(10),
867 1 : Key::from_i128(12)..Key::from_i128(15),
868 1 : ],
869 1 : };
870 1 : let key_space2 = KeySpace {
871 1 : ranges: vec![
872 1 : Key::from_i128(3)..Key::from_i128(6),
873 1 : Key::from_i128(8)..Key::from_i128(11),
874 1 : Key::from_i128(14)..Key::from_i128(17),
875 1 : ],
876 1 : };
877 1 :
878 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
879 1 : let removed_expected = KeySpace {
880 1 : ranges: vec![
881 1 : Key::from_i128(3)..Key::from_i128(5),
882 1 : Key::from_i128(8)..Key::from_i128(10),
883 1 : Key::from_i128(14)..Key::from_i128(15),
884 1 : ],
885 1 : };
886 1 : assert_eq!(removed, removed_expected);
887 :
888 1 : assert_eq!(
889 1 : key_space1.ranges,
890 1 : vec![
891 1 : Key::from_i128(1)..Key::from_i128(3),
892 1 : Key::from_i128(7)..Key::from_i128(8),
893 1 : Key::from_i128(12)..Key::from_i128(14),
894 1 : ]
895 1 : );
896 1 : }
897 :
898 : #[test]
899 1 : fn test_remove_no_overlaps() {
900 1 : let mut key_space1 = KeySpace {
901 1 : ranges: vec![
902 1 : Key::from_i128(1)..Key::from_i128(5),
903 1 : Key::from_i128(7)..Key::from_i128(10),
904 1 : Key::from_i128(12)..Key::from_i128(15),
905 1 : ],
906 1 : };
907 1 : let key_space2 = KeySpace {
908 1 : ranges: vec![
909 1 : Key::from_i128(6)..Key::from_i128(7),
910 1 : Key::from_i128(11)..Key::from_i128(12),
911 1 : Key::from_i128(15)..Key::from_i128(17),
912 1 : ],
913 1 : };
914 1 :
915 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
916 1 : let removed_expected = KeySpace::default();
917 1 : assert_eq!(removed, removed_expected);
918 :
919 1 : assert_eq!(
920 1 : key_space1.ranges,
921 1 : vec![
922 1 : Key::from_i128(1)..Key::from_i128(5),
923 1 : Key::from_i128(7)..Key::from_i128(10),
924 1 : Key::from_i128(12)..Key::from_i128(15),
925 1 : ]
926 1 : );
927 1 : }
928 :
/// A single removal range (9..19) that cuts across several of our ranges: it trims the
/// tail of 6..10, swallows 12..15 entirely, and trims the head of 17..20.
#[test]
fn test_remove_one_range_overlaps_multiple() {
    let k = Key::from_i128;

    let mut key_space1 = KeySpace {
        ranges: vec![
            k(1)..k(3),
            k(3)..k(6),
            k(6)..k(10),
            k(12)..k(15),
            k(17)..k(20),
            k(20)..k(30),
            k(30)..k(40),
        ],
    };
    let key_space2 = KeySpace {
        ranges: vec![k(9)..k(19)],
    };

    // The removed portion is exactly the intersection with 9..19.
    let removed = key_space1.remove_overlapping_with(&key_space2);
    assert_eq!(
        removed,
        KeySpace {
            ranges: vec![k(9)..k(10), k(12)..k(15), k(17)..k(19)],
        }
    );

    // What remains is everything outside 9..19; untouched ranges keep their boundaries.
    assert_eq!(
        key_space1.ranges,
        vec![
            k(1)..k(3),
            k(3)..k(6),
            k(6)..k(9),
            k(19)..k(20),
            k(20)..k(30),
            k(30)..k(40),
        ]
    );
}
/// A sharded range whose start and end keys lie in different relations has no computable
/// page count, so `page_count()` reports `u32::MAX`.
#[test]
fn sharded_range_relation_gap() {
    let shard_identity = ShardIdentity::new(
        ShardNumber(0),
        ShardCount::new(4),
        ShardParameters::DEFAULT_STRIPE_SIZE,
    )
    .unwrap();

    let start = Key::from_hex("000000067F00000005000040100300000000").unwrap();
    let end = Key::from_hex("000000067F00000005000040130000004000").unwrap();
    let range = ShardedRange::new(start..end, &shard_identity);

    // Key range spans relations: expect MAX.
    assert_eq!(range.page_count(), u32::MAX);
}
988 :
/// A range holding exactly one key, the logical size key (block 0xffffffff) of a fork,
/// counts as one page, here observed from shard 1 of 4.
#[test]
fn shard_identity_keyspaces_single_key() {
    let shard_identity = ShardIdentity::new(
        ShardNumber(1),
        ShardCount::new(4),
        ShardParameters::DEFAULT_STRIPE_SIZE,
    )
    .unwrap();

    let logical_size_key = Key::from_hex("000000067f000000010000007000ffffffff").unwrap();
    let next_fork_block_zero = Key::from_hex("000000067f00000001000000700100000000").unwrap();
    let range = ShardedRange::new(logical_size_key..next_fork_block_zero, &shard_identity);

    assert_eq!(range.page_count(), 1);
}
1008 :
/// Test the helper that we use to identify ranges which go outside the data blocks of a
/// single relation.
#[test]
fn contiguous_range_check() {
    // Build a half-open key range from two hex-encoded keys.
    let key_range =
        |start: &str, end: &str| Key::from_hex(start).unwrap()..Key::from_hex(end).unwrap();

    // Starts below the logical size key (0xffffffff) of one fork and ends a few blocks
    // into the next fork: not contiguous.
    assert!(!is_contiguous_range(&key_range(
        "000000067f00000001000004df00fffffffe",
        "000000067f00000001000004df0100000003",
    )));

    // The range goes all the way up to 0xffffffff, including it: this is not considered
    // a rel block range because 0xffffffff stores logical sizes, not blocks.
    assert!(!is_contiguous_range(&key_range(
        "000000067f00000001000004df00fffffffe",
        "000000067f00000001000004df0100000000",
    )));

    // Keys within the normal data region of a relation.
    assert!(is_contiguous_range(&key_range(
        "000000067f00000001000004df0000000000",
        "000000067f00000001000004df0000000080",
    )));

    // The logical size key of one forkno, then some blocks in the next.
    assert!(is_contiguous_range(&key_range(
        "000000067f00000001000004df00ffffffff",
        "000000067f00000001000004df0100000080",
    )));
}
1037 :
/// A range running from near the end of one fork into the start of the next has an
/// unknowable page count, which `page_count()` reports as `u32::MAX`.
#[test]
fn shard_identity_keyspaces_forkno_gap() {
    let shard_identity = ShardIdentity::new(
        ShardNumber(1),
        ShardCount::new(4),
        ShardParameters::DEFAULT_STRIPE_SIZE,
    )
    .unwrap();

    let range = ShardedRange::new(
        Range {
            start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
            end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
        },
        &shard_identity,
    );

    // Range spanning the end of one forkno and the start of the next: we do not attempt to
    // calculate a valid size, because we have no way to know if the keys between start
    // and end are actually in use.
    assert_eq!(range.page_count(), u32::MAX);
}
1060 :
/// Very simple case: a range covering only block zero of one relation, which maps to
/// shard zero. Shard 0 of 4 sees one page; every other shard sees zero.
#[test]
fn shard_identity_keyspaces_one_relation() {
    // The keys are the same for every shard, so build them once.
    let block_zero = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    let block_one = Key::from_hex("000000067f00000001000000ae0000000001").unwrap();

    for shard_number in 0..4 {
        let shard_identity = ShardIdentity::new(
            ShardNumber(shard_number),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();
        let range = ShardedRange::new(block_zero..block_one, &shard_identity);

        let expected_pages = if shard_number == 0 { 1 } else { 0 };
        assert_eq!(range.page_count(), expected_pages);
    }
}
1088 :
1089 : /// Test helper: construct a ShardedRange and call fragment() on it, returning
1090 : /// the total page count in the range and the fragments.
1091 1012 : fn do_fragment(
1092 1012 : range_start: Key,
1093 1012 : range_end: Key,
1094 1012 : shard_identity: &ShardIdentity,
1095 1012 : target_nblocks: u32,
1096 1012 : ) -> (u32, Vec<(u32, Range<Key>)>) {
1097 1012 : let range = ShardedRange::new(
1098 1012 : Range {
1099 1012 : start: range_start,
1100 1012 : end: range_end,
1101 1012 : },
1102 1012 : shard_identity,
1103 1012 : );
1104 1012 :
1105 1012 : let page_count = range.page_count();
1106 1012 : let fragments = range.fragment(target_nblocks);
1107 1012 :
1108 1012 : // Invariant: we always get at least one fragment
1109 1012 : assert!(!fragments.is_empty());
1110 :
1111 : // Invariant: the first/last fragment start/end should equal the input start/end
1112 1012 : assert_eq!(fragments.first().unwrap().1.start, range_start);
1113 1012 : assert_eq!(fragments.last().unwrap().1.end, range_end);
1114 :
1115 1012 : if page_count > 0 {
1116 : // Invariant: every fragment must contain at least one shard-local page, if the
1117 : // total range contains at least one shard-local page
1118 687 : let all_nonzero = fragments.iter().all(|f| f.0 > 0);
1119 554 : if !all_nonzero {
1120 0 : eprintln!("Found a zero-length fragment: {:?}", fragments);
1121 554 : }
1122 554 : assert!(all_nonzero);
1123 : } else {
1124 : // A range with no shard-local pages should always be returned as a single fragment
1125 458 : assert_eq!(fragments, vec![(0, range_start..range_end)]);
1126 : }
1127 :
1128 : // Invariant: fragments must be ordered and non-overlapping
1129 1012 : let mut last: Option<Range<Key>> = None;
1130 2157 : for frag in &fragments {
1131 1145 : if let Some(last) = last {
1132 133 : assert!(frag.1.start >= last.end);
1133 133 : assert!(frag.1.start > last.start);
1134 1012 : }
1135 1145 : last = Some(frag.1.clone())
1136 : }
1137 :
1138 : // Invariant: fragments respect target_nblocks
1139 2157 : for frag in &fragments {
1140 1145 : assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
1141 : }
1142 :
1143 1012 : (page_count, fragments)
1144 1012 : }
1145 :
/// Really simple tests for fragment(), on a range that covers exactly one stripe, all of
/// which belongs to shard 0 of a 4-shard tenant.
#[test]
fn sharded_range_fragment_simple() {
    let shard_identity = ShardIdentity::new(
        ShardNumber(0),
        ShardCount::new(4),
        ShardParameters::DEFAULT_STRIPE_SIZE,
    )
    .unwrap();

    let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
    let whole_stripe: (u32, Vec<(u32, Range<Key>)>) =
        (32768, vec![(32768, input_start..input_end)]);

    // A target equal to the stripe size returns the whole stripe as one fragment.
    assert_eq!(
        do_fragment(input_start, input_end, &shard_identity, 32768),
        whole_stripe
    );

    // A larger target still returns the whole stripe as one fragment.
    assert_eq!(
        do_fragment(input_start, input_end, &shard_identity, 10000000),
        whole_stripe
    );

    // A target of half the stripe size splits it into two equal halves.
    let midpoint = input_start.add(16384);
    assert_eq!(
        do_fragment(input_start, input_end, &shard_identity, 16384),
        (
            32768,
            vec![(16384, input_start..midpoint), (16384, midpoint..input_end)]
        )
    );
}
1185 :
/// fragment() on a range spanning several stripes, only one of which belongs to the
/// current shard. The reported sizes count only the shard-local blocks, while the
/// fragments still cover the whole input range.
#[test]
fn sharded_range_fragment_multi_stripe() {
    let shard_identity = ShardIdentity::new(
        ShardNumber(0),
        ShardCount::new(4),
        ShardParameters::DEFAULT_STRIPE_SIZE,
    )
    .unwrap();

    // A range which covers multiple stripes (0x20000 blocks), exactly one of which belongs
    // to the current shard.
    let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();

    // Ask for all the blocks, get a fragment that covers the whole range but reports
    // its size to be just the blocks belonging to our shard.
    assert_eq!(
        do_fragment(input_start, input_end, &shard_identity, 131072),
        (32768, vec![(32768, input_start..input_end)])
    );

    // Ask for a sub-stripe quantity: two full 16000-block fragments, then the 768-block
    // remainder, whose fragment extends to the end of the input range.
    assert_eq!(
        do_fragment(input_start, input_end, &shard_identity, 16000),
        (
            32768,
            vec![
                (16000, input_start..input_start.add(16000)),
                (16000, input_start.add(16000)..input_start.add(32000)),
                (768, input_start.add(32000)..input_end),
            ]
        )
    );

    // Try on a range that starts one block *into* our owned stripe: that first block is
    // excluded, so one fewer shard-local page is reported.
    assert_eq!(
        do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
        (32767, vec![(32767, input_start.add(1)..input_end)])
    );
}
1224 :
/// Check page counting when a range begins on the logical size key (block 0xffffffff) of
/// one fork and runs one stripe (0x8000 blocks) into the next fork.
#[test]
fn sharded_range_fragment_starting_from_logical_size() {
    let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
    let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();

    // Shard `n` of a 4-shard tenant with the default stripe size.
    let shard_of_four = |n: u8| {
        ShardIdentity::new(
            ShardNumber(n),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap()
    };

    // Shard 0 owns the first stripe of the relation, plus the logical size key:
    // 0x8000 + 1 pages.
    assert_eq!(
        do_fragment(input_start, input_end, &shard_of_four(0), 0x10000),
        (0x8001, vec![(0x8001, input_start..input_end)])
    );

    // Shard 1 does not own that first stripe, but every shard stores every logical size,
    // so it still sees exactly one page.
    assert_eq!(
        do_fragment(input_start, input_end, &shard_of_four(1), 0x10000),
        (0x1, vec![(0x1, input_start..input_end)])
    );
}
1257 :
/// Test that ShardedRange behaves properly when used on un-sharded data: every block is
/// local, so fragment() simply chops the range into `target_nblocks`-sized pieces.
#[test]
fn sharded_range_fragment_unsharded() {
    let shard_identity = ShardIdentity::unsharded();

    let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();

    // 0x10000 local blocks requested in 0x8000-block pieces: two equal halves.
    let half = 0x8000;
    let midpoint = input_start.add(half);
    let expected = (
        0x10000,
        vec![
            (half, input_start..midpoint),
            (half, midpoint..input_start.add(2 * half)),
        ],
    );
    assert_eq!(
        do_fragment(input_start, input_end, &shard_identity, half),
        expected
    );
}
1276 :
/// A range that spans relations cannot be sized. Fragmentation gives up and returns the
/// whole range as a single fragment of size u32::MAX, for both unsharded and sharded
/// identities.
#[test]
fn sharded_range_fragment_cross_relation() {
    let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();

    let identities = [
        ShardIdentity::unsharded(),
        ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap(),
    ];

    for shard_identity in &identities {
        assert_eq!(
            do_fragment(input_start, input_end, shard_identity, 0x8000),
            (u32::MAX, vec![(u32::MAX, input_start..input_end)])
        );
    }
}
1301 :
/// Fragmenting a small unsharded range with a target smaller than the range yields
/// full-size fragments followed by one shorter remainder fragment.
#[test]
fn sharded_range_fragment_tiny_nblocks() {
    let shard_identity = ShardIdentity::unsharded();

    // A 0x38 (56) block range within a single relation, split into 16-block fragments:
    // expect three full fragments and an 8-block remainder.
    let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
    let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
    assert_eq!(
        do_fragment(input_start, input_end, &shard_identity, 16),
        (
            0x38,
            vec![
                (16, input_start..input_start.add(16)),
                (16, input_start.add(16)..input_start.add(32)),
                (16, input_start.add(32)..input_start.add(48)),
                (8, input_start.add(48)..input_end),
            ]
        )
    );
}
1322 :
/// Randomized check of fragment(). The test varies the shard identity, the fragment size
/// target and the range (always within one relation's data blocks). For every case, the
/// invariants asserted by `do_fragment` must hold. A randomly chosen key of the input
/// range must also fall in some fragment, and that fragment must have a non-zero page
/// count if the key is shard-local.
#[test]
fn sharded_range_fragment_fuzz() {
    // Use a fixed seed: we don't want to explicitly pick values, but we do want
    // the test to be reproducible.
    let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);

    for _i in 0..1000 {
        // Half the time unsharded; otherwise a random shard of a tenant with 1..=127
        // shards (small enough that the `as u8` casts below cannot truncate).
        let shard_identity = if prng.next_u32() % 2 == 0 {
            ShardIdentity::unsharded()
        } else {
            let shard_count = prng.next_u32() % 127 + 1;
            ShardIdentity::new(
                ShardNumber((prng.next_u32() % shard_count) as u8),
                ShardCount::new(shard_count as u8),
                ShardParameters::DEFAULT_STRIPE_SIZE,
            )
            .unwrap()
        };

        // Fragment size target in 1..=65536 blocks.
        let target_nblocks = prng.next_u32() % 65536 + 1;

        // Start somewhere within the first 16384 blocks of the relation.
        let start_offset = prng.next_u32() % 16384;

        // Range length in 1..=8192 blocks. Together with start_offset, this keeps the
        // whole range well below the logical size key, i.e. inside one relation's data
        // blocks.
        let range_size = prng.next_u32() % 8192 + 1;

        let input_start = Key::from_hex("000000067F00000001000004E10000000000")
            .unwrap()
            .add(start_offset);
        let input_end = input_start.add(range_size);

        // This test's main success conditions are the invariants baked into do_fragment.
        let (_total_size, fragments) =
            do_fragment(input_start, input_end, &shard_identity, target_nblocks);

        // Pick a random key within the range and check it appears in the output.
        let example_key = input_start.add(prng.next_u32() % range_size);

        // Panic on unwrap if it isn't found.
        let example_key_frag = fragments
            .iter()
            .find(|f| f.1.contains(&example_key))
            .unwrap();

        // Check that the fragment containing our random key has a nonzero size if
        // that key is shard-local.
        let example_key_local = !shard_identity.is_key_disposable(&example_key);
        if example_key_local {
            assert!(example_key_frag.0 > 0);
        }
    }
}
1376 : }
|