Line data Source code
1 : use postgres_ffi::BLCKSZ;
2 : use std::ops::Range;
3 :
4 : use crate::{
5 : key::Key,
6 : shard::{ShardCount, ShardIdentity},
7 : };
8 : use itertools::Itertools;
9 :
10 : ///
11 : /// Represents a set of Keys, in a compact form.
12 : ///
13 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
14 : pub struct KeySpace {
15 : /// Contiguous ranges of keys that belong to the key space. In key order,
16 : /// and with no overlap.
17 : pub ranges: Vec<Range<Key>>,
18 : }
19 :
20 : impl std::fmt::Display for KeySpace {
21 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 0 : write!(f, "[")?;
23 0 : for range in &self.ranges {
24 0 : write!(f, "{}..{},", range.start, range.end)?;
25 : }
26 0 : write!(f, "]")
27 0 : }
28 : }
29 :
30 : /// A wrapper type for sparse keyspaces.
31 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
32 : pub struct SparseKeySpace(pub KeySpace);
33 :
34 : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
35 : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
36 : /// shard.
37 : ///
38 : /// When we iterate over keys within this object, we will skip any keys that don't belong
39 : /// to this shard.
40 : ///
41 : /// The start + end keys may not belong to the shard: these specify where layer files should
42 : /// start + end, but we will never actually read/write those keys.
43 : #[derive(Clone, Debug, PartialEq, Eq)]
44 : pub struct ShardedRange<'a> {
45 : pub shard_identity: &'a ShardIdentity,
46 : pub range: Range<Key>,
47 : }
48 :
49 : // Calculate the size of a range within the blocks of the same relation, or spanning only the
50 : // top page in the previous relation's space.
51 21906054 : fn contiguous_range_len(range: &Range<Key>) -> u32 {
52 21906054 : debug_assert!(is_contiguous_range(range));
53 21906054 : if range.start.field6 == 0xffffffff {
54 2481304 : range.end.field6 + 1
55 : } else {
56 19424750 : range.end.field6 - range.start.field6
57 : }
58 21906054 : }
59 :
60 : /// Return true if this key range includes only keys in the same relation's data blocks, or
61 : /// just spanning one relation and the logical size (0xffffffff) block of the relation before it.
62 : ///
63 : /// Contiguous in this context means we know the keys are in use _somewhere_, but it might not
64 : /// be on our shard. Later in ShardedRange we do the extra work to figure out how much
65 : /// of a given contiguous range is present on one shard.
66 : ///
67 : /// This matters, because:
68 : /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
69 : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
70 43816841 : fn is_contiguous_range(range: &Range<Key>) -> bool {
71 43816841 : range.start.field1 == range.end.field1
72 43812329 : && range.start.field2 == range.end.field2
73 43812205 : && range.start.field3 == range.end.field3
74 43811947 : && range.start.field4 == range.end.field4
75 43811946 : && (range.start.field5 == range.end.field5
76 4962612 : || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
77 43816841 : }
78 :
79 : impl<'a> ShardedRange<'a> {
80 6239 : pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
81 6239 : Self {
82 6239 : shard_identity,
83 6239 : range,
84 6239 : }
85 6239 : }
86 :
87 : /// Break up this range into chunks, each of which has at least one local key in it if the
88 : /// total range has at least one local key.
89 6232 : pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
90 6232 : // Optimization for single-key case (e.g. logical size keys)
91 6232 : if self.range.end == self.range.start.add(1) {
92 4351 : return vec![(
93 4351 : if self.shard_identity.is_key_disposable(&self.range.start) {
94 0 : 0
95 : } else {
96 4351 : 1
97 : },
98 4351 : self.range,
99 : )];
100 1881 : }
101 1881 :
102 1881 : if !is_contiguous_range(&self.range) {
103 : // Ranges that span relations are not fragmented. We only get these ranges as a result
104 : // of operations that act on existing layers, so we trust that the existing range is
105 : // reasonably small.
106 2 : return vec![(u32::MAX, self.range)];
107 1879 : }
108 1879 :
109 1879 : let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
110 1879 :
111 1879 : let mut cursor = self.range.start;
112 3914 : while cursor < self.range.end {
113 2035 : let advance_by = self.distance_to_next_boundary(cursor);
114 2035 : let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
115 :
116 : // If the previous fragment is undersized, then we seek to consume enough
117 : // blocks to complete it.
118 2035 : let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
119 156 : Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
120 145 : Some(frag) => {
121 145 : // Prev block is complete, want the full number.
122 145 : (
123 145 : target_nblocks,
124 145 : if is_fragment_disposable {
125 : // If this current range will be empty (not shard-local data), we will merge into previous
126 0 : Some(frag)
127 : } else {
128 145 : None
129 : },
130 : )
131 : }
132 : None => {
133 : // First iteration, want the full number
134 1879 : (target_nblocks, None)
135 : }
136 : };
137 :
138 2035 : let advance_by = if is_fragment_disposable {
139 468 : advance_by
140 : } else {
141 1567 : std::cmp::min(advance_by, want_blocks)
142 : };
143 :
144 2035 : let next_cursor = cursor.add(advance_by);
145 :
146 2035 : let this_frag = (
147 2035 : if is_fragment_disposable {
148 468 : 0
149 : } else {
150 1567 : advance_by
151 : },
152 2035 : cursor..next_cursor,
153 2035 : );
154 2035 : cursor = next_cursor;
155 :
156 2035 : if let Some(last_fragment) = merge_last_fragment {
157 11 : // Previous fragment was short or this one is empty, merge into it
158 11 : last_fragment.0 += this_frag.0;
159 11 : last_fragment.1.end = this_frag.1.end;
160 2024 : } else {
161 2024 : fragments.push(this_frag);
162 2024 : }
163 : }
164 :
165 1879 : fragments
166 6232 : }
167 :
168 : /// Estimate the physical pages that are within this range, on this shard. This returns
169 : /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
170 1019 : pub fn page_count(&self) -> u32 {
171 1019 : // Special cases for single keys like logical sizes
172 1019 : if self.range.end == self.range.start.add(1) {
173 6 : return if self.shard_identity.is_key_disposable(&self.range.start) {
174 3 : 0
175 : } else {
176 3 : 1
177 : };
178 1013 : }
179 1013 :
180 1013 : // We can only do an authentic calculation of contiguous key ranges
181 1013 : if !is_contiguous_range(&self.range) {
182 4 : return u32::MAX;
183 1009 : }
184 1009 :
185 1009 : // Special case for single sharded tenants: our logical and physical sizes are the same
186 1009 : if self.shard_identity.count < ShardCount::new(2) {
187 524 : return contiguous_range_len(&self.range);
188 485 : }
189 485 :
190 485 : // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
191 485 : // to Self, and add the stripe's block count to our total if so.
192 485 : let mut result: u64 = 0;
193 485 : let mut cursor = self.range.start;
194 981 : while cursor < self.range.end {
195 : // Count up to the next stripe_size boundary or end of range
196 496 : let advance_by = self.distance_to_next_boundary(cursor);
197 496 :
198 496 : // If this blocks in this stripe belong to us, add them to our count
199 496 : if !self.shard_identity.is_key_disposable(&cursor) {
200 28 : result += advance_by as u64;
201 468 : }
202 :
203 496 : cursor = cursor.add(advance_by);
204 : }
205 :
206 485 : if result > u32::MAX as u64 {
207 0 : u32::MAX
208 : } else {
209 485 : result as u32
210 : }
211 1019 : }
212 :
213 : /// Advance the cursor to the next potential fragment boundary: this is either
214 : /// a stripe boundary, or the end of the range.
215 2531 : fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
216 2531 : let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
217 2531 :
218 2531 : if self.shard_identity.count < ShardCount::new(2) {
219 : // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
220 1532 : return distance_to_range_end;
221 999 : }
222 999 :
223 999 : if cursor.field6 == 0xffffffff {
224 : // We are wrapping from one relation's logical size to the next relation's first data block
225 4 : return 1;
226 995 : }
227 995 :
228 995 : let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
229 995 : let stripe_remainder = self.shard_identity.stripe_size.0
230 995 : - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
231 995 :
232 995 : if cfg!(debug_assertions) {
233 : // We should never overflow field5 and field6 -- our callers check this earlier
234 : // and would have returned their u32::MAX cases if the input range violated this.
235 995 : let next_cursor = cursor.add(stripe_remainder);
236 995 : debug_assert!(
237 995 : next_cursor.field1 == cursor.field1
238 995 : && next_cursor.field2 == cursor.field2
239 995 : && next_cursor.field3 == cursor.field3
240 995 : && next_cursor.field4 == cursor.field4
241 995 : && next_cursor.field5 == cursor.field5
242 : )
243 0 : }
244 :
245 995 : std::cmp::min(stripe_remainder, distance_to_range_end)
246 2531 : }
247 :
248 : /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
249 : /// this function simply calculates the number of pages in the space, without accounting for those
250 : /// pages that would not actually be stored on this node.
251 : ///
252 : /// Don't use this function in code that works with physical entities like layer files.
253 21907889 : pub fn raw_size(range: &Range<Key>) -> u32 {
254 21907889 : if is_contiguous_range(range) {
255 21902999 : contiguous_range_len(range)
256 : } else {
257 4890 : u32::MAX
258 : }
259 21907889 : }
260 : }
261 :
262 : impl KeySpace {
263 : /// Create a key space with a single range.
264 24415 : pub fn single(key_range: Range<Key>) -> Self {
265 24415 : Self {
266 24415 : ranges: vec![key_range],
267 24415 : }
268 24415 : }
269 :
270 : /// Partition a key space into roughly chunks of roughly 'target_size' bytes
271 : /// in each partition.
272 : ///
273 870 : pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
274 870 : // Assume that each value is 8k in size.
275 870 : let target_nblocks = (target_size / BLCKSZ as u64) as u32;
276 870 :
277 870 : let mut parts = Vec::new();
278 870 : let mut current_part = Vec::new();
279 870 : let mut current_part_size: usize = 0;
280 6090 : for range in &self.ranges {
281 : // While doing partitioning, wrap the range in ShardedRange so that our size calculations
282 : // will respect shard striping rather than assuming all keys within a range are present.
283 5220 : let range = ShardedRange::new(range.clone(), shard_identity);
284 :
285 : // Chunk up the range into parts that each contain up to target_size local blocks
286 5232 : for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
287 : // If appending the next contiguous range in the keyspace to the current
288 : // partition would cause it to be too large, and our current partition
289 : // covers at least one block that is physically present in this shard,
290 : // then start a new partition
291 5232 : if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
292 72 : && current_part_size > 0
293 72 : {
294 72 : parts.push(KeySpace {
295 72 : ranges: current_part,
296 72 : });
297 72 : current_part = Vec::new();
298 72 : current_part_size = 0;
299 5160 : }
300 5232 : current_part.push(frag_range.start..frag_range.end);
301 5232 : current_part_size += frag_on_shard_size as usize;
302 : }
303 : }
304 :
305 : // add last partition that wasn't full yet.
306 870 : if !current_part.is_empty() {
307 870 : parts.push(KeySpace {
308 870 : ranges: current_part,
309 870 : });
310 870 : }
311 :
312 870 : KeyPartitioning { parts }
313 870 : }
314 :
315 8239723 : pub fn is_empty(&self) -> bool {
316 8239723 : self.total_raw_size() == 0
317 8239723 : }
318 :
319 : /// Merge another keyspace into the current one.
320 : /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
321 5010160 : pub fn merge(&mut self, other: &KeySpace) {
322 5010160 : let all_ranges = self
323 5010160 : .ranges
324 5010160 : .iter()
325 5010160 : .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
326 5010160 :
327 5010160 : let mut accum = KeySpaceAccum::new();
328 5010160 : let mut prev: Option<&Range<Key>> = None;
329 7204153 : for range in all_ranges {
330 2193993 : if let Some(prev) = prev {
331 314798 : let overlap =
332 314798 : std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
333 314798 : assert!(
334 314798 : !overlap,
335 0 : "Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
336 : prev, range
337 : );
338 1879195 : }
339 :
340 2193993 : accum.add_range(range.clone());
341 2193993 : prev = Some(range);
342 : }
343 :
344 5010160 : self.ranges = accum.to_keyspace().ranges;
345 5010160 : }
346 :
347 : /// Remove all keys in `other` from `self`.
348 : /// This can involve splitting or removing of existing ranges.
349 : /// Returns the removed keyspace
350 10818905 : pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
351 10818905 : let (self_start, self_end) = match (self.start(), self.end()) {
352 8916243 : (Some(start), Some(end)) => (start, end),
353 : _ => {
354 : // self is empty
355 1902662 : return KeySpace::default();
356 : }
357 : };
358 :
359 : // Key spaces are sorted by definition, so skip ahead to the first
360 : // potentially intersecting range. Similarly, ignore ranges that start
361 : // after the current keyspace ends.
362 8916243 : let other_ranges = other
363 8916243 : .ranges
364 8916243 : .iter()
365 8916243 : .skip_while(|range| self_start >= range.end)
366 8916243 : .take_while(|range| self_end > range.start);
367 8916243 :
368 8916243 : let mut removed_accum = KeySpaceRandomAccum::new();
369 12915842 : for range in other_ranges {
370 8215702 : while let Some(overlap_at) = self.overlaps_at(range) {
371 4216103 : let overlapped = self.ranges[overlap_at].clone();
372 4216103 :
373 4216103 : if overlapped.start < range.start && overlapped.end <= range.end {
374 26 : // Higher part of the range is completely overlapped.
375 26 : removed_accum.add_range(range.start..self.ranges[overlap_at].end);
376 26 : self.ranges[overlap_at].end = range.start;
377 4216077 : }
378 4216103 : if overlapped.start >= range.start && overlapped.end > range.end {
379 208 : // Lower part of the range is completely overlapped.
380 208 : removed_accum.add_range(self.ranges[overlap_at].start..range.end);
381 208 : self.ranges[overlap_at].start = range.end;
382 4215895 : }
383 4216103 : if overlapped.start < range.start && overlapped.end > range.end {
384 240038 : // Middle part of the range is overlapped.
385 240038 : removed_accum.add_range(range.clone());
386 240038 : self.ranges[overlap_at].end = range.start;
387 240038 : self.ranges
388 240038 : .insert(overlap_at + 1, range.end..overlapped.end);
389 3976065 : }
390 4216103 : if overlapped.start >= range.start && overlapped.end <= range.end {
391 3975831 : // Whole range is overlapped
392 3975831 : removed_accum.add_range(self.ranges[overlap_at].clone());
393 3975831 : self.ranges.remove(overlap_at);
394 3975831 : }
395 : }
396 : }
397 :
398 8916243 : removed_accum.to_keyspace()
399 10818905 : }
400 :
401 10819019 : pub fn start(&self) -> Option<Key> {
402 10819019 : self.ranges.first().map(|range| range.start)
403 10819019 : }
404 :
405 10819895 : pub fn end(&self) -> Option<Key> {
406 10819895 : self.ranges.last().map(|range| range.end)
407 10819895 : }
408 :
409 : /// The size of the keyspace in pages, before accounting for sharding
410 10123562 : pub fn total_raw_size(&self) -> usize {
411 10123562 : self.ranges
412 10123562 : .iter()
413 10123562 : .map(|range| ShardedRange::raw_size(range) as usize)
414 10123562 : .sum()
415 10123562 : }
416 :
417 10296895 : fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
418 12579720 : match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
419 343 : Ok(0) => None,
420 3762500 : Err(0) => None,
421 366232 : Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
422 6167820 : Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
423 368461 : _ => None,
424 : }
425 10296895 : }
426 :
427 : ///
428 : /// Check if key space contains overlapping range
429 : ///
430 2081193 : pub fn overlaps(&self, range: &Range<Key>) -> bool {
431 2081193 : self.overlaps_at(range).is_some()
432 2081193 : }
433 :
434 : /// Check if the keyspace contains a key
435 2075348 : pub fn contains(&self, key: &Key) -> bool {
436 2075348 : self.overlaps(&(*key..key.next()))
437 2075348 : }
438 : }
439 :
440 : ///
441 : /// Represents a partitioning of the key space.
442 : ///
443 : /// The only kind of partitioning we do is to partition the key space into
444 : /// partitions that are roughly equal in physical size (see KeySpace::partition).
445 : /// But this data structure could represent any partitioning.
446 : ///
447 : #[derive(Clone, Debug, Default)]
448 : pub struct KeyPartitioning {
449 : pub parts: Vec<KeySpace>,
450 : }
451 :
452 : /// Represents a partitioning of the sparse key space.
453 : #[derive(Clone, Debug, Default)]
454 : pub struct SparseKeyPartitioning {
455 : pub parts: Vec<SparseKeySpace>,
456 : }
457 :
458 : impl KeyPartitioning {
459 2484 : pub fn new() -> Self {
460 2484 : KeyPartitioning { parts: Vec::new() }
461 2484 : }
462 :
463 : /// Convert a key partitioning to a sparse partition.
464 1242 : pub fn into_sparse(self) -> SparseKeyPartitioning {
465 1242 : SparseKeyPartitioning {
466 1242 : parts: self.parts.into_iter().map(SparseKeySpace).collect(),
467 1242 : }
468 1242 : }
469 : }
470 :
471 : impl SparseKeyPartitioning {
472 : /// Note: use this function with caution. Attempt to handle a sparse keyspace in the same way as a dense keyspace will
473 : /// cause long/dead loops.
474 1614 : pub fn into_dense(self) -> KeyPartitioning {
475 1614 : KeyPartitioning {
476 1614 : parts: self.parts.into_iter().map(|x| x.0).collect(),
477 1614 : }
478 1614 : }
479 : }
480 :
481 : ///
482 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
483 : /// object. This takes care of merging adjacent keys and key ranges into
484 : /// contiguous ranges.
485 : ///
486 : #[derive(Clone, Debug, Default)]
487 : pub struct KeySpaceAccum {
488 : accum: Option<Range<Key>>,
489 :
490 : ranges: Vec<Range<Key>>,
491 : size: u64,
492 : }
493 :
494 : impl KeySpaceAccum {
495 6562089 : pub fn new() -> Self {
496 6562089 : Self {
497 6562089 : accum: None,
498 6562089 : ranges: Vec::new(),
499 6562089 : size: 0,
500 6562089 : }
501 6562089 : }
502 :
503 : #[inline(always)]
504 12244002 : pub fn add_key(&mut self, key: Key) {
505 12244002 : self.add_range(singleton_range(key))
506 12244002 : }
507 :
508 : #[inline(always)]
509 16001887 : pub fn add_range(&mut self, range: Range<Key>) {
510 16001887 : self.size += ShardedRange::raw_size(&range) as u64;
511 16001887 :
512 16001887 : match self.accum.as_mut() {
513 12528690 : Some(accum) => {
514 12528690 : if range.start == accum.end {
515 12200508 : accum.end = range.end;
516 12200508 : } else {
517 : // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
518 : // a new range here if the skipped region was all keys that don't belong on this shard.
519 : // (https://github.com/neondatabase/neon/issues/6247)
520 328182 : assert!(range.start > accum.end);
521 328182 : self.ranges.push(accum.clone());
522 328182 : *accum = range;
523 : }
524 : }
525 3473197 : None => self.accum = Some(range),
526 : }
527 16001887 : }
528 :
529 5913997 : pub fn to_keyspace(mut self) -> KeySpace {
530 5913997 : if let Some(accum) = self.accum.take() {
531 2772879 : self.ranges.push(accum);
532 3141118 : }
533 5913997 : KeySpace {
534 5913997 : ranges: self.ranges,
535 5913997 : }
536 5913997 : }
537 :
538 3392 : pub fn consume_keyspace(&mut self) -> KeySpace {
539 3392 : std::mem::take(self).to_keyspace()
540 3392 : }
541 :
542 : // The total number of keys in this object, ignoring any sharding effects that might cause some of
543 : // the keys to be omitted in storage on this shard.
544 7353 : pub fn raw_size(&self) -> u64 {
545 7353 : self.size
546 7353 : }
547 : }
548 :
549 : ///
550 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
551 : /// object. Key ranges may be inserted in any order and can overlap.
552 : ///
553 : #[derive(Clone, Debug, Default)]
554 : pub struct KeySpaceRandomAccum {
555 : ranges: Vec<Range<Key>>,
556 : }
557 :
558 : impl KeySpaceRandomAccum {
559 25837102 : pub fn new() -> Self {
560 25837102 : Self { ranges: Vec::new() }
561 25837102 : }
562 :
563 2000713 : pub fn add_key(&mut self, key: Key) {
564 2000713 : self.add_range(singleton_range(key))
565 2000713 : }
566 :
567 9144686 : pub fn add_range(&mut self, range: Range<Key>) {
568 9144686 : self.ranges.push(range);
569 9144686 : }
570 :
571 2646220 : pub fn add_keyspace(&mut self, keyspace: KeySpace) {
572 5292968 : for range in keyspace.ranges {
573 2646748 : self.add_range(range);
574 2646748 : }
575 2646220 : }
576 :
577 18945896 : pub fn to_keyspace(mut self) -> KeySpace {
578 18945896 : let mut ranges = Vec::new();
579 18945896 : if !self.ranges.is_empty() {
580 8124507 : self.ranges.sort_by_key(|r| r.start);
581 8124507 : let mut start = self.ranges.first().unwrap().start;
582 8124507 : let mut end = self.ranges.first().unwrap().end;
583 17268983 : for r in self.ranges {
584 9144476 : assert!(r.start >= start);
585 9144476 : if r.start > end {
586 770281 : ranges.push(start..end);
587 770281 : start = r.start;
588 770281 : end = r.end;
589 8374195 : } else if r.end > end {
590 13482 : end = r.end;
591 8360713 : }
592 : }
593 8124507 : ranges.push(start..end);
594 10821389 : }
595 18945896 : KeySpace { ranges }
596 18945896 : }
597 :
598 10020260 : pub fn consume_keyspace(&mut self) -> KeySpace {
599 10020260 : let mut prev_accum = KeySpaceRandomAccum::new();
600 10020260 : std::mem::swap(self, &mut prev_accum);
601 10020260 :
602 10020260 : prev_accum.to_keyspace()
603 10020260 : }
604 : }
605 :
606 14244715 : pub fn singleton_range(key: Key) -> Range<Key> {
607 14244715 : key..key.next()
608 14244715 : }
609 :
610 : #[cfg(test)]
611 : mod tests {
612 : use rand::{RngCore, SeedableRng};
613 :
614 : use crate::{
615 : models::ShardParameters,
616 : shard::{ShardCount, ShardNumber},
617 : };
618 :
619 : use super::*;
620 : use std::fmt::Write;
621 :
622 : // Helper function to create a key range.
623 : //
624 : // Make the tests below less verbose.
625 46 : fn kr(irange: Range<i128>) -> Range<Key> {
626 46 : Key::from_i128(irange.start)..Key::from_i128(irange.end)
627 46 : }
628 :
629 : #[allow(dead_code)]
630 0 : fn dump_keyspace(ks: &KeySpace) {
631 0 : for r in ks.ranges.iter() {
632 0 : println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
633 0 : }
634 0 : }
635 :
636 11 : fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
637 11 : if actual.ranges != expected {
638 0 : let mut msg = String::new();
639 0 :
640 0 : writeln!(msg, "expected:").unwrap();
641 0 : for r in &expected {
642 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
643 0 : }
644 0 : writeln!(msg, "got:").unwrap();
645 0 : for r in &actual.ranges {
646 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
647 0 : }
648 0 : panic!("{}", msg);
649 11 : }
650 11 : }
651 :
652 : #[test]
653 1 : fn keyspace_consume() {
654 1 : let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
655 1 :
656 1 : let mut accum = KeySpaceAccum::new();
657 4 : for range in &ranges {
658 3 : accum.add_range(range.clone());
659 3 : }
660 :
661 1 : let expected_size: u64 = ranges
662 1 : .iter()
663 3 : .map(|r| ShardedRange::raw_size(r) as u64)
664 1 : .sum();
665 1 : assert_eq!(accum.raw_size(), expected_size);
666 :
667 1 : assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
668 1 : assert_eq!(accum.raw_size(), 0);
669 :
670 1 : assert_ks_eq(&accum.consume_keyspace(), vec![]);
671 1 : assert_eq!(accum.raw_size(), 0);
672 :
673 4 : for range in &ranges {
674 3 : accum.add_range(range.clone());
675 3 : }
676 1 : assert_ks_eq(&accum.to_keyspace(), ranges);
677 1 : }
678 :
679 : #[test]
680 1 : fn keyspace_add_range() {
681 1 : // two separate ranges
682 1 : //
683 1 : // #####
684 1 : // #####
685 1 : let mut ks = KeySpaceRandomAccum::default();
686 1 : ks.add_range(kr(0..10));
687 1 : ks.add_range(kr(20..30));
688 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
689 1 :
690 1 : // two separate ranges, added in reverse order
691 1 : //
692 1 : // #####
693 1 : // #####
694 1 : let mut ks = KeySpaceRandomAccum::default();
695 1 : ks.add_range(kr(20..30));
696 1 : ks.add_range(kr(0..10));
697 1 :
698 1 : // add range that is adjacent to the end of an existing range
699 1 : //
700 1 : // #####
701 1 : // #####
702 1 : ks.add_range(kr(0..10));
703 1 : ks.add_range(kr(10..30));
704 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
705 1 :
706 1 : // add range that is adjacent to the start of an existing range
707 1 : //
708 1 : // #####
709 1 : // #####
710 1 : let mut ks = KeySpaceRandomAccum::default();
711 1 : ks.add_range(kr(10..30));
712 1 : ks.add_range(kr(0..10));
713 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
714 1 :
715 1 : // add range that overlaps with the end of an existing range
716 1 : //
717 1 : // #####
718 1 : // #####
719 1 : let mut ks = KeySpaceRandomAccum::default();
720 1 : ks.add_range(kr(0..10));
721 1 : ks.add_range(kr(5..30));
722 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
723 1 :
724 1 : // add range that overlaps with the start of an existing range
725 1 : //
726 1 : // #####
727 1 : // #####
728 1 : let mut ks = KeySpaceRandomAccum::default();
729 1 : ks.add_range(kr(5..30));
730 1 : ks.add_range(kr(0..10));
731 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
732 1 :
733 1 : // add range that is fully covered by an existing range
734 1 : //
735 1 : // #########
736 1 : // #####
737 1 : let mut ks = KeySpaceRandomAccum::default();
738 1 : ks.add_range(kr(0..30));
739 1 : ks.add_range(kr(10..20));
740 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
741 1 :
742 1 : // add range that extends an existing range from both ends
743 1 : //
744 1 : // #####
745 1 : // #########
746 1 : let mut ks = KeySpaceRandomAccum::default();
747 1 : ks.add_range(kr(10..20));
748 1 : ks.add_range(kr(0..30));
749 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
750 1 :
751 1 : // add a range that overlaps with two existing ranges, joining them
752 1 : //
753 1 : // ##### #####
754 1 : // #######
755 1 : let mut ks = KeySpaceRandomAccum::default();
756 1 : ks.add_range(kr(0..10));
757 1 : ks.add_range(kr(20..30));
758 1 : ks.add_range(kr(5..25));
759 1 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
760 1 : }
761 :
762 : #[test]
763 1 : fn keyspace_overlaps() {
764 1 : let mut ks = KeySpaceRandomAccum::default();
765 1 : ks.add_range(kr(10..20));
766 1 : ks.add_range(kr(30..40));
767 1 : let ks = ks.to_keyspace();
768 1 :
769 1 : // ##### #####
770 1 : // xxxx
771 1 : assert!(!ks.overlaps(&kr(0..5)));
772 :
773 : // ##### #####
774 : // xxxx
775 1 : assert!(!ks.overlaps(&kr(5..9)));
776 :
777 : // ##### #####
778 : // xxxx
779 1 : assert!(!ks.overlaps(&kr(5..10)));
780 :
781 : // ##### #####
782 : // xxxx
783 1 : assert!(ks.overlaps(&kr(5..11)));
784 :
785 : // ##### #####
786 : // xxxx
787 1 : assert!(ks.overlaps(&kr(10..15)));
788 :
789 : // ##### #####
790 : // xxxx
791 1 : assert!(ks.overlaps(&kr(15..20)));
792 :
793 : // ##### #####
794 : // xxxx
795 1 : assert!(ks.overlaps(&kr(15..25)));
796 :
797 : // ##### #####
798 : // xxxx
799 1 : assert!(!ks.overlaps(&kr(22..28)));
800 :
801 : // ##### #####
802 : // xxxx
803 1 : assert!(!ks.overlaps(&kr(25..30)));
804 :
805 : // ##### #####
806 : // xxxx
807 1 : assert!(ks.overlaps(&kr(35..35)));
808 :
809 : // ##### #####
810 : // xxxx
811 1 : assert!(!ks.overlaps(&kr(40..45)));
812 :
813 : // ##### #####
814 : // xxxx
815 1 : assert!(!ks.overlaps(&kr(45..50)));
816 :
817 : // ##### #####
818 : // xxxxxxxxxxx
819 1 : assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
820 1 : }
821 :
822 : #[test]
823 1 : fn test_remove_full_overlapps() {
824 1 : let mut key_space1 = KeySpace {
825 1 : ranges: vec![
826 1 : Key::from_i128(1)..Key::from_i128(4),
827 1 : Key::from_i128(5)..Key::from_i128(8),
828 1 : Key::from_i128(10)..Key::from_i128(12),
829 1 : ],
830 1 : };
831 1 : let key_space2 = KeySpace {
832 1 : ranges: vec![
833 1 : Key::from_i128(2)..Key::from_i128(3),
834 1 : Key::from_i128(6)..Key::from_i128(7),
835 1 : Key::from_i128(11)..Key::from_i128(13),
836 1 : ],
837 1 : };
838 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
839 1 : let removed_expected = KeySpace {
840 1 : ranges: vec![
841 1 : Key::from_i128(2)..Key::from_i128(3),
842 1 : Key::from_i128(6)..Key::from_i128(7),
843 1 : Key::from_i128(11)..Key::from_i128(12),
844 1 : ],
845 1 : };
846 1 : assert_eq!(removed, removed_expected);
847 :
848 1 : assert_eq!(
849 1 : key_space1.ranges,
850 1 : vec![
851 1 : Key::from_i128(1)..Key::from_i128(2),
852 1 : Key::from_i128(3)..Key::from_i128(4),
853 1 : Key::from_i128(5)..Key::from_i128(6),
854 1 : Key::from_i128(7)..Key::from_i128(8),
855 1 : Key::from_i128(10)..Key::from_i128(11)
856 1 : ]
857 1 : );
858 1 : }
859 :
860 : #[test]
861 1 : fn test_remove_partial_overlaps() {
862 1 : // Test partial ovelaps
863 1 : let mut key_space1 = KeySpace {
864 1 : ranges: vec![
865 1 : Key::from_i128(1)..Key::from_i128(5),
866 1 : Key::from_i128(7)..Key::from_i128(10),
867 1 : Key::from_i128(12)..Key::from_i128(15),
868 1 : ],
869 1 : };
870 1 : let key_space2 = KeySpace {
871 1 : ranges: vec![
872 1 : Key::from_i128(3)..Key::from_i128(6),
873 1 : Key::from_i128(8)..Key::from_i128(11),
874 1 : Key::from_i128(14)..Key::from_i128(17),
875 1 : ],
876 1 : };
877 1 :
878 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
879 1 : let removed_expected = KeySpace {
880 1 : ranges: vec![
881 1 : Key::from_i128(3)..Key::from_i128(5),
882 1 : Key::from_i128(8)..Key::from_i128(10),
883 1 : Key::from_i128(14)..Key::from_i128(15),
884 1 : ],
885 1 : };
886 1 : assert_eq!(removed, removed_expected);
887 :
888 1 : assert_eq!(
889 1 : key_space1.ranges,
890 1 : vec![
891 1 : Key::from_i128(1)..Key::from_i128(3),
892 1 : Key::from_i128(7)..Key::from_i128(8),
893 1 : Key::from_i128(12)..Key::from_i128(14),
894 1 : ]
895 1 : );
896 1 : }
897 :
898 : #[test]
899 1 : fn test_remove_no_overlaps() {
900 1 : let mut key_space1 = KeySpace {
901 1 : ranges: vec![
902 1 : Key::from_i128(1)..Key::from_i128(5),
903 1 : Key::from_i128(7)..Key::from_i128(10),
904 1 : Key::from_i128(12)..Key::from_i128(15),
905 1 : ],
906 1 : };
907 1 : let key_space2 = KeySpace {
908 1 : ranges: vec![
909 1 : Key::from_i128(6)..Key::from_i128(7),
910 1 : Key::from_i128(11)..Key::from_i128(12),
911 1 : Key::from_i128(15)..Key::from_i128(17),
912 1 : ],
913 1 : };
914 1 :
915 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
916 1 : let removed_expected = KeySpace::default();
917 1 : assert_eq!(removed, removed_expected);
918 :
919 1 : assert_eq!(
920 1 : key_space1.ranges,
921 1 : vec![
922 1 : Key::from_i128(1)..Key::from_i128(5),
923 1 : Key::from_i128(7)..Key::from_i128(10),
924 1 : Key::from_i128(12)..Key::from_i128(15),
925 1 : ]
926 1 : );
927 1 : }
928 :
929 : #[test]
930 1 : fn test_remove_one_range_overlaps_multiple() {
931 1 : let mut key_space1 = KeySpace {
932 1 : ranges: vec![
933 1 : Key::from_i128(1)..Key::from_i128(3),
934 1 : Key::from_i128(3)..Key::from_i128(6),
935 1 : Key::from_i128(6)..Key::from_i128(10),
936 1 : Key::from_i128(12)..Key::from_i128(15),
937 1 : Key::from_i128(17)..Key::from_i128(20),
938 1 : Key::from_i128(20)..Key::from_i128(30),
939 1 : Key::from_i128(30)..Key::from_i128(40),
940 1 : ],
941 1 : };
942 1 : let key_space2 = KeySpace {
943 1 : ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
944 1 : };
945 1 :
946 1 : let removed = key_space1.remove_overlapping_with(&key_space2);
947 1 : let removed_expected = KeySpace {
948 1 : ranges: vec![
949 1 : Key::from_i128(9)..Key::from_i128(10),
950 1 : Key::from_i128(12)..Key::from_i128(15),
951 1 : Key::from_i128(17)..Key::from_i128(19),
952 1 : ],
953 1 : };
954 1 : assert_eq!(removed, removed_expected);
955 :
956 1 : assert_eq!(
957 1 : key_space1.ranges,
958 1 : vec![
959 1 : Key::from_i128(1)..Key::from_i128(3),
960 1 : Key::from_i128(3)..Key::from_i128(6),
961 1 : Key::from_i128(6)..Key::from_i128(9),
962 1 : Key::from_i128(19)..Key::from_i128(20),
963 1 : Key::from_i128(20)..Key::from_i128(30),
964 1 : Key::from_i128(30)..Key::from_i128(40),
965 1 : ]
966 1 : );
967 1 : }
968 : #[test]
969 1 : fn sharded_range_relation_gap() {
970 1 : let shard_identity = ShardIdentity::new(
971 1 : ShardNumber(0),
972 1 : ShardCount::new(4),
973 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
974 1 : )
975 1 : .unwrap();
976 1 :
977 1 : let range = ShardedRange::new(
978 1 : Range {
979 1 : start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
980 1 : end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
981 1 : },
982 1 : &shard_identity,
983 1 : );
984 1 :
985 1 : // Key range spans relations, expect MAX
986 1 : assert_eq!(range.page_count(), u32::MAX);
987 1 : }
988 :
989 : #[test]
990 1 : fn shard_identity_keyspaces_single_key() {
991 1 : let shard_identity = ShardIdentity::new(
992 1 : ShardNumber(1),
993 1 : ShardCount::new(4),
994 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
995 1 : )
996 1 : .unwrap();
997 1 :
998 1 : let range = ShardedRange::new(
999 1 : Range {
1000 1 : start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
1001 1 : end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
1002 1 : },
1003 1 : &shard_identity,
1004 1 : );
1005 1 : // Single-key range on logical size key
1006 1 : assert_eq!(range.page_count(), 1);
1007 1 : }
1008 :
1009 : /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
1010 : #[test]
1011 1 : fn contiguous_range_check() {
1012 1 : assert!(!is_contiguous_range(
1013 1 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1014 1 : ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
1015 1 : ),);
1016 :
1017 : // The ranges goes all the way up to the 0xffffffff, including it: this is
1018 : // not considered a rel block range because 0xffffffff stores logical sizes,
1019 : // not blocks.
1020 1 : assert!(!is_contiguous_range(
1021 1 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1022 1 : ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
1023 1 : ),);
1024 :
1025 : // Keys within the normal data region of a relation
1026 1 : assert!(is_contiguous_range(
1027 1 : &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
1028 1 : ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
1029 1 : ),);
1030 :
1031 : // The logical size key of one forkno, then some blocks in the next
1032 1 : assert!(is_contiguous_range(
1033 1 : &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
1034 1 : ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
1035 1 : ),);
1036 1 : }
1037 :
1038 : #[test]
1039 1 : fn shard_identity_keyspaces_forkno_gap() {
1040 1 : let shard_identity = ShardIdentity::new(
1041 1 : ShardNumber(1),
1042 1 : ShardCount::new(4),
1043 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
1044 1 : )
1045 1 : .unwrap();
1046 1 :
1047 1 : let range = ShardedRange::new(
1048 1 : Range {
1049 1 : start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
1050 1 : end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
1051 1 : },
1052 1 : &shard_identity,
1053 1 : );
1054 1 :
1055 1 : // Range spanning the end of one forkno and the start of the next: we do not attempt to
1056 1 : // calculate a valid size, because we have no way to know if they keys between start
1057 1 : // and end are actually in use.
1058 1 : assert_eq!(range.page_count(), u32::MAX);
1059 1 : }
1060 :
1061 : #[test]
1062 1 : fn shard_identity_keyspaces_one_relation() {
1063 5 : for shard_number in 0..4 {
1064 4 : let shard_identity = ShardIdentity::new(
1065 4 : ShardNumber(shard_number),
1066 4 : ShardCount::new(4),
1067 4 : ShardParameters::DEFAULT_STRIPE_SIZE,
1068 4 : )
1069 4 : .unwrap();
1070 4 :
1071 4 : let range = ShardedRange::new(
1072 4 : Range {
1073 4 : start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
1074 4 : end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
1075 4 : },
1076 4 : &shard_identity,
1077 4 : );
1078 4 :
1079 4 : // Very simple case: range covering block zero of one relation, where that block maps to shard zero
1080 4 : if shard_number == 0 {
1081 1 : assert_eq!(range.page_count(), 1);
1082 : } else {
1083 : // Other shards should perceive the range's size as zero
1084 3 : assert_eq!(range.page_count(), 0);
1085 : }
1086 : }
1087 1 : }
1088 :
1089 : /// Test helper: construct a ShardedRange and call fragment() on it, returning
1090 : /// the total page count in the range and the fragments.
1091 1012 : fn do_fragment(
1092 1012 : range_start: Key,
1093 1012 : range_end: Key,
1094 1012 : shard_identity: &ShardIdentity,
1095 1012 : target_nblocks: u32,
1096 1012 : ) -> (u32, Vec<(u32, Range<Key>)>) {
1097 1012 : let range = ShardedRange::new(
1098 1012 : Range {
1099 1012 : start: range_start,
1100 1012 : end: range_end,
1101 1012 : },
1102 1012 : shard_identity,
1103 1012 : );
1104 1012 :
1105 1012 : let page_count = range.page_count();
1106 1012 : let fragments = range.fragment(target_nblocks);
1107 1012 :
1108 1012 : // Invariant: we always get at least one fragment
1109 1012 : assert!(!fragments.is_empty());
1110 :
1111 : // Invariant: the first/last fragment start/end should equal the input start/end
1112 1012 : assert_eq!(fragments.first().unwrap().1.start, range_start);
1113 1012 : assert_eq!(fragments.last().unwrap().1.end, range_end);
1114 :
1115 1012 : if page_count > 0 {
1116 : // Invariant: every fragment must contain at least one shard-local page, if the
1117 : // total range contains at least one shard-local page
1118 687 : let all_nonzero = fragments.iter().all(|f| f.0 > 0);
1119 554 : if !all_nonzero {
1120 0 : eprintln!("Found a zero-length fragment: {:?}", fragments);
1121 554 : }
1122 554 : assert!(all_nonzero);
1123 : } else {
1124 : // A range with no shard-local pages should always be returned as a single fragment
1125 458 : assert_eq!(fragments, vec![(0, range_start..range_end)]);
1126 : }
1127 :
1128 : // Invariant: fragments must be ordered and non-overlapping
1129 1012 : let mut last: Option<Range<Key>> = None;
1130 2157 : for frag in &fragments {
1131 1145 : if let Some(last) = last {
1132 133 : assert!(frag.1.start >= last.end);
1133 133 : assert!(frag.1.start > last.start);
1134 1012 : }
1135 1145 : last = Some(frag.1.clone())
1136 : }
1137 :
1138 : // Invariant: fragments respect target_nblocks
1139 2157 : for frag in &fragments {
1140 1145 : assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
1141 : }
1142 :
1143 1012 : (page_count, fragments)
1144 1012 : }
1145 :
1146 : /// Really simple tests for fragment(), on a range that just contains a single stripe
1147 : /// for a single tenant.
1148 : #[test]
1149 1 : fn sharded_range_fragment_simple() {
1150 1 : let shard_identity = ShardIdentity::new(
1151 1 : ShardNumber(0),
1152 1 : ShardCount::new(4),
1153 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
1154 1 : )
1155 1 : .unwrap();
1156 1 :
1157 1 : // A range which we happen to know covers exactly one stripe which belongs to this shard
1158 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1159 1 : let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
1160 1 :
1161 1 : // Ask for stripe_size blocks, we get the whole stripe
1162 1 : assert_eq!(
1163 1 : do_fragment(input_start, input_end, &shard_identity, 32768),
1164 1 : (32768, vec![(32768, input_start..input_end)])
1165 1 : );
1166 :
1167 : // Ask for more, we still get the whole stripe
1168 1 : assert_eq!(
1169 1 : do_fragment(input_start, input_end, &shard_identity, 10000000),
1170 1 : (32768, vec![(32768, input_start..input_end)])
1171 1 : );
1172 :
1173 : // Ask for target_nblocks of half the stripe size, we get two halves
1174 1 : assert_eq!(
1175 1 : do_fragment(input_start, input_end, &shard_identity, 16384),
1176 1 : (
1177 1 : 32768,
1178 1 : vec![
1179 1 : (16384, input_start..input_start.add(16384)),
1180 1 : (16384, input_start.add(16384)..input_end)
1181 1 : ]
1182 1 : )
1183 1 : );
1184 1 : }
1185 :
1186 : #[test]
1187 1 : fn sharded_range_fragment_multi_stripe() {
1188 1 : let shard_identity = ShardIdentity::new(
1189 1 : ShardNumber(0),
1190 1 : ShardCount::new(4),
1191 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
1192 1 : )
1193 1 : .unwrap();
1194 1 :
1195 1 : // A range which covers multiple stripes, exactly one of which belongs to the current shard.
1196 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1197 1 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1198 1 : // Ask for all the blocks, get a fragment that covers the whole range but reports
1199 1 : // its size to be just the blocks belonging to our shard.
1200 1 : assert_eq!(
1201 1 : do_fragment(input_start, input_end, &shard_identity, 131072),
1202 1 : (32768, vec![(32768, input_start..input_end)])
1203 1 : );
1204 :
1205 : // Ask for a sub-stripe quantity
1206 1 : assert_eq!(
1207 1 : do_fragment(input_start, input_end, &shard_identity, 16000),
1208 1 : (
1209 1 : 32768,
1210 1 : vec![
1211 1 : (16000, input_start..input_start.add(16000)),
1212 1 : (16000, input_start.add(16000)..input_start.add(32000)),
1213 1 : (768, input_start.add(32000)..input_end),
1214 1 : ]
1215 1 : )
1216 1 : );
1217 :
1218 : // Try on a range that starts slightly after our owned stripe
1219 1 : assert_eq!(
1220 1 : do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
1221 1 : (32767, vec![(32767, input_start.add(1)..input_end)])
1222 1 : );
1223 1 : }
1224 :
1225 : /// Test our calculations work correctly when we start a range from the logical size key of
1226 : /// a previous relation.
1227 : #[test]
1228 1 : fn sharded_range_fragment_starting_from_logical_size() {
1229 1 : let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
1230 1 : let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
1231 1 :
1232 1 : // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
1233 1 : let shard_identity = ShardIdentity::new(
1234 1 : ShardNumber(0),
1235 1 : ShardCount::new(4),
1236 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
1237 1 : )
1238 1 : .unwrap();
1239 1 : assert_eq!(
1240 1 : do_fragment(input_start, input_end, &shard_identity, 0x10000),
1241 1 : (0x8001, vec![(0x8001, input_start..input_end)])
1242 1 : );
1243 :
1244 : // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
1245 : // store all logical sizes)
1246 1 : let shard_identity = ShardIdentity::new(
1247 1 : ShardNumber(1),
1248 1 : ShardCount::new(4),
1249 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
1250 1 : )
1251 1 : .unwrap();
1252 1 : assert_eq!(
1253 1 : do_fragment(input_start, input_end, &shard_identity, 0x10000),
1254 1 : (0x1, vec![(0x1, input_start..input_end)])
1255 1 : );
1256 1 : }
1257 :
1258 : /// Test that ShardedRange behaves properly when used on un-sharded data
1259 : #[test]
1260 1 : fn sharded_range_fragment_unsharded() {
1261 1 : let shard_identity = ShardIdentity::unsharded();
1262 1 :
1263 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1264 1 : let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
1265 1 : assert_eq!(
1266 1 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1267 1 : (
1268 1 : 0x10000,
1269 1 : vec![
1270 1 : (0x8000, input_start..input_start.add(0x8000)),
1271 1 : (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
1272 1 : ]
1273 1 : )
1274 1 : );
1275 1 : }
1276 :
1277 : #[test]
1278 1 : fn sharded_range_fragment_cross_relation() {
1279 1 : let shard_identity = ShardIdentity::unsharded();
1280 1 :
1281 1 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1282 1 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1283 1 : let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
1284 1 : assert_eq!(
1285 1 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1286 1 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1287 1 : );
1288 :
1289 : // Same, but using a sharded identity
1290 1 : let shard_identity = ShardIdentity::new(
1291 1 : ShardNumber(0),
1292 1 : ShardCount::new(4),
1293 1 : ShardParameters::DEFAULT_STRIPE_SIZE,
1294 1 : )
1295 1 : .unwrap();
1296 1 : assert_eq!(
1297 1 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1298 1 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1299 1 : );
1300 1 : }
1301 :
1302 : #[test]
1303 1 : fn sharded_range_fragment_tiny_nblocks() {
1304 1 : let shard_identity = ShardIdentity::unsharded();
1305 1 :
1306 1 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1307 1 : let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
1308 1 : let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
1309 1 : assert_eq!(
1310 1 : do_fragment(input_start, input_end, &shard_identity, 16),
1311 1 : (
1312 1 : 0x38,
1313 1 : vec![
1314 1 : (16, input_start..input_start.add(16)),
1315 1 : (16, input_start.add(16)..input_start.add(32)),
1316 1 : (16, input_start.add(32)..input_start.add(48)),
1317 1 : (8, input_start.add(48)..input_end),
1318 1 : ]
1319 1 : )
1320 1 : );
1321 1 : }
1322 :
1323 : #[test]
1324 1 : fn sharded_range_fragment_fuzz() {
1325 1 : // Use a fixed seed: we don't want to explicitly pick values, but we do want
1326 1 : // the test to be reproducible.
1327 1 : let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
1328 :
1329 1001 : for _i in 0..1000 {
1330 1000 : let shard_identity = if prng.next_u32() % 2 == 0 {
1331 519 : ShardIdentity::unsharded()
1332 : } else {
1333 481 : let shard_count = prng.next_u32() % 127 + 1;
1334 481 : ShardIdentity::new(
1335 481 : ShardNumber((prng.next_u32() % shard_count) as u8),
1336 481 : ShardCount::new(shard_count as u8),
1337 481 : ShardParameters::DEFAULT_STRIPE_SIZE,
1338 481 : )
1339 481 : .unwrap()
1340 : };
1341 :
1342 1000 : let target_nblocks = prng.next_u32() % 65536 + 1;
1343 1000 :
1344 1000 : let start_offset = prng.next_u32() % 16384;
1345 1000 :
1346 1000 : // Try ranges up to 4GiB in size, that are always at least 1
1347 1000 : let range_size = prng.next_u32() % 8192 + 1;
1348 1000 :
1349 1000 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1350 1000 : let input_start = Key::from_hex("000000067F00000001000004E10000000000")
1351 1000 : .unwrap()
1352 1000 : .add(start_offset);
1353 1000 : let input_end = input_start.add(range_size);
1354 1000 :
1355 1000 : // This test's main success conditions are the invariants baked into do_fragment
1356 1000 : let (_total_size, fragments) =
1357 1000 : do_fragment(input_start, input_end, &shard_identity, target_nblocks);
1358 1000 :
1359 1000 : // Pick a random key within the range and check it appears in the output
1360 1000 : let example_key = input_start.add(prng.next_u32() % range_size);
1361 1000 :
1362 1000 : // Panic on unwrap if it isn't found
1363 1000 : let example_key_frag = fragments
1364 1000 : .iter()
1365 1073 : .find(|f| f.1.contains(&example_key))
1366 1000 : .unwrap();
1367 1000 :
1368 1000 : // Check that the fragment containing our random key has a nonzero size if
1369 1000 : // that key is shard-local
1370 1000 : let example_key_local = !shard_identity.is_key_disposable(&example_key);
1371 1000 : if example_key_local {
1372 542 : assert!(example_key_frag.0 > 0);
1373 458 : }
1374 : }
1375 1 : }
1376 : }
|