use std::ops::Range;

use itertools::Itertools;
use postgres_ffi::BLCKSZ;

use crate::key::Key;
use crate::shard::{ShardCount, ShardIdentity};

///
/// Represents a set of Keys, in a compact form.
///
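/// A minimal sketch of construction (hypothetical key values; not compiled as a
/// doctest):
///
/// ```ignore
/// let ks = KeySpace {
///     ranges: vec![
///         Key::from_i128(0)..Key::from_i128(10),
///         Key::from_i128(20)..Key::from_i128(30),
///     ],
/// };
/// assert_eq!(ks.total_raw_size(), 20);
/// ```
///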
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct KeySpace {
    /// Contiguous ranges of keys that belong to the key space. In key order,
    /// and with no overlap.
    pub ranges: Vec<Range<Key>>,
}

impl std::fmt::Display for KeySpace {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[")?;
        for range in &self.ranges {
            write!(f, "{}..{},", range.start, range.end)?;
        }
        write!(f, "]")
    }
}

/// A wrapper type for sparse keyspaces.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);

/// Represents a contiguous half-open range of the keyspace, masked according to a particular
/// ShardNumber's stripes: within this range of keys, only some "belong" to the current
/// shard.
///
/// When we iterate over keys within this object, we will skip any keys that don't belong
/// to this shard.
///
/// The start + end keys may not belong to the shard: these specify where layer files should
/// start + end, but we will never actually read/write those keys.
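///
/// A minimal usage sketch (shard parameters and keys are hypothetical; not
/// compiled as a doctest):
///
/// ```ignore
/// let sharded = ShardedRange::new(start_key..end_key, &shard_identity);
/// // Count only the pages that land on this shard's stripes:
/// let local_pages = sharded.page_count();
/// // Then split into chunks of at most target_nblocks shard-local blocks each:
/// let fragments = sharded.fragment(target_nblocks);
/// ```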
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShardedRange<'a> {
    pub shard_identity: &'a ShardIdentity,
    pub range: Range<Key>,
}

/// Calculate the number of keys in a contiguous range: either a run of blocks within
/// one relation, or a range that starts at the logical size key (0xffffffff) of the
/// previous relation and runs into the blocks of the next one.
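///
/// A sketch (`block_key`, `size_key`, and `next_rel_block` are hypothetical helpers
/// producing keys with the given field6 values; not compiled as a doctest):
///
/// ```ignore
/// // Both keys within the same relation's block space:
/// assert_eq!(contiguous_range_len(&(block_key(0x10)..block_key(0x40))), 0x30);
/// // Starting from a logical size key (field6 == 0xffffffff), the result counts
/// // that key itself plus the end key's block number in the next relation:
/// assert_eq!(contiguous_range_len(&(size_key..next_rel_block(0x80))), 0x81);
/// ```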
pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
    debug_assert!(is_contiguous_range(range));
    if range.start.field6 == 0xffffffff {
        range.end.field6 + 1
    } else {
        range.end.field6 - range.start.field6
    }
}

/// Return true if this key range includes only keys in the same relation's data blocks, or
/// spans just one relation plus the logical size (0xffffffff) key of the relation before it.
///
/// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
/// be on our shard. Later in ShardedRange we do the extra work to figure out how much
/// of a given contiguous range is present on one shard.
///
/// This matters, because:
/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
/// - Within such ranges, we may calculate distances using simple subtraction of field6.
pub fn is_contiguous_range(range: &Range<Key>) -> bool {
    range.start.field1 == range.end.field1
        && range.start.field2 == range.end.field2
        && range.start.field3 == range.end.field3
        && range.start.field4 == range.end.field4
        && (range.start.field5 == range.end.field5
            || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
}

impl<'a> ShardedRange<'a> {
    pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
        Self {
            shard_identity,
            range,
        }
    }

    /// Break up this range into chunks, each of which has at least one local key in it if the
    /// total range has at least one local key.
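    ///
    /// A sketch of the behaviour on an unsharded tenant (hypothetical keys; not
    /// compiled as a doctest): a 48-block contiguous range fragmented with
    /// `target_nblocks = 16` yields three fragments of 16 blocks each.
    ///
    /// ```ignore
    /// let frags = ShardedRange::new(start..start.add(48), &unsharded).fragment(16);
    /// assert_eq!(frags.len(), 3);
    /// assert!(frags.iter().all(|(nblocks, _range)| *nblocks == 16));
    /// ```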
    pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
        // Optimization for single-key case (e.g. logical size keys)
        if self.range.end == self.range.start.add(1) {
            return vec![(
                if self.shard_identity.is_key_disposable(&self.range.start) {
                    0
                } else {
                    1
                },
                self.range,
            )];
        }

        if !is_contiguous_range(&self.range) {
            // Ranges that span relations are not fragmented. We only get these ranges as a result
            // of operations that act on existing layers, so we trust that the existing range is
            // reasonably small.
            return vec![(u32::MAX, self.range)];
        }

        let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();

        let mut cursor = self.range.start;
        while cursor < self.range.end {
            let advance_by = self.distance_to_next_boundary(cursor);
            let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);

            // If the previous fragment is undersized, then we seek to consume enough
            // blocks to complete it.
            let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
                Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
                Some(frag) => {
                    // Previous fragment is complete, want the full number.
                    (
                        target_nblocks,
                        if is_fragment_disposable {
                            // If this current range will be empty (not shard-local data), we will merge into previous
                            Some(frag)
                        } else {
                            None
                        },
                    )
                }
                None => {
                    // First iteration, want the full number
                    (target_nblocks, None)
                }
            };

            let advance_by = if is_fragment_disposable {
                advance_by
            } else {
                std::cmp::min(advance_by, want_blocks)
            };

            let next_cursor = cursor.add(advance_by);

            let this_frag = (
                if is_fragment_disposable {
                    0
                } else {
                    advance_by
                },
                cursor..next_cursor,
            );
            cursor = next_cursor;

            if let Some(last_fragment) = merge_last_fragment {
                // Previous fragment was short or this one is empty, merge into it
                last_fragment.0 += this_frag.0;
                last_fragment.1.end = this_frag.1.end;
            } else {
                fragments.push(this_frag);
            }
        }

        fragments
    }

    /// Estimate the physical pages that are within this range, on this shard. This returns
    /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
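    ///
    /// A sketch (hypothetical keys; not compiled as a doctest): on an unsharded
    /// tenant a contiguous range of N blocks reports N pages, while on a sharded
    /// tenant only the stripes owned by this shard contribute to the count.
    ///
    /// ```ignore
    /// let local_pages = ShardedRange::new(start..end, &shard_identity).page_count();
    /// assert!(local_pages <= ShardedRange::raw_size(&(start..end)));
    /// ```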
    pub fn page_count(&self) -> u32 {
        // Special cases for single keys like logical sizes
        if self.range.end == self.range.start.add(1) {
            return if self.shard_identity.is_key_disposable(&self.range.start) {
                0
            } else {
                1
            };
        }

        // We can only do an authentic calculation of contiguous key ranges
        if !is_contiguous_range(&self.range) {
            return u32::MAX;
        }

        // Special case for single-shard tenants: our logical and physical sizes are the same
        if self.shard_identity.count < ShardCount::new(2) {
            return contiguous_range_len(&self.range);
        }

        // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
        // to Self, and add the stripe's block count to our total if so.
        let mut result: u64 = 0;
        let mut cursor = self.range.start;
        while cursor < self.range.end {
            // Count up to the next stripe_size boundary or end of range
            let advance_by = self.distance_to_next_boundary(cursor);

            // If the blocks in this stripe belong to us, add them to our count
            if !self.shard_identity.is_key_disposable(&cursor) {
                result += advance_by as u64;
            }

            cursor = cursor.add(advance_by);
        }

        if result > u32::MAX as u64 {
            u32::MAX
        } else {
            result as u32
        }
    }

    /// Return the distance from `cursor` to the next potential fragment boundary: either
    /// a stripe boundary, or the end of the range.
    fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
        let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));

        if self.shard_identity.count < ShardCount::new(2) {
            // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
            return distance_to_range_end;
        }

        if cursor.field6 == 0xffffffff {
            // We are wrapping from one relation's logical size to the next relation's first data block
            return 1;
        }

        let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
        let stripe_remainder = self.shard_identity.stripe_size.0
            - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);

        if cfg!(debug_assertions) {
            // We should never overflow field5 and field6 -- our callers check this earlier
            // and would have returned their u32::MAX cases if the input range violated this.
            let next_cursor = cursor.add(stripe_remainder);
            debug_assert!(
                next_cursor.field1 == cursor.field1
                    && next_cursor.field2 == cursor.field2
                    && next_cursor.field3 == cursor.field3
                    && next_cursor.field4 == cursor.field4
                    && next_cursor.field5 == cursor.field5
            )
        }

        std::cmp::min(stripe_remainder, distance_to_range_end)
    }

    /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
    /// this function simply calculates the number of pages in the space, without accounting for those
    /// pages that would not actually be stored on this node.
    ///
    /// Don't use this function in code that works with physical entities like layer files.
    pub fn raw_size(range: &Range<Key>) -> u32 {
        if is_contiguous_range(range) {
            contiguous_range_len(range)
        } else {
            u32::MAX
        }
    }
}

impl KeySpace {
    /// Create a key space with a single range.
    pub fn single(key_range: Range<Key>) -> Self {
        Self {
            ranges: vec![key_range],
        }
    }

    /// Partition a key space into chunks of roughly 'target_size' bytes
    /// in each partition.
    ///
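    /// A sketch of typical usage (the 128 MiB target is a hypothetical value; not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// // With BLCKSZ = 8192, a 128 MiB target corresponds to 16384 blocks per partition.
    /// let partitioning = keyspace.partition(&shard_identity, 128 * 1024 * 1024);
    /// for part in &partitioning.parts {
    ///     // Each part covers roughly 16384 shard-local blocks.
    /// }
    /// ```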
    pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
        // Assume that each value is 8k in size.
        let target_nblocks = (target_size / BLCKSZ as u64) as u32;

        let mut parts = Vec::new();
        let mut current_part = Vec::new();
        let mut current_part_size: usize = 0;
        for range in &self.ranges {
            // While doing partitioning, wrap the range in ShardedRange so that our size calculations
            // will respect shard striping rather than assuming all keys within a range are present.
            let range = ShardedRange::new(range.clone(), shard_identity);

            // Chunk up the range into parts that each contain up to target_nblocks local blocks
            for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
                // If appending the next contiguous range in the keyspace to the current
                // partition would cause it to be too large, and our current partition
                // covers at least one block that is physically present in this shard,
                // then start a new partition
                if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
                    && current_part_size > 0
                {
                    parts.push(KeySpace {
                        ranges: current_part,
                    });
                    current_part = Vec::new();
                    current_part_size = 0;
                }
                current_part.push(frag_range.start..frag_range.end);
                current_part_size += frag_on_shard_size as usize;
            }
        }

        // Add the last partition, which may not be full yet.
        if !current_part.is_empty() {
            parts.push(KeySpace {
                ranges: current_part,
            });
        }

        KeyPartitioning { parts }
    }

    pub fn is_empty(&self) -> bool {
        self.total_raw_size() == 0
    }

    /// Merge another keyspace into the current one.
    /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
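    ///
    /// A sketch (hypothetical keys; not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut a = KeySpace::single(Key::from_i128(0)..Key::from_i128(10));
    /// let b = KeySpace::single(Key::from_i128(10)..Key::from_i128(20));
    /// a.merge(&b);
    /// // Adjacent ranges are coalesced by the underlying accumulator:
    /// assert_eq!(a.ranges, vec![Key::from_i128(0)..Key::from_i128(20)]);
    /// ```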
    pub fn merge(&mut self, other: &KeySpace) {
        let all_ranges = self
            .ranges
            .iter()
            .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);

        let mut accum = KeySpaceAccum::new();
        let mut prev: Option<&Range<Key>> = None;
        for range in all_ranges {
            if let Some(prev) = prev {
                let overlap =
                    std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
                assert!(
                    !overlap,
                    "Attempt to merge overlapping keyspaces: {:?} overlaps {:?}",
                    prev, range
                );
            }

            accum.add_range(range.clone());
            prev = Some(range);
        }

        self.ranges = accum.to_keyspace().ranges;
    }

    /// Remove all keys in `other` from `self`.
    /// This can involve splitting or removing existing ranges.
    /// Returns the removed keyspace.
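    ///
    /// A sketch (hypothetical keys; not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut ks = KeySpace::single(Key::from_i128(0)..Key::from_i128(10));
    /// let cut = KeySpace::single(Key::from_i128(3)..Key::from_i128(6));
    /// let removed = ks.remove_overlapping_with(&cut);
    /// // The middle is cut out, splitting the original range in two:
    /// assert_eq!(
    ///     ks.ranges,
    ///     vec![Key::from_i128(0)..Key::from_i128(3), Key::from_i128(6)..Key::from_i128(10)]
    /// );
    /// assert_eq!(removed.ranges, vec![Key::from_i128(3)..Key::from_i128(6)]);
    /// ```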
    pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
        let (self_start, self_end) = match (self.start(), self.end()) {
            (Some(start), Some(end)) => (start, end),
            _ => {
                // self is empty
                return KeySpace::default();
            }
        };

        // Key spaces are sorted by definition, so skip ahead to the first
        // potentially intersecting range. Similarly, ignore ranges that start
        // after the current keyspace ends.
        let other_ranges = other
            .ranges
            .iter()
            .skip_while(|range| self_start >= range.end)
            .take_while(|range| self_end > range.start);

        let mut removed_accum = KeySpaceRandomAccum::new();
        for range in other_ranges {
            while let Some(overlap_at) = self.overlaps_at(range) {
                let overlapped = self.ranges[overlap_at].clone();

                if overlapped.start < range.start && overlapped.end <= range.end {
                    // Higher part of the range is completely overlapped.
                    removed_accum.add_range(range.start..self.ranges[overlap_at].end);
                    self.ranges[overlap_at].end = range.start;
                }
                if overlapped.start >= range.start && overlapped.end > range.end {
                    // Lower part of the range is completely overlapped.
                    removed_accum.add_range(self.ranges[overlap_at].start..range.end);
                    self.ranges[overlap_at].start = range.end;
                }
                if overlapped.start < range.start && overlapped.end > range.end {
                    // Middle part of the range is overlapped.
                    removed_accum.add_range(range.clone());
                    self.ranges[overlap_at].end = range.start;
                    self.ranges
                        .insert(overlap_at + 1, range.end..overlapped.end);
                }
                if overlapped.start >= range.start && overlapped.end <= range.end {
                    // Whole range is overlapped
                    removed_accum.add_range(self.ranges[overlap_at].clone());
                    self.ranges.remove(overlap_at);
                }
            }
        }

        removed_accum.to_keyspace()
    }

    pub fn start(&self) -> Option<Key> {
        self.ranges.first().map(|range| range.start)
    }

    pub fn end(&self) -> Option<Key> {
        self.ranges.last().map(|range| range.end)
    }

    /// The size of the keyspace in pages, before accounting for sharding
    pub fn total_raw_size(&self) -> usize {
        self.ranges
            .iter()
            .map(|range| ShardedRange::raw_size(range) as usize)
            .sum()
    }

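    /// Find the index of a range in `self` that overlaps `range`, if any. When several
    /// ranges overlap, the one with the greatest start is returned.
    ///
    /// The binary search probes the sorted range starts for `range.end`: whether it
    /// hits exactly (`Ok`) or yields an insertion point (`Err`), the candidate with
    /// the greatest start below `range.end` sits at `index - 1`, and it overlaps iff
    /// its end extends past `range.start`.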
    fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
        match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
            Ok(0) => None,
            Err(0) => None,
            Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
            Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
            _ => None,
        }
    }

    ///
    /// Check if the key space overlaps the given range
    ///
    pub fn overlaps(&self, range: &Range<Key>) -> bool {
        self.overlaps_at(range).is_some()
    }

    /// Check if the keyspace contains a key
    pub fn contains(&self, key: &Key) -> bool {
        self.overlaps(&(*key..key.next()))
    }
}

///
/// Represents a partitioning of the key space.
///
/// The only kind of partitioning we do is to partition the key space into
/// partitions that are roughly equal in physical size (see KeySpace::partition).
/// But this data structure could represent any partitioning.
///
#[derive(Clone, Debug, Default)]
pub struct KeyPartitioning {
    pub parts: Vec<KeySpace>,
}

/// Represents a partitioning of the sparse key space.
#[derive(Clone, Debug, Default)]
pub struct SparseKeyPartitioning {
    pub parts: Vec<SparseKeySpace>,
}

impl KeyPartitioning {
    pub fn new() -> Self {
        KeyPartitioning { parts: Vec::new() }
    }

    /// Convert a key partitioning to a sparse partition.
    pub fn into_sparse(self) -> SparseKeyPartitioning {
        SparseKeyPartitioning {
            parts: self.parts.into_iter().map(SparseKeySpace).collect(),
        }
    }
}

impl SparseKeyPartitioning {
    /// Note: use this function with caution. Attempting to handle a sparse keyspace in the
    /// same way as a dense keyspace can cause long or infinite loops.
    pub fn into_dense(self) -> KeyPartitioning {
        KeyPartitioning {
            parts: self.parts.into_iter().map(|x| x.0).collect(),
        }
    }
}

///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. This takes care of merging adjacent keys and key ranges into
/// contiguous ranges.
///
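/// A sketch (hypothetical keys, added in ascending key order as `add_range`
/// asserts; not compiled as a doctest):
///
/// ```ignore
/// let mut accum = KeySpaceAccum::new();
/// accum.add_range(Key::from_i128(0)..Key::from_i128(10));
/// accum.add_range(Key::from_i128(10)..Key::from_i128(20)); // adjacent: coalesced
/// accum.add_range(Key::from_i128(30)..Key::from_i128(40)); // gap: starts a new range
/// assert_eq!(accum.to_keyspace().ranges.len(), 2);
/// ```
///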
#[derive(Clone, Debug, Default)]
pub struct KeySpaceAccum {
    accum: Option<Range<Key>>,

    ranges: Vec<Range<Key>>,
    size: u64,
}

impl KeySpaceAccum {
    pub fn new() -> Self {
        Self {
            accum: None,
            ranges: Vec::new(),
            size: 0,
        }
    }

    #[inline(always)]
    pub fn add_key(&mut self, key: Key) {
        self.add_range(singleton_range(key))
    }

    #[inline(always)]
    pub fn add_range(&mut self, range: Range<Key>) {
        self.size += ShardedRange::raw_size(&range) as u64;

        match self.accum.as_mut() {
            Some(accum) => {
                if range.start == accum.end {
                    accum.end = range.end;
                } else {
                    // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
                    // a new range here if the skipped region was all keys that don't belong on this shard.
                    // (https://github.com/neondatabase/neon/issues/6247)
                    assert!(range.start > accum.end);
                    self.ranges.push(accum.clone());
                    *accum = range;
                }
            }
            None => self.accum = Some(range),
        }
    }

    pub fn to_keyspace(mut self) -> KeySpace {
        if let Some(accum) = self.accum.take() {
            self.ranges.push(accum);
        }
        KeySpace {
            ranges: self.ranges,
        }
    }

    pub fn consume_keyspace(&mut self) -> KeySpace {
        std::mem::take(self).to_keyspace()
    }

    /// The total number of keys in this object, ignoring any sharding effects that might cause some of
    /// the keys to be omitted in storage on this shard.
    pub fn raw_size(&self) -> u64 {
        self.size
    }
}

///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. Key ranges may be inserted in any order and can overlap.
///
#[derive(Clone, Debug, Default)]
pub struct KeySpaceRandomAccum {
    ranges: Vec<Range<Key>>,
}

impl KeySpaceRandomAccum {
    pub fn new() -> Self {
        Self { ranges: Vec::new() }
    }

    pub fn add_key(&mut self, key: Key) {
        self.add_range(singleton_range(key))
    }

    pub fn add_range(&mut self, range: Range<Key>) {
        self.ranges.push(range);
    }

    pub fn add_keyspace(&mut self, keyspace: KeySpace) {
        for range in keyspace.ranges {
            self.add_range(range);
        }
    }

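    /// Build the final KeySpace: sort the collected ranges by start key, then sweep them
    /// once from left to right, extending the current output range while inputs touch or
    /// overlap it, and emitting it whenever a gap appears.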
    pub fn to_keyspace(mut self) -> KeySpace {
        let mut ranges = Vec::new();
        if !self.ranges.is_empty() {
            self.ranges.sort_by_key(|r| r.start);
            let mut start = self.ranges.first().unwrap().start;
            let mut end = self.ranges.first().unwrap().end;
            for r in self.ranges {
                assert!(r.start >= start);
                if r.start > end {
                    ranges.push(start..end);
                    start = r.start;
                    end = r.end;
                } else if r.end > end {
                    end = r.end;
                }
            }
            ranges.push(start..end);
        }
        KeySpace { ranges }
    }

    pub fn consume_keyspace(&mut self) -> KeySpace {
        let mut prev_accum = KeySpaceRandomAccum::new();
        std::mem::swap(self, &mut prev_accum);

        prev_accum.to_keyspace()
    }
}

pub fn singleton_range(key: Key) -> Range<Key> {
    key..key.next()
}

#[cfg(test)]
mod tests {
    use std::fmt::Write;

    use rand::{RngCore, SeedableRng};

    use super::*;
    use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber, ShardStripeSize};

    // Helper function to create a key range.
    //
    // Makes the tests below less verbose.
    fn kr(irange: Range<i128>) -> Range<Key> {
        Key::from_i128(irange.start)..Key::from_i128(irange.end)
    }

    #[allow(dead_code)]
    fn dump_keyspace(ks: &KeySpace) {
        for r in ks.ranges.iter() {
            println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
        }
    }

    fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
        if actual.ranges != expected {
            let mut msg = String::new();

            writeln!(msg, "expected:").unwrap();
            for r in &expected {
                writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
            }
            writeln!(msg, "got:").unwrap();
            for r in &actual.ranges {
                writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
            }
            panic!("{}", msg);
        }
    }

    #[test]
    fn keyspace_consume() {
        let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];

        let mut accum = KeySpaceAccum::new();
        for range in &ranges {
            accum.add_range(range.clone());
        }

        let expected_size: u64 = ranges
            .iter()
            .map(|r| ShardedRange::raw_size(r) as u64)
            .sum();
        assert_eq!(accum.raw_size(), expected_size);

        assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
        assert_eq!(accum.raw_size(), 0);

        assert_ks_eq(&accum.consume_keyspace(), vec![]);
        assert_eq!(accum.raw_size(), 0);

        for range in &ranges {
            accum.add_range(range.clone());
        }
        assert_ks_eq(&accum.to_keyspace(), ranges);
    }

    #[test]
    fn keyspace_add_range() {
        // two separate ranges
        //
        // #####
        //         #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(20..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);

        // two separate ranges, added in reverse order
        //
        //         #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(20..30));
        ks.add_range(kr(0..10));

        // add range that is adjacent to the end of an existing range
        //
        // #####
        //      #####
        ks.add_range(kr(0..10));
        ks.add_range(kr(10..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that is adjacent to the start of an existing range
        //
        //      #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..30));
        ks.add_range(kr(0..10));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that overlaps with the end of an existing range
        //
        // #####
        //    #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(5..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that overlaps with the start of an existing range
        //
        //    #####
        // #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(5..30));
        ks.add_range(kr(0..10));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that is fully covered by an existing range
        //
        // #########
        //   #####
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..30));
        ks.add_range(kr(10..20));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add range that extends an existing range from both ends
        //
        //   #####
        // #########
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..20));
        ks.add_range(kr(0..30));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);

        // add a range that overlaps with two existing ranges, joining them
        //
        // #####   #####
        //    #######
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(0..10));
        ks.add_range(kr(20..30));
        ks.add_range(kr(5..25));
        assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
    }

    #[test]
    fn keyspace_overlaps() {
        let mut ks = KeySpaceRandomAccum::default();
        ks.add_range(kr(10..20));
        ks.add_range(kr(30..40));
        let ks = ks.to_keyspace();

        //      #####     #####
        // xxx
        assert!(!ks.overlaps(&kr(0..5)));

        //      #####     #####
        //   xx
        assert!(!ks.overlaps(&kr(5..9)));

        //      #####     #####
        //   xxx
        assert!(!ks.overlaps(&kr(5..10)));

        //      #####     #####
        //   xxxx
        assert!(ks.overlaps(&kr(5..11)));

        //      #####     #####
        //      xxx
        assert!(ks.overlaps(&kr(10..15)));

        //      #####     #####
        //        xxx
        assert!(ks.overlaps(&kr(15..20)));

        //      #####     #####
        //        xxxxx
        assert!(ks.overlaps(&kr(15..25)));

        //      #####     #####
        //            xxx
        assert!(!ks.overlaps(&kr(22..28)));

        //      #####     #####
        //             xxx
        assert!(!ks.overlaps(&kr(25..30)));

        //      #####     #####
        //                  x
        assert!(ks.overlaps(&kr(35..35)));

        //      #####     #####
        //                     xxx
        assert!(!ks.overlaps(&kr(40..45)));

        //      #####     #####
        //                       xxx
        assert!(!ks.overlaps(&kr(45..50)));

        //      #####     #####
        // xxxxxxxxxxxxxxx
        assert!(ks.overlaps(&kr(0..30)));
    }

    #[test]
    fn test_remove_full_overlaps() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(4),
                Key::from_i128(5)..Key::from_i128(8),
                Key::from_i128(10)..Key::from_i128(12),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(2)..Key::from_i128(3),
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(13),
            ],
        };
        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(2)..Key::from_i128(3),
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(12),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(2),
                Key::from_i128(3)..Key::from_i128(4),
                Key::from_i128(5)..Key::from_i128(6),
                Key::from_i128(7)..Key::from_i128(8),
                Key::from_i128(10)..Key::from_i128(11)
            ]
        );
    }

    #[test]
    fn test_remove_partial_overlaps() {
        // Test partial overlaps
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(8)..Key::from_i128(11),
                Key::from_i128(14)..Key::from_i128(17),
            ],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(3)..Key::from_i128(5),
                Key::from_i128(8)..Key::from_i128(10),
                Key::from_i128(14)..Key::from_i128(15),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(7)..Key::from_i128(8),
                Key::from_i128(12)..Key::from_i128(14),
            ]
        );
    }

    #[test]
    fn test_remove_no_overlaps() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![
                Key::from_i128(6)..Key::from_i128(7),
                Key::from_i128(11)..Key::from_i128(12),
                Key::from_i128(15)..Key::from_i128(17),
            ],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace::default();
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(5),
                Key::from_i128(7)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
            ]
        );
    }

    #[test]
    fn test_remove_one_range_overlaps_multiple() {
        let mut key_space1 = KeySpace {
            ranges: vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(6)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
                Key::from_i128(17)..Key::from_i128(20),
                Key::from_i128(20)..Key::from_i128(30),
                Key::from_i128(30)..Key::from_i128(40),
            ],
        };
        let key_space2 = KeySpace {
            ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
        };

        let removed = key_space1.remove_overlapping_with(&key_space2);
        let removed_expected = KeySpace {
            ranges: vec![
                Key::from_i128(9)..Key::from_i128(10),
                Key::from_i128(12)..Key::from_i128(15),
                Key::from_i128(17)..Key::from_i128(19),
            ],
        };
        assert_eq!(removed, removed_expected);

        assert_eq!(
            key_space1.ranges,
            vec![
                Key::from_i128(1)..Key::from_i128(3),
                Key::from_i128(3)..Key::from_i128(6),
                Key::from_i128(6)..Key::from_i128(9),
                Key::from_i128(19)..Key::from_i128(20),
                Key::from_i128(20)..Key::from_i128(30),
                Key::from_i128(30)..Key::from_i128(40),
            ]
        );
    }

    #[test]
    fn sharded_range_relation_gap() {
        let shard_identity =
            ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
                end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
            },
            &shard_identity,
        );

        // Key range spans relations, expect MAX
        assert_eq!(range.page_count(), u32::MAX);
    }

    #[test]
    fn shard_identity_keyspaces_single_key() {
        let shard_identity =
            ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
                end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
            },
            &shard_identity,
        );
        // Single-key range on logical size key
        assert_eq!(range.page_count(), 1);
    }

    /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
    #[test]
    fn contiguous_range_check() {
        assert!(!is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
        ),);

        // The range goes all the way up to 0xffffffff, including it: this is
        // not considered a rel block range because 0xffffffff stores logical sizes,
        // not blocks.
        assert!(!is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
        ),);

        // Keys within the normal data region of a relation
        assert!(is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
                ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
        ),);

        // The logical size key of one forkno, then some blocks in the next
        assert!(is_contiguous_range(
            &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
                ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
        ),);
    }

    #[test]
    fn shard_identity_keyspaces_forkno_gap() {
        let shard_identity =
            ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();

        let range = ShardedRange::new(
            Range {
                start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
                end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
            },
            &shard_identity,
        );

        // Range spanning the end of one forkno and the start of the next: we do not attempt to
        // calculate a valid size, because we have no way to know if the keys between start
        // and end are actually in use.
        assert_eq!(range.page_count(), u32::MAX);
    }

    #[test]
    fn shard_identity_keyspaces_one_relation() {
        for shard_number in 0..4 {
            let shard_identity = ShardIdentity::new(
                ShardNumber(shard_number),
                ShardCount::new(4),
                DEFAULT_STRIPE_SIZE,
            )
            .unwrap();

            let range = ShardedRange::new(
                Range {
                    start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
                    end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
                },
                &shard_identity,
            );

            // Very simple case: range covering block zero of one relation, where that block maps to shard zero
            if shard_number == 0 {
                assert_eq!(range.page_count(), 1);
            } else {
                // Other shards should perceive the range's size as zero
                assert_eq!(range.page_count(), 0);
            }
        }
    }

    /// Test helper: construct a ShardedRange and call fragment() on it, returning
    /// the total page count in the range and the fragments.
    fn do_fragment(
        range_start: Key,
        range_end: Key,
        shard_identity: &ShardIdentity,
        target_nblocks: u32,
    ) -> (u32, Vec<(u32, Range<Key>)>) {
        let range = ShardedRange::new(
            Range {
                start: range_start,
                end: range_end,
            },
            shard_identity,
        );

        let page_count = range.page_count();
        let fragments = range.fragment(target_nblocks);

        // Invariant: we always get at least one fragment
        assert!(!fragments.is_empty());

        // Invariant: the first/last fragment start/end should equal the input start/end
        assert_eq!(fragments.first().unwrap().1.start, range_start);
        assert_eq!(fragments.last().unwrap().1.end, range_end);

        if page_count > 0 {
            // Invariant: every fragment must contain at least one shard-local page, if the
            // total range contains at least one shard-local page
            let all_nonzero = fragments.iter().all(|f| f.0 > 0);
            if !all_nonzero {
                eprintln!("Found a zero-length fragment: {:?}", fragments);
            }
            assert!(all_nonzero);
        } else {
            // A range with no shard-local pages should always be returned as a single fragment
            assert_eq!(fragments, vec![(0, range_start..range_end)]);
        }

        // Invariant: fragments must be ordered and non-overlapping
        let mut last: Option<Range<Key>> = None;
        for frag in &fragments {
            if let Some(last) = last {
                assert!(frag.1.start >= last.end);
                assert!(frag.1.start > last.start);
            }
            last = Some(frag.1.clone())
        }

        // Invariant: fragments respect target_nblocks
        for frag in &fragments {
            assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
        }

        (page_count, fragments)
    }

    /// Really simple tests for fragment(), on a range that just contains a single stripe
    /// for a single tenant.
    #[test]
    fn sharded_range_fragment_simple() {
        const SHARD_COUNT: u8 = 4;
        const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;

        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(SHARD_COUNT),
            ShardStripeSize(STRIPE_SIZE),
        )
        .unwrap();

        // A range which we happen to know covers exactly one stripe which belongs to this shard
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let mut input_end = input_start;
        input_end.field6 += STRIPE_SIZE; // field6 is block number

        // Ask for stripe_size blocks, we get the whole stripe
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE),
            (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
        );

        // Ask for more, we still get the whole stripe
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 10 * STRIPE_SIZE),
            (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
        );

        // Ask for target_nblocks of half the stripe size, we get two halves
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE / 2),
            (
                STRIPE_SIZE,
                vec![
                    (
                        STRIPE_SIZE / 2,
                        input_start..input_start.add(STRIPE_SIZE / 2)
                    ),
                    (STRIPE_SIZE / 2, input_start.add(STRIPE_SIZE / 2)..input_end)
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_multi_stripe() {
        const SHARD_COUNT: u8 = 4;
        const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
        const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;

        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(SHARD_COUNT),
            ShardStripeSize(STRIPE_SIZE),
        )
        .unwrap();

        // A range which covers multiple stripes, exactly one of which belongs to the current shard.
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let mut input_end = input_start;
        input_end.field6 += RANGE_SIZE; // field6 is block number

        // Ask for all the blocks, get a fragment that covers the whole range but reports
        // its size to be just the blocks belonging to our shard.
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, RANGE_SIZE),
            (STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
        );

        // Ask for a sub-stripe quantity that results in 3 fragments.
        let limit = STRIPE_SIZE / 3 + 1;
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, limit),
            (
                STRIPE_SIZE,
                vec![
                    (limit, input_start..input_start.add(limit)),
                    (limit, input_start.add(limit)..input_start.add(2 * limit)),
                    (
                        STRIPE_SIZE - 2 * limit,
                        input_start.add(2 * limit)..input_end
                    ),
                ]
            )
        );

        // Try on a range that starts slightly after our owned stripe
        assert_eq!(
            do_fragment(input_start.add(1), input_end, &shard_identity, RANGE_SIZE),
            (
                STRIPE_SIZE - 1,
                vec![(STRIPE_SIZE - 1, input_start.add(1)..input_end)]
            )
        );
    }

    /// Test that our calculations work correctly when a range starts at the logical size key
    /// of the previous relation.
    #[test]
    fn sharded_range_fragment_starting_from_logical_size() {
        const SHARD_COUNT: u8 = 4;
        const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
        const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;

        let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
        let mut input_end = Key::from_hex("000000067f00000001000000ae0100000000").unwrap();
        input_end.field6 += RANGE_SIZE; // field6 is block number

        // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(SHARD_COUNT),
            ShardStripeSize(STRIPE_SIZE),
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
            (
                STRIPE_SIZE + 1,
                vec![(STRIPE_SIZE + 1, input_start..input_end)]
            )
        );

        // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
        // store all logical sizes)
        let shard_identity = ShardIdentity::new(
            ShardNumber(1),
            ShardCount::new(SHARD_COUNT),
            ShardStripeSize(STRIPE_SIZE),
        )
        .unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
            (1, vec![(1, input_start..input_end)])
        );
    }

    /// Test that ShardedRange behaves properly when used on un-sharded data
    #[test]
    fn sharded_range_fragment_unsharded() {
        let shard_identity = ShardIdentity::unsharded();

        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (
                0x10000,
                vec![
                    (0x8000, input_start..input_start.add(0x8000)),
                    (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_cross_relation() {
        let shard_identity = ShardIdentity::unsharded();

        // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
        let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
        let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (u32::MAX, vec![(u32::MAX, input_start..input_end),])
        );

        // Same, but using a sharded identity
        let shard_identity =
            ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 0x8000),
            (u32::MAX, vec![(u32::MAX, input_start..input_end),])
        );
    }

    #[test]
    fn sharded_range_fragment_tiny_nblocks() {
        let shard_identity = ShardIdentity::unsharded();

        // A small range within a single relation, fragmented into tiny target_nblocks chunks
        let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
        let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
        assert_eq!(
            do_fragment(input_start, input_end, &shard_identity, 16),
            (
                0x38,
                vec![
                    (16, input_start..input_start.add(16)),
                    (16, input_start.add(16)..input_start.add(32)),
                    (16, input_start.add(32)..input_start.add(48)),
                    (8, input_start.add(48)..input_end),
                ]
            )
        );
    }

    #[test]
    fn sharded_range_fragment_fuzz() {
        // Use a fixed seed: we don't want to explicitly pick values, but we do want
        // the test to be reproducible.
        let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);

        for _i in 0..1000 {
            let shard_identity = if prng.next_u32() % 2 == 0 {
                ShardIdentity::unsharded()
            } else {
                let shard_count = prng.next_u32() % 127 + 1;
                ShardIdentity::new(
                    ShardNumber((prng.next_u32() % shard_count) as u8),
                    ShardCount::new(shard_count as u8),
                    DEFAULT_STRIPE_SIZE,
                )
                .unwrap()
            };

            let target_nblocks = prng.next_u32() % 65536 + 1;

            let start_offset = prng.next_u32() % 16384;

            // Try ranges up to 8192 blocks in size, always at least 1 block
            let range_size = prng.next_u32() % 8192 + 1;

            // Pick a random range within a single relation
            let input_start = Key::from_hex("000000067F00000001000004E10000000000")
                .unwrap()
                .add(start_offset);
            let input_end = input_start.add(range_size);

            // This test's main success conditions are the invariants baked into do_fragment
            let (_total_size, fragments) =
                do_fragment(input_start, input_end, &shard_identity, target_nblocks);

            // Pick a random key within the range and check it appears in the output
            let example_key = input_start.add(prng.next_u32() % range_size);

            // Panic on unwrap if it isn't found
            let example_key_frag = fragments
                .iter()
                .find(|f| f.1.contains(&example_key))
                .unwrap();

            // Check that the fragment containing our random key has a nonzero size if
            // that key is shard-local
            let example_key_local = !shard_identity.is_key_disposable(&example_key);
            if example_key_local {
                assert!(example_key_frag.0 > 0);
            }
        }
    }
}