use std::ops::Range;

use itertools::Itertools;
use postgres_ffi::BLCKSZ;

use crate::key::Key;
use crate::shard::{ShardCount, ShardIdentity};

///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct KeySpace {
    /// Contiguous ranges of keys that belong to the key space. In key order,
    /// and with no overlap.
    pub ranges: Vec<Range<Key>>,
}

impl std::fmt::Display for KeySpace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[")?;
        for range in &self.ranges {
            write!(f, "{}..{},", range.start, range.end)?;
        }
        write!(f, "]")
    }
}

/// A wrapper type for sparse keyspaces.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);

/// Represents a contiguous half-open range of the keyspace, masked according to a particular
/// ShardNumber's stripes: within this range of keys, only some "belong" to the current
/// shard.
///
/// When we iterate over keys within this object, we will skip any keys that don't belong
/// to this shard.
///
/// The start + end keys might not belong to the shard: they specify where layer files should
/// start + end, but we will never actually read/write those keys.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShardedRange<'a> {
    pub shard_identity: &'a ShardIdentity,
    pub range: Range<Key>,
}
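
// A minimal usage sketch (assuming an unsharded tenant, as exercised in the
// tests at the bottom of this file): with no sharding, no key is disposable,
// so the physical page count of a range equals its raw size.
//
//     let unsharded = ShardIdentity::unsharded();
//     let range = ShardedRange::new(some_contiguous_range.clone(), &unsharded); // hypothetical range
//     assert_eq!(range.page_count(), ShardedRange::raw_size(&some_contiguous_range));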

/// Calculate the size of a range within the blocks of the same relation, or spanning only the
/// top page in the previous relation's space.
pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
    debug_assert!(is_contiguous_range(range));
    if range.start.field6 == 0xffffffff {
        range.end.field6 + 1
    } else {
        range.end.field6 - range.start.field6
    }
}

/// Return true if this key range includes only keys in the same relation's data blocks, or
/// spans just one relation plus the logical size (0xffffffff) block of the relation before it.
///
/// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
/// be on our shard. Later in ShardedRange we do the extra work to figure out how much
/// of a given contiguous range is present on one shard.
///
/// This matters, because:
/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
/// - Within such ranges, we may calculate distances using simple subtraction of field6.
pub fn is_contiguous_range(range: &Range<Key>) -> bool {
    range.start.field1 == range.end.field1
        && range.start.field2 == range.end.field2
        && range.start.field3 == range.end.field3
        && range.start.field4 == range.end.field4
        && (range.start.field5 == range.end.field5
            || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
}
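
// A sketch of the two contiguous cases, using hex keys in the style of the
// `contiguous_range_check` test below (field5 is the fork number, field6 the
// block number):
//
//     // Blocks within a single relation fork: contiguous.
//     let within_fork = Key::from_hex("000000067f00000001000004df0000000000").unwrap()
//         ..Key::from_hex("000000067f00000001000004df0000000080").unwrap();
//     assert!(is_contiguous_range(&within_fork));
//     assert_eq!(contiguous_range_len(&within_fork), 0x80);
//
//     // Starting at a logical size slot (0xffffffff) and running into the next
//     // fork: also contiguous; the length counts the logical size key plus the
//     // following blocks (0x80 + 1).
//     let across_fork = Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
//         ..Key::from_hex("000000067f00000001000004df0100000080").unwrap();
//     assert!(is_contiguous_range(&across_fork));
//     assert_eq!(contiguous_range_len(&across_fork), 0x81);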

impl<'a> ShardedRange<'a> {
    pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
        Self {
            shard_identity,
            range,
        }
    }

    /// Break up this range into chunks, each of which has at least one local key in it if the
    /// total range has at least one local key.
    pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
        // Optimization for the single-key case (e.g. logical size keys)
        if self.range.end == self.range.start.add(1) {
            return vec![(
                if self.shard_identity.is_key_disposable(&self.range.start) {
                    0
                } else {
                    1
                },
                self.range,
            )];
        }

        if !is_contiguous_range(&self.range) {
            // Ranges that span relations are not fragmented. We only get these ranges as a result
            // of operations that act on existing layers, so we trust that the existing range is
            // reasonably small.
            return vec![(u32::MAX, self.range)];
        }

        let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();

        let mut cursor = self.range.start;
        while cursor < self.range.end {
            let advance_by = self.distance_to_next_boundary(cursor);
            let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);

            // If the previous fragment is undersized, then we seek to consume enough
            // blocks to complete it.
            let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
                Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
                Some(frag) => {
                    // Previous fragment is complete, want the full number.
                    (
                        target_nblocks,
                        if is_fragment_disposable {
                            // If this current range will be empty (no shard-local data), we will merge into the previous fragment
                            Some(frag)
                        } else {
                            None
                        },
                    )
                }
                None => {
                    // First iteration, want the full number
                    (target_nblocks, None)
                }
            };

            let advance_by = if is_fragment_disposable {
                advance_by
            } else {
                std::cmp::min(advance_by, want_blocks)
            };

            let next_cursor = cursor.add(advance_by);

            let this_frag = (
                if is_fragment_disposable {
                    0
                } else {
                    advance_by
                },
                cursor..next_cursor,
            );
            cursor = next_cursor;

            if let Some(last_fragment) = merge_last_fragment {
                // Previous fragment was short or this one is empty, merge into it
                last_fragment.0 += this_frag.0;
                last_fragment.1.end = this_frag.1.end;
            } else {
                fragments.push(this_frag);
            }
        }

        fragments
    }
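
    // Fragmenting sketch, assuming an unsharded `ShardIdentity` and a `start` key
    // inside one relation (this mirrors `sharded_range_fragment_unsharded` below):
    // a 0x10000-block range split with target_nblocks = 0x8000 yields two
    // fragments of 0x8000 local blocks each.
    //
    //     let r = ShardedRange::new(start..start.add(0x10000), &unsharded);
    //     let frags = r.fragment(0x8000);
    //     assert_eq!(frags.len(), 2);
    //     assert!(frags.iter().all(|(nblocks, _)| *nblocks == 0x8000));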

    /// Estimate the physical pages that are within this range, on this shard. This returns
    /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
    pub fn page_count(&self) -> u32 {
        // Special case for single keys like logical sizes
        if self.range.end == self.range.start.add(1) {
            return if self.shard_identity.is_key_disposable(&self.range.start) {
                0
            } else {
                1
            };
        }

        // We can only do an authentic calculation of contiguous key ranges
        if !is_contiguous_range(&self.range) {
            return u32::MAX;
        }

        // Special case for unsharded (single-shard) tenants: our logical and physical sizes are the same
        if self.shard_identity.count < ShardCount::new(2) {
            return contiguous_range_len(&self.range);
        }

        // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
        // to Self, and add the stripe's block count to our total if so.
        let mut result: u64 = 0;
        let mut cursor = self.range.start;
        while cursor < self.range.end {
            // Count up to the next stripe_size boundary or end of range
            let advance_by = self.distance_to_next_boundary(cursor);

            // If the blocks in this stripe belong to us, add them to our count
            if !self.shard_identity.is_key_disposable(&cursor) {
                result += advance_by as u64;
            }

            cursor = cursor.add(advance_by);
        }

        if result > u32::MAX as u64 {
            u32::MAX
        } else {
            result as u32
        }
    }

    /// Distance from the cursor to the next potential fragment boundary: this is either
    /// a stripe boundary, or the end of the range.
    fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
        let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));

        if self.shard_identity.count < ShardCount::new(2) {
            // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
            return distance_to_range_end;
        }

        if cursor.field6 == 0xffffffff {
            // We are wrapping from one relation's logical size to the next relation's first data block
            return 1;
        }

        let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
        let stripe_remainder = self.shard_identity.stripe_size.0
            - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);

        if cfg!(debug_assertions) {
            // We should never overflow field5 and field6 -- our callers check this earlier
            // and would have returned their u32::MAX cases if the input range violated this.
            let next_cursor = cursor.add(stripe_remainder);
            debug_assert!(
                next_cursor.field1 == cursor.field1
                    && next_cursor.field2 == cursor.field2
                    && next_cursor.field3 == cursor.field3
                    && next_cursor.field4 == cursor.field4
                    && next_cursor.field5 == cursor.field5
            )
        }

        std::cmp::min(stripe_remainder, distance_to_range_end)
    }
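
    // Worked example of the stripe arithmetic above (hypothetical numbers): with a
    // stripe size of 32768 blocks and the cursor at block 40000, stripe_index is
    // 40000 / 32768 == 1, so the current stripe began at block 32768 and
    // stripe_remainder is 32768 - (40000 - 32768) == 25536 blocks remaining until
    // the next stripe boundary.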

    /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
    /// this function simply calculates the number of pages in the space, without accounting for those
    /// pages that would not actually be stored on this node.
    ///
    /// Don't use this function in code that works with physical entities like layer files.
    pub fn raw_size(range: &Range<Key>) -> u32 {
        if is_contiguous_range(range) {
            contiguous_range_len(range)
        } else {
            u32::MAX
        }
    }
}
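
// How `raw_size` and `page_count` relate (values taken from the sharded tests
// below): a range covering exactly one 32768-block stripe has a raw_size of
// 32768 on every shard, but a page_count of 32768 only on the shard that owns
// that stripe, and 0 on the other shards.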

impl KeySpace {
    /// Create a key space with a single range.
    pub fn single(key_range: Range<Key>) -> Self {
        Self {
            ranges: vec![key_range],
        }
    }

    /// Partition a key space into chunks of roughly 'target_size' bytes each.
    pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
        // Assume that each value is 8k in size.
        let target_nblocks = (target_size / BLCKSZ as u64) as u32;

        let mut parts = Vec::new();
        let mut current_part = Vec::new();
        let mut current_part_size: usize = 0;
        for range in &self.ranges {
            // While doing partitioning, wrap the range in ShardedRange so that our size calculations
            // will respect shard striping rather than assuming all keys within a range are present.
            let range = ShardedRange::new(range.clone(), shard_identity);

            // Chunk up the range into parts that each contain up to target_size local blocks
            for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
                // If appending the next contiguous range in the keyspace to the current
                // partition would cause it to be too large, and our current partition
                // covers at least one block that is physically present in this shard,
                // then start a new partition
                if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
                    && current_part_size > 0
                {
                    parts.push(KeySpace {
                        ranges: current_part,
                    });
                    current_part = Vec::new();
                    current_part_size = 0;
                }
                current_part.push(frag_range.start..frag_range.end);
                current_part_size += frag_on_shard_size as usize;
            }
        }

        // Add the last partition that wasn't full yet.
        if !current_part.is_empty() {
            parts.push(KeySpace {
                ranges: current_part,
            });
        }

        KeyPartitioning { parts }
    }
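
    // Usage sketch (hypothetical sizes): partitioning a dense keyspace into parts
    // of roughly 1 MiB, i.e. 128 pages of BLCKSZ (8 KiB) each:
    //
    //     let ks = KeySpace::single(Key::from_i128(0)..Key::from_i128(1000));
    //     let parts = ks.partition(&ShardIdentity::unsharded(), 128 * BLCKSZ as u64);
    //     // Each resulting part covers at most 128 shard-local pages.
    //     assert!(parts.parts.iter().all(|p| p.total_raw_size() <= 128));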

    pub fn is_empty(&self) -> bool {
        self.total_raw_size() == 0
    }

    /// Merge another keyspace into the current one.
    /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
    pub fn merge(&mut self, other: &KeySpace) {
        let all_ranges = self
            .ranges
            .iter()
            .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);

        let mut accum = KeySpaceAccum::new();
        let mut prev: Option<&Range<Key>> = None;
        for range in all_ranges {
            if let Some(prev) = prev {
                let overlap =
                    std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
                assert!(
                    !overlap,
                    "Attempt to merge overlapping keyspaces: {:?} overlaps {:?}",
                    prev, range
                );
            }

            accum.add_range(range.clone());
            prev = Some(range);
        }

        self.ranges = accum.to_keyspace().ranges;
    }
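
    // Merge sketch: the two keyspaces must be disjoint, and abutting ranges are
    // coalesced by the accumulator. Using the i128 helpers from the tests below:
    //
    //     let mut a = KeySpace::single(Key::from_i128(0)..Key::from_i128(10));
    //     let b = KeySpace::single(Key::from_i128(10)..Key::from_i128(20));
    //     a.merge(&b);
    //     assert_eq!(a.ranges, vec![Key::from_i128(0)..Key::from_i128(20)]);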

    /// Remove all keys in `other` from `self`.
    /// This can involve splitting or removing of existing ranges.
    /// Returns the removed keyspace.
    pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
        let (self_start, self_end) = match (self.start(), self.end()) {
            (Some(start), Some(end)) => (start, end),
            _ => {
                // self is empty
                return KeySpace::default();
            }
        };

        // Key spaces are sorted by definition, so skip ahead to the first
        // potentially intersecting range. Similarly, ignore ranges that start
        // after the current keyspace ends.
        let other_ranges = other
            .ranges
            .iter()
            .skip_while(|range| self_start >= range.end)
            .take_while(|range| self_end > range.start);

        let mut removed_accum = KeySpaceRandomAccum::new();
        for range in other_ranges {
            while let Some(overlap_at) = self.overlaps_at(range) {
                let overlapped = self.ranges[overlap_at].clone();

                if overlapped.start < range.start && overlapped.end <= range.end {
                    // Higher part of the range is completely overlapped.
                    removed_accum.add_range(range.start..self.ranges[overlap_at].end);
                    self.ranges[overlap_at].end = range.start;
                }
                if overlapped.start >= range.start && overlapped.end > range.end {
                    // Lower part of the range is completely overlapped.
                    removed_accum.add_range(self.ranges[overlap_at].start..range.end);
                    self.ranges[overlap_at].start = range.end;
                }
                if overlapped.start < range.start && overlapped.end > range.end {
                    // Middle part of the range is overlapped.
                    removed_accum.add_range(range.clone());
                    self.ranges[overlap_at].end = range.start;
                    self.ranges
                        .insert(overlap_at + 1, range.end..overlapped.end);
                }
                if overlapped.start >= range.start && overlapped.end <= range.end {
                    // Whole range is overlapped
                    removed_accum.add_range(self.ranges[overlap_at].clone());
                    self.ranges.remove(overlap_at);
                }
            }
        }

        removed_accum.to_keyspace()
    }
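
    // Removal sketch, mirroring `test_remove_partial_overlaps` below: removing
    // 3..6 from a keyspace holding 1..5 truncates it to 1..3 and returns 3..5
    // as the removed part.
    //
    //     let mut ks = KeySpace::single(Key::from_i128(1)..Key::from_i128(5));
    //     let other = KeySpace::single(Key::from_i128(3)..Key::from_i128(6));
    //     let removed = ks.remove_overlapping_with(&other);
    //     assert_eq!(ks.ranges, vec![Key::from_i128(1)..Key::from_i128(3)]);
    //     assert_eq!(removed.ranges, vec![Key::from_i128(3)..Key::from_i128(5)]);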

    pub fn start(&self) -> Option<Key> {
        self.ranges.first().map(|range| range.start)
    }

    pub fn end(&self) -> Option<Key> {
        self.ranges.last().map(|range| range.end)
    }

    /// The size of the keyspace in pages, before accounting for sharding
    pub fn total_raw_size(&self) -> usize {
        self.ranges
            .iter()
            .map(|range| ShardedRange::raw_size(range) as usize)
            .sum()
    }

    /// Find the index of the last range in `self.ranges` that overlaps with `range`,
    /// relying on the ranges being sorted and non-overlapping.
    fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
        // Binary-search for `range.end` among the range starts: the only candidate
        // for an overlap is the last range that starts before `range.end`.
        match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
            Ok(0) => None,
            Err(0) => None,
            Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
            Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
            _ => None,
        }
    }

    ///
    /// Check if key space contains overlapping range
    ///
    pub fn overlaps(&self, range: &Range<Key>) -> bool {
        self.overlaps_at(range).is_some()
    }

    /// Check if the keyspace contains a key
    pub fn contains(&self, key: &Key) -> bool {
        self.overlaps(&(*key..key.next()))
    }
}

///
/// Represents a partitioning of the key space.
///
/// The only kind of partitioning we do is to partition the key space into
/// partitions that are roughly equal in physical size (see KeySpace::partition).
/// But this data structure could represent any partitioning.
///
#[derive(Clone, Debug, Default)]
pub struct KeyPartitioning {
    pub parts: Vec<KeySpace>,
}

/// Represents a partitioning of the sparse key space.
#[derive(Clone, Debug, Default)]
pub struct SparseKeyPartitioning {
    pub parts: Vec<SparseKeySpace>,
}

impl KeyPartitioning {
    pub fn new() -> Self {
        KeyPartitioning { parts: Vec::new() }
    }

    /// Convert a key partitioning to a sparse partition.
    pub fn into_sparse(self) -> SparseKeyPartitioning {
        SparseKeyPartitioning {
            parts: self.parts.into_iter().map(SparseKeySpace).collect(),
        }
    }
}

impl SparseKeyPartitioning {
    /// Note: use this function with caution. Attempting to handle a sparse keyspace in the same
    /// way as a dense keyspace will cause long/dead loops.
    pub fn into_dense(self) -> KeyPartitioning {
        KeyPartitioning {
            parts: self.parts.into_iter().map(|x| x.0).collect(),
        }
    }
}

///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. This takes care of merging adjacent keys and key ranges into
/// contiguous ranges.
///
#[derive(Clone, Debug, Default)]
pub struct KeySpaceAccum {
    accum: Option<Range<Key>>,

    ranges: Vec<Range<Key>>,
    size: u64,
}

impl KeySpaceAccum {
    pub fn new() -> Self {
        Self {
            accum: None,
            ranges: Vec::new(),
            size: 0,
        }
    }

    #[inline(always)]
    pub fn add_key(&mut self, key: Key) {
        self.add_range(singleton_range(key))
    }

    #[inline(always)]
    pub fn add_range(&mut self, range: Range<Key>) {
        self.size += ShardedRange::raw_size(&range) as u64;

        match self.accum.as_mut() {
            Some(accum) => {
                if range.start == accum.end {
                    accum.end = range.end;
                } else {
                    // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
                    // a new range here if the skipped region was all keys that don't belong on this shard.
                    // (https://github.com/neondatabase/neon/issues/6247)
                    assert!(range.start > accum.end);
                    self.ranges.push(accum.clone());
                    *accum = range;
                }
            }
            None => self.accum = Some(range),
        }
    }

    pub fn to_keyspace(mut self) -> KeySpace {
        if let Some(accum) = self.accum.take() {
            self.ranges.push(accum);
        }
        KeySpace {
            ranges: self.ranges,
        }
    }

    pub fn consume_keyspace(&mut self) -> KeySpace {
        std::mem::take(self).to_keyspace()
    }

    /// The total number of keys in this object, ignoring any sharding effects that might cause some of
    /// the keys to be omitted in storage on this shard.
    pub fn raw_size(&self) -> u64 {
        self.size
    }
}
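
// Accumulation sketch: abutting keys and ranges collapse into one contiguous
// range, in the style of the i128 helpers used by the tests below:
//
//     let mut accum = KeySpaceAccum::new();
//     accum.add_range(Key::from_i128(0)..Key::from_i128(10));
//     accum.add_range(Key::from_i128(10)..Key::from_i128(30)); // starts where the last ended
//     assert_eq!(accum.to_keyspace().ranges, vec![Key::from_i128(0)..Key::from_i128(30)]);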

///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. Key ranges may be inserted in any order and can overlap.
///
#[derive(Clone, Debug, Default)]
pub struct KeySpaceRandomAccum {
    ranges: Vec<Range<Key>>,
}

impl KeySpaceRandomAccum {
    pub fn new() -> Self {
        Self { ranges: Vec::new() }
    }

    pub fn add_key(&mut self, key: Key) {
        self.add_range(singleton_range(key))
    }

    pub fn add_range(&mut self, range: Range<Key>) {
        self.ranges.push(range);
    }

    pub fn add_keyspace(&mut self, keyspace: KeySpace) {
        for range in keyspace.ranges {
            self.add_range(range);
        }
    }

    pub fn to_keyspace(mut self) -> KeySpace {
        let mut ranges = Vec::new();
        if !self.ranges.is_empty() {
            self.ranges.sort_by_key(|r| r.start);
            let mut start = self.ranges.first().unwrap().start;
            let mut end = self.ranges.first().unwrap().end;
            for r in self.ranges {
                assert!(r.start >= start);
                if r.start > end {
                    ranges.push(start..end);
                    start = r.start;
                    end = r.end;
                } else if r.end > end {
                    end = r.end;
                }
            }
            ranges.push(start..end);
        }
        KeySpace { ranges }
    }

    pub fn consume_keyspace(&mut self) -> KeySpace {
        let mut prev_accum = KeySpaceRandomAccum::new();
        std::mem::swap(self, &mut prev_accum);

        prev_accum.to_keyspace()
    }
}
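
// Unlike KeySpaceAccum, this accumulator tolerates out-of-order and overlapping
// inserts; everything is sorted and de-overlapped in to_keyspace(). A sketch
// based on the `keyspace_add_range` test below:
//
//     let mut accum = KeySpaceRandomAccum::new();
//     accum.add_range(Key::from_i128(20)..Key::from_i128(30));
//     accum.add_range(Key::from_i128(0)..Key::from_i128(10));
//     accum.add_range(Key::from_i128(5)..Key::from_i128(25));
//     assert_eq!(accum.to_keyspace().ranges, vec![Key::from_i128(0)..Key::from_i128(30)]);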

pub fn singleton_range(key: Key) -> Range<Key> {
    key..key.next()
}

#[cfg(test)]
mod tests {
    use std::fmt::Write;

    use rand::{RngCore, SeedableRng};

    use super::*;
    use crate::models::ShardParameters;
    use crate::shard::{ShardCount, ShardNumber};

    // Helper function to create a key range.
    //
    // Make the tests below less verbose.
    fn kr(irange: Range<i128>) -> Range<Key> {
        Key::from_i128(irange.start)..Key::from_i128(irange.end)
    }

    #[allow(dead_code)]
    fn dump_keyspace(ks: &KeySpace) {
        for r in ks.ranges.iter() {
            println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
        }
    }

    fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
        if actual.ranges != expected {
            let mut msg = String::new();

            writeln!(msg, "expected:").unwrap();
            for r in &expected {
                writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
            }
            writeln!(msg, "got:").unwrap();
            for r in &actual.ranges {
                writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
            }
            panic!("{}", msg);
        }
    }

    #[test]
    fn keyspace_consume() {
        let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];

        let mut accum = KeySpaceAccum::new();
        for range in &ranges {
            accum.add_range(range.clone());
        }

        let expected_size: u64 = ranges
            .iter()
            .map(|r| ShardedRange::raw_size(r) as u64)
            .sum();
        assert_eq!(accum.raw_size(), expected_size);

        assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
        assert_eq!(accum.raw_size(), 0);

        assert_ks_eq(&accum.consume_keyspace(), vec![]);
        assert_eq!(accum.raw_size(), 0);

        for range in &ranges {
            accum.add_range(range.clone());
        }
        assert_ks_eq(&accum.to_keyspace(), ranges);
    }

    #[test]
    fn keyspace_add_range() {
        // two separate ranges
        //
        // #####
        //       #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(20..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);

        // two separate ranges, added in reverse order
        //
        //       #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(20..30));
        ks.add_range(kr(0..10));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);

        // add range that is adjacent to the end of an existing range
        //
        // #####
        //      #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(10..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that is adjacent to the start of an existing range
        //
        //      #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..30));
        ks.add_range(kr(0..10));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that overlaps with the end of an existing range
        //
        // #####
        //    #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(5..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that overlaps with the start of an existing range
        //
        //    #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(5..30));
        ks.add_range(kr(0..10));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that is fully covered by an existing range
        //
        // #########
        //   #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..30));
        ks.add_range(kr(10..20));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that extends an existing range from both ends
        //
        //   #####
        // #########
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..20));
        ks.add_range(kr(0..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add a range that overlaps with two existing ranges, joining them
        //
        // #####   #####
        //    #######
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(20..30));
        ks.add_range(kr(5..25));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
    }

    #[test]
    fn keyspace_overlaps() {
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..20));
        ks.add_range(kr(30..40));
        let ks = ks.to_keyspace();

        // In the diagrams below, one character is two key values:
        // ##### marks the keyspace (10..20 and 30..40), xxx marks the probe range.

        //      #####     #####
        // xxx
        assert!(!ks.overlaps(&kr(0..5)));

        //      #####     #####
        //   xxx
        assert!(!ks.overlaps(&kr(5..9)));

        //      #####     #####
        //   xxx
        assert!(!ks.overlaps(&kr(5..10)));

        //      #####     #####
        //   xxxx
        assert!(ks.overlaps(&kr(5..11)));

        //      #####     #####
        //      xxx
        assert!(ks.overlaps(&kr(10..15)));

        //      #####     #####
        //        xxx
        assert!(ks.overlaps(&kr(15..20)));

        //      #####     #####
        //        xxxxxx
        assert!(ks.overlaps(&kr(15..25)));

        //      #####     #####
        //            xxx
        assert!(!ks.overlaps(&kr(22..28)));

        //      #####     #####
        //             xxx
        assert!(!ks.overlaps(&kr(25..30)));

        //      #####     #####
        //                  x
        // An empty probe range at a covered position still reports an overlap.
        assert!(ks.overlaps(&kr(35..35)));

        //      #####     #####
        //                     xxx
        assert!(!ks.overlaps(&kr(40..45)));

        //      #####     #####
        //                       xxx
        assert!(!ks.overlaps(&kr(45..50)));

        //      #####     #####
        // xxxxxxxxxxxxxxx
        assert!(ks.overlaps(&kr(0..30)));
    }

    #[test]
    fn test_remove_full_overlaps() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(4),
                Key::from_i128(5)..Key::from_i128(8),
                Key::from_i128(10)..Key::from_i128(12),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(2)..Key::from_i128(3),
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(13),
            ],
        };
        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(2)..Key::from_i128(3),
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(12),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(2),
                Key::from_i128(3)..Key::from_i128(4),
                Key::from_i128(5)..Key::from_i128(6),
                Key::from_i128(7)..Key::from_i128(8),
                Key::from_i128(10)..Key::from_i128(11)
            ]
        );
    }

    #[test]
    fn test_remove_partial_overlaps() {
        // Test partial overlaps
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(8)..Key::from_i128(11),
                Key::from_i128(14)..Key::from_i128(17),
            ],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(3)..Key::from_i128(5),
                Key::from_i128(8)..Key::from_i128(10),
                Key::from_i128(14)..Key::from_i128(15),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(7)..Key::from_i128(8),
                Key::from_i128(12)..Key::from_i128(14),
            ]
        );
    }

    #[test]
    fn test_remove_no_overlaps() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(12),
                Key::from_i128(15)..Key::from_i128(17),
            ],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace::default();
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ]
        );
    }

    #[test]
    fn test_remove_one_range_overlaps_multiple() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(6)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
                Key::from_i128(17)..Key::from_i128(20),
                Key::from_i128(20)..Key::from_i128(30),
                Key::from_i128(30)..Key::from_i128(40),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(9)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
                Key::from_i128(17)..Key::from_i128(19),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(6)..Key::from_i128(9),
                Key::from_i128(19)..Key::from_i128(20),
                Key::from_i128(20)..Key::from_i128(30),
                Key::from_i128(30)..Key::from_i128(40),
            ]
        );
    }

    #[test]
    fn sharded_range_relation_gap() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
                end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
            },
            &shard_identity,
        );

        // Key range spans relations, expect MAX
        assert_eq!(range.page_count(), u32::MAX);
    }

    #[test]
    fn shard_identity_keyspaces_single_key() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
                end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
            },
            &shard_identity,
        );
        // Single-key range on logical size key
        assert_eq!(range.page_count(), 1);
    }

    /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
    #[test]
    fn contiguous_range_check() {
        assert!(!is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
        ));

        // The range goes all the way up to 0xffffffff, including it: this is
        // not considered a rel block range because 0xffffffff stores logical sizes,
        // not blocks.
        assert!(!is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
        ));

        // Keys within the normal data region of a relation
        assert!(is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
                ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
        ));

        // The logical size key of one forkno, then some blocks in the next
        assert!(is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
        ));
    }

    #[test]
    fn shard_identity_keyspaces_forkno_gap() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
                end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
            },
            &shard_identity,
        );

        // Range spanning the end of one forkno and the start of the next: we do not attempt to
        // calculate a valid size, because we have no way to know if the keys between start
        // and end are actually in use.
        assert_eq!(range.page_count(), u32::MAX);
    }

    #[test]
    fn shard_identity_keyspaces_one_relation() {
        for shard_number in 0..4 {
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                ShardCount::new(4),
                ShardParameters::DEFAULT_STRIPE_SIZE,
            )
            .unwrap();

            let range = ShardedRange::new(
                Range {
                    start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
                    end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
                },
                &shard_identity,
            );

            // Very simple case: range covering block zero of one relation, where that block maps to shard zero
            if shard_number == 0 {
                assert_eq!(range.page_count(), 1);
            } else {
                // Other shards should perceive the range's size as zero
                assert_eq!(range.page_count(), 0);
            }
        }
    }

    /// Test helper: construct a ShardedRange and call fragment() on it, returning
    /// the total page count in the range and the fragments.
    fn do_fragment(
        range_start: Key,
        range_end: Key,
        shard_identity: &ShardIdentity,
        target_nblocks: u32,
    ) -> (u32, Vec<(u32, Range<Key>)>) {
        let range = ShardedRange::new(
            Range {
                start: range_start,
                end: range_end,
            },
            shard_identity,
        );

        let page_count = range.page_count();
        let fragments = range.fragment(target_nblocks);

        // Invariant: we always get at least one fragment
        assert!(!fragments.is_empty());

        // Invariant: the first/last fragment start/end should equal the input start/end
        assert_eq!(fragments.first().unwrap().1.start, range_start);
        assert_eq!(fragments.last().unwrap().1.end, range_end);

        if page_count > 0 {
            // Invariant: every fragment must contain at least one shard-local page, if the
            // total range contains at least one shard-local page
            let all_nonzero = fragments.iter().all(|f| f.0 > 0);
            if !all_nonzero {
                eprintln!("Found a zero-length fragment: {:?}", fragments);
            }
            assert!(all_nonzero);
        } else {
            // A range with no shard-local pages should always be returned as a single fragment
            assert_eq!(fragments, vec![(0, range_start..range_end)]);
        }

        // Invariant: fragments must be ordered and non-overlapping
        let mut last: Option<Range<Key>> = None;
        for frag in &fragments {
            if let Some(last) = last {
                assert!(frag.1.start >= last.end);
                assert!(frag.1.start > last.start);
            }
            last = Some(frag.1.clone())
        }

        // Invariant: fragments respect target_nblocks
        for frag in &fragments {
            assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
        }

        (page_count, fragments)
    }

    /// Really simple tests for fragment(), on a range that just contains a single stripe
    /// for a single tenant.
    #[test]
    fn sharded_range_fragment_simple() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        // A range which we happen to know covers exactly one stripe which belongs to this shard
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();

        // Ask for stripe_size blocks, we get the whole stripe
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 32768),
            (32768, vec![(32768, input_start..input_end)])
        );

        // Ask for more, we still get the whole stripe
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 10000000),
            (32768, vec![(32768, input_start..input_end)])
        );

        // Ask for target_nblocks of half the stripe size, we get two halves
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16384),
            (
                32768,
                vec![
                    (16384, input_start..input_start.add(16384)),
                    (16384, input_start.add(16384)..input_end)
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_multi_stripe() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        // A range which covers multiple stripes, exactly one of which belongs to the current shard.
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
        // Ask for all the blocks, get a fragment that covers the whole range but reports
        // its size to be just the blocks belonging to our shard.
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 131072),
            (32768, vec![(32768, input_start..input_end)])
        );

        // Ask for a sub-stripe quantity
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16000),
            (
                32768,
                vec![
                    (16000, input_start..input_start.add(16000)),
                    (16000, input_start.add(16000)..input_start.add(32000)),
                    (768, input_start.add(32000)..input_end),
                ]
            )
        );

        // Try on a range that starts slightly after our owned stripe
        assert_eq!(
            do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
            (32767, vec![(32767, input_start.add(1)..input_end)])
        );
    }

    /// Test our calculations work correctly when we start a range from the logical size key of
    /// a previous relation.
    #[test]
    fn sharded_range_fragment_starting_from_logical_size() {
        let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();

        // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x10000),
            (0x8001, vec![(0x8001, input_start..input_end)])
        );

        // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
        // store all logical sizes)
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x10000),
            (0x1, vec![(0x1, input_start..input_end)])
        );
    }

    /// Test that ShardedRange behaves properly when used on un-sharded data
    #[test]
    fn sharded_range_fragment_unsharded() {
        let shard_identity = ShardIdentity::unsharded();

        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (
                0x10000,
                vec![
                    (0x8000, input_start..input_start.add(0x8000)),
                    (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_cross_relation() {
        let shard_identity = ShardIdentity::unsharded();

        // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (u32::MAX, vec![(u32::MAX, input_start..input_end)])
        );

        // Same, but using a sharded identity
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (u32::MAX, vec![(u32::MAX, input_start..input_end)])
        );
    }

    #[test]
    fn sharded_range_fragment_tiny_nblocks() {
        let shard_identity = ShardIdentity::unsharded();

        // A small range within a single relation, fragmented with a tiny target_nblocks
        let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
        let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16),
            (
                0x38,
                vec![
                    (16, input_start..input_start.add(16)),
                    (16, input_start.add(16)..input_start.add(32)),
                    (16, input_start.add(32)..input_start.add(48)),
                    (8, input_start.add(48)..input_end),
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_fuzz() {
        // Use a fixed seed: we don't want to explicitly pick values, but we do want
        // the test to be reproducible.
        let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);

        for _i in 0..1000 {
            let shard_identity = if prng.next_u32() % 2 == 0 {
                ShardIdentity::unsharded()
            } else {
                let shard_count = prng.next_u32() % 127 + 1;
                ShardIdentity::new(
                    ShardNumber((prng.next_u32() % shard_count) as u8),
                    ShardCount::new(shard_count as u8),
                    ShardParameters::DEFAULT_STRIPE_SIZE,
                )
                .unwrap()
            };

            let target_nblocks = prng.next_u32() % 65536 + 1;

            let start_offset = prng.next_u32() % 16384;

            // Try ranges up to 8192 blocks in size, always at least 1 block
            let range_size = prng.next_u32() % 8192 + 1;

            // Pick a random starting key within a single relation
            let input_start = Key::from_hex("000000067F00000001000004E10000000000")
                .unwrap()
                .add(start_offset);
            let input_end = input_start.add(range_size);

            // This test's main success conditions are the invariants baked into do_fragment
            let (_total_size, fragments) =
                do_fragment(input_start, input_end, &shard_identity, target_nblocks);

            // Pick a random key within the range and check it appears in the output
            let example_key = input_start.add(prng.next_u32() % range_size);

            // Panic on unwrap if it isn't found
            let example_key_frag = fragments
                .iter()
                .find(|f| f.1.contains(&example_key))
                .unwrap();

            // Check that the fragment containing our random key has a nonzero size if
            // that key is shard-local
            let example_key_local = !shard_identity.is_key_disposable(&example_key);
            if example_key_local {
                assert!(example_key_frag.0 > 0);
            }
        }
    }
}