Line data Source code
1 : use postgres_ffi::BLCKSZ;
2 : use std::ops::Range;
3 :
4 : use crate::{
5 : key::Key,
6 : shard::{ShardCount, ShardIdentity},
7 : };
8 : use itertools::Itertools;
9 :
10 : ///
11 : /// Represents a set of Keys, in a compact form.
12 : ///
13 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
14 : pub struct KeySpace {
15 : /// Contiguous ranges of keys that belong to the key space. In key order,
16 : /// and with no overlap.
17 : pub ranges: Vec<Range<Key>>,
18 : }
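// A minimal construction sketch (illustrative, not from the original source;
// `kr` mirrors the helper defined in the tests module at the bottom of this
// file and builds `Key::from_i128(a)..Key::from_i128(b)`):
//
//     let ks = KeySpace {
//         ranges: vec![kr(0..10), kr(20..30)], // sorted, non-overlapping
//     };
//     assert!(ks.contains(&Key::from_i128(5)));
//     assert!(!ks.contains(&Key::from_i128(15)));
//     assert_eq!(ks.total_raw_size(), 20);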
19 :
20 : impl std::fmt::Display for KeySpace {
21 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 0 : write!(f, "[")?;
23 0 : for range in &self.ranges {
24 0 : write!(f, "{}..{},", range.start, range.end)?;
25 : }
26 0 : write!(f, "]")
27 0 : }
28 : }
29 :
30 : /// A wrapper type for sparse keyspaces.
31 : #[derive(Clone, Debug, Default, PartialEq, Eq)]
32 : pub struct SparseKeySpace(pub KeySpace);
33 :
34 : /// Represents a contiguous half-open range of the keyspace, masked according to a particular
35 : /// ShardNumber's stripes: within this range of keys, only some "belong" to the current
36 : /// shard.
37 : ///
38 : /// When we iterate over keys within this object, we will skip any keys that don't belong
39 : /// to this shard.
40 : ///
41 : /// The start and end keys need not belong to the shard themselves: they specify where layer files should
42 : /// start and end, but we will never actually read/write those keys.
43 : #[derive(Clone, Debug, PartialEq, Eq)]
44 : pub struct ShardedRange<'a> {
45 : pub shard_identity: &'a ShardIdentity,
46 : pub range: Range<Key>,
47 : }
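// A sketch of the masking behaviour described above (hypothetical identifiers:
// `stripe_start`/`stripe_end` bound exactly one stripe owned by shard 0 of a
// 4-shard tenant, and `shard0`/`shard1` are the corresponding ShardIdentity
// values; assumes the default stripe size of 32768 blocks used in the tests
// below):
//
//     let on_shard0 = ShardedRange::new(stripe_start..stripe_end, &shard0);
//     let on_shard1 = ShardedRange::new(stripe_start..stripe_end, &shard1);
//     // raw_size ignores sharding; page_count respects it:
//     assert_eq!(ShardedRange::raw_size(&(stripe_start..stripe_end)), 32768);
//     assert_eq!(on_shard0.page_count(), 32768); // owning shard
//     assert_eq!(on_shard1.page_count(), 0);     // non-owning shard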
48 :
49 : // Calculate the size of a range within the blocks of a single relation, or one that spans
50 : // only the topmost (logical size, 0xffffffff) page of the previous relation's space.
51 21934582 : fn contiguous_range_len(range: &Range<Key>) -> u32 {
52 21934582 : debug_assert!(is_contiguous_range(range));
53 21934582 : if range.start.field6 == 0xffffffff {
54 2481324 : range.end.field6 + 1
55 : } else {
56 19453258 : range.end.field6 - range.start.field6
57 : }
58 21934582 : }
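// Worked example of the wrap case above: for a range starting at the previous
// relation's logical size slot (start.field6 == 0xffffffff) and ending at block
// 0x80 of the next relation, the length is end.field6 + 1 = 0x81 pages: the
// logical size page itself plus blocks 0x00..=0x7f.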
59 :
60 : /// Return true if this key range includes only keys in the same relation's data blocks, or
61 : /// spans just one relation plus the logical size (0xffffffff) block of the relation before it.
62 : ///
63 : /// Contiguous in this context means we know the keys are in use _somewhere_, but they might not
64 : /// be on our shard. Later in ShardedRange we do the extra work to figure out how much
65 : /// of a given contiguous range is present on one shard.
66 : ///
67 : /// This matters, because:
68 : /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
69 : /// - Within such ranges, we may calculate distances using simple subtraction of field6.
70 43873136 : fn is_contiguous_range(range: &Range<Key>) -> bool {
71 43873136 : range.start.field1 == range.end.field1
72 43868660 : && range.start.field2 == range.end.field2
73 43868516 : && range.start.field3 == range.end.field3
74 43868258 : && range.start.field4 == range.end.field4
75 43868252 : && (range.start.field5 == range.end.field5
76 4962672 : || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5))
77 43873136 : }
78 :
79 : impl<'a> ShardedRange<'a> {
80 11298 : pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
81 11298 : Self {
82 11298 : shard_identity,
83 11298 : range,
84 11298 : }
85 11298 : }
86 :
87 : /// Break up this range into chunks, each of which has at least one local key in it if the
88 : /// total range has at least one local key.
89 11256 : pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
90 11256 : // Optimization for single-key case (e.g. logical size keys)
91 11256 : if self.range.end == self.range.start.add(1) {
92 4326 : return vec![(
93 4326 : if self.shard_identity.is_key_disposable(&self.range.start) {
94 0 : 0
95 : } else {
96 4326 : 1
97 : },
98 4326 : self.range,
99 : )];
100 6930 : }
101 6930 :
102 6930 : if !is_contiguous_range(&self.range) {
103 : // Ranges that span relations are not fragmented. We only get these ranges as a result
104 : // of operations that act on existing layers, so we trust that the existing range is
105 : // reasonably small.
106 12 : return vec![(u32::MAX, self.range)];
107 6918 : }
108 6918 :
109 6918 : let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
110 6918 :
111 6918 : let mut cursor = self.range.start;
112 14712 : while cursor < self.range.end {
113 7794 : let advance_by = self.distance_to_next_boundary(cursor);
114 7794 : let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
115 :
116 : // If the previous fragment is undersized, then we seek to consume enough
117 : // blocks to complete it.
118 7794 : let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
119 876 : Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
120 810 : Some(frag) => {
121 810 : // Previous fragment is complete, want the full number.
122 810 : (
123 810 : target_nblocks,
124 810 : if is_fragment_disposable {
125 : // If this current range will be empty (not shard-local data), we will merge into previous
126 0 : Some(frag)
127 : } else {
128 810 : None
129 : },
130 : )
131 : }
132 : None => {
133 : // First iteration, want the full number
134 6918 : (target_nblocks, None)
135 : }
136 : };
137 :
138 7794 : let advance_by = if is_fragment_disposable {
139 2808 : advance_by
140 : } else {
141 4986 : std::cmp::min(advance_by, want_blocks)
142 : };
143 :
144 7794 : let next_cursor = cursor.add(advance_by);
145 :
146 7794 : let this_frag = (
147 7794 : if is_fragment_disposable {
148 2808 : 0
149 : } else {
150 4986 : advance_by
151 : },
152 7794 : cursor..next_cursor,
153 7794 : );
154 7794 : cursor = next_cursor;
155 :
156 7794 : if let Some(last_fragment) = merge_last_fragment {
157 66 : // Previous fragment was short or this one is empty, merge into it
158 66 : last_fragment.0 += this_frag.0;
159 66 : last_fragment.1.end = this_frag.1.end;
160 7728 : } else {
161 7728 : fragments.push(this_frag);
162 7728 : }
163 : }
164 :
165 6918 : fragments
166 11256 : }
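// A usage sketch for fragment() on unsharded data (mirrors the
// sharded_range_fragment_unsharded test below; `rel_start` is a hypothetical
// key at block 0 of some relation):
//
//     let range = ShardedRange::new(rel_start..rel_start.add(0x10000),
//                                   &ShardIdentity::unsharded());
//     let frags = range.fragment(0x8000);
//     // -> [(0x8000, rel_start..rel_start.add(0x8000)),
//     //     (0x8000, rel_start.add(0x8000)..rel_start.add(0x10000))]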
167 :
168 : /// Estimate the physical pages that are within this range, on this shard. This returns
169 : /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
170 6114 : pub fn page_count(&self) -> u32 {
171 6114 : // Special cases for single keys like logical sizes
172 6114 : if self.range.end == self.range.start.add(1) {
173 36 : return if self.shard_identity.is_key_disposable(&self.range.start) {
174 18 : 0
175 : } else {
176 18 : 1
177 : };
178 6078 : }
179 6078 :
180 6078 : // We can only do an exact calculation for contiguous key ranges
181 6078 : if !is_contiguous_range(&self.range) {
182 24 : return u32::MAX;
183 6054 : }
184 6054 :
185 6054 : // Special case for single sharded tenants: our logical and physical sizes are the same
186 6054 : if self.shard_identity.count < ShardCount::new(2) {
187 3144 : return contiguous_range_len(&self.range);
188 2910 : }
189 2910 :
190 2910 : // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
191 2910 : // to Self, and add the stripe's block count to our total if so.
192 2910 : let mut result: u64 = 0;
193 2910 : let mut cursor = self.range.start;
194 5886 : while cursor < self.range.end {
195 : // Count up to the next stripe_size boundary or end of range
196 2976 : let advance_by = self.distance_to_next_boundary(cursor);
197 2976 :
198 2976 : // If the blocks in this stripe belong to us, add them to our count
199 2976 : if !self.shard_identity.is_key_disposable(&cursor) {
200 168 : result += advance_by as u64;
201 2808 : }
202 :
203 2976 : cursor = cursor.add(advance_by);
204 : }
205 :
206 2910 : if result > u32::MAX as u64 {
207 0 : u32::MAX
208 : } else {
209 2910 : result as u32
210 : }
211 6114 : }
212 :
213 : /// Distance from the cursor to the next potential fragment boundary: this is either
214 : /// a stripe boundary, or the end of the range.
215 10770 : fn distance_to_next_boundary(&self, cursor: Key) -> u32 {
216 10770 : let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end));
217 10770 :
218 10770 : if self.shard_identity.count < ShardCount::new(2) {
219 : // Optimization: don't bother stepping through stripes if the tenant isn't sharded.
220 4776 : return distance_to_range_end;
221 5994 : }
222 5994 :
223 5994 : if cursor.field6 == 0xffffffff {
224 : // We are wrapping from one relation's logical size to the next relation's first data block
225 24 : return 1;
226 5970 : }
227 5970 :
228 5970 : let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
229 5970 : let stripe_remainder = self.shard_identity.stripe_size.0
230 5970 : - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
231 5970 :
232 5970 : if cfg!(debug_assertions) {
233 : // We should never overflow field5 and field6 -- our callers check this earlier
234 : // and would have returned their u32::MAX cases if the input range violated this.
235 5970 : let next_cursor = cursor.add(stripe_remainder);
236 5970 : debug_assert!(
237 5970 : next_cursor.field1 == cursor.field1
238 5970 : && next_cursor.field2 == cursor.field2
239 5970 : && next_cursor.field3 == cursor.field3
240 5970 : && next_cursor.field4 == cursor.field4
241 5970 : && next_cursor.field5 == cursor.field5
242 : )
243 0 : }
244 :
245 5970 : std::cmp::min(stripe_remainder, distance_to_range_end)
246 10770 : }
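// Worked example of the stripe arithmetic above (illustrative numbers): with
// stripe_size = 32768 and cursor.field6 = 40000,
//   stripe_index     = 40000 / 32768 = 1
//   stripe_remainder = 32768 - (40000 - 1 * 32768) = 25536
// so the cursor is 25536 blocks away from the next stripe boundary, unless the
// end of the range comes first.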
247 :
248 : /// Whereas `page_count` estimates the number of pages physically in this range on this shard,
249 : /// this function simply calculates the number of pages in the space, without accounting for those
250 : /// pages that would not actually be stored on this node.
251 : ///
252 : /// Don't use this function in code that works with physical entities like layer files.
253 21925522 : pub fn raw_size(range: &Range<Key>) -> u32 {
254 21925522 : if is_contiguous_range(range) {
255 21920668 : contiguous_range_len(range)
256 : } else {
257 4854 : u32::MAX
258 : }
259 21925522 : }
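// Contrast with page_count() above: for a stripe-aligned range in a multi-shard
// tenant, raw_size() reports the full block count on every shard, while
// page_count() reports only the blocks that this shard actually stores.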
260 : }
261 :
262 : impl KeySpace {
263 : /// Create a key space with a single range.
264 24348 : pub fn single(key_range: Range<Key>) -> Self {
265 24348 : Self {
266 24348 : ranges: vec![key_range],
267 24348 : }
268 24348 : }
269 :
270 : /// Partition a key space into chunks of roughly 'target_size' bytes
271 : /// in each partition.
272 : ///
273 864 : pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
274 864 : // Assume that each value is 8k in size.
275 864 : let target_nblocks = (target_size / BLCKSZ as u64) as u32;
276 864 :
277 864 : let mut parts = Vec::new();
278 864 : let mut current_part = Vec::new();
279 864 : let mut current_part_size: usize = 0;
280 6048 : for range in &self.ranges {
281 : // While doing partitioning, wrap the range in ShardedRange so that our size calculations
282 : // will respect shard striping rather than assuming all keys within a range are present.
283 5184 : let range = ShardedRange::new(range.clone(), shard_identity);
284 :
285 : // Chunk up the range into parts that each contain up to target_size local blocks
286 5196 : for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) {
287 : // If appending the next contiguous range in the keyspace to the current
288 : // partition would cause it to be too large, and our current partition
289 : // covers at least one block that is physically present in this shard,
290 : // then start a new partition
291 5196 : if current_part_size + frag_on_shard_size as usize > target_nblocks as usize
292 72 : && current_part_size > 0
293 72 : {
294 72 : parts.push(KeySpace {
295 72 : ranges: current_part,
296 72 : });
297 72 : current_part = Vec::new();
298 72 : current_part_size = 0;
299 5124 : }
300 5196 : current_part.push(frag_range.start..frag_range.end);
301 5196 : current_part_size += frag_on_shard_size as usize;
302 : }
303 : }
304 :
305 : // Add the last partition, which may not be full yet.
306 864 : if !current_part.is_empty() {
307 864 : parts.push(KeySpace {
308 864 : ranges: current_part,
309 864 : });
310 864 : }
311 :
312 864 : KeyPartitioning { parts }
313 864 : }
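// A usage sketch for partition() (the 128 MiB target below is an assumed
// example value, not taken from this file). With BLCKSZ = 8192:
//
//     let target_size: u64 = 128 * 1024 * 1024; // bytes
//     // target_nblocks = target_size / BLCKSZ = 16384 blocks per partition
//     let partitioning = keyspace.partition(&shard_identity, target_size);
//     for part in &partitioning.parts {
//         // each part covers at most ~16384 shard-local blocks
//     }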
314 :
315 8253472 : pub fn is_empty(&self) -> bool {
316 8253472 : self.total_raw_size() == 0
317 8253472 : }
318 :
319 : /// Merge another keyspace into the current one.
320 : /// Note: the keyspaces must not overlap (enforced via assertions). To merge overlapping key ranges, use `KeySpaceRandomAccum`.
321 5016406 : pub fn merge(&mut self, other: &KeySpace) {
322 5016406 : let all_ranges = self
323 5016406 : .ranges
324 5016406 : .iter()
325 5016406 : .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
326 5016406 :
327 5016406 : let mut accum = KeySpaceAccum::new();
328 5016406 : let mut prev: Option<&Range<Key>> = None;
329 7210831 : for range in all_ranges {
330 2194425 : if let Some(prev) = prev {
331 315249 : let overlap =
332 315249 : std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
333 315249 : assert!(
334 315249 : !overlap,
335 0 : "Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
336 : prev, range
337 : );
338 1879176 : }
339 :
340 2194425 : accum.add_range(range.clone());
341 2194425 : prev = Some(range);
342 : }
343 :
344 5016406 : self.ranges = accum.to_keyspace().ranges;
345 5016406 : }
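// A semantics sketch for merge() (using the `kr` helper from the tests below):
//
//     let mut a = KeySpace { ranges: vec![kr(0..10)] };
//     let b = KeySpace { ranges: vec![kr(10..20)] };
//     a.merge(&b);
//     assert_eq!(a.ranges, vec![kr(0..20)]); // adjacent ranges are coalesced
//     // Merging a keyspace that overlaps `a` (e.g. kr(5..15)) would panic.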
346 :
347 : /// Remove all keys in `other` from `self`.
348 : /// This can involve splitting or removing of existing ranges.
349 : /// Returns the removed keyspace
350 10836358 : pub fn remove_overlapping_with(&mut self, other: &KeySpace) -> KeySpace {
351 10836358 : let (self_start, self_end) = match (self.start(), self.end()) {
352 8933758 : (Some(start), Some(end)) => (start, end),
353 : _ => {
354 : // self is empty
355 1902600 : return KeySpace::default();
356 : }
357 : };
358 :
359 : // Key spaces are sorted by definition, so skip ahead to the first
360 : // potentially intersecting range. Similarly, ignore ranges that start
361 : // after the current keyspace ends.
362 8933758 : let other_ranges = other
363 8933758 : .ranges
364 8933758 : .iter()
365 8933758 : .skip_while(|range| self_start >= range.end)
366 8933758 : .take_while(|range| self_end > range.start);
367 8933758 :
368 8933758 : let mut removed_accum = KeySpaceRandomAccum::new();
369 12933358 : for range in other_ranges {
370 8215698 : while let Some(overlap_at) = self.overlaps_at(range) {
371 4216098 : let overlapped = self.ranges[overlap_at].clone();
372 4216098 :
373 4216098 : if overlapped.start < range.start && overlapped.end <= range.end {
374 51 : // Higher part of the range is completely overlapped.
375 51 : removed_accum.add_range(range.start..self.ranges[overlap_at].end);
376 51 : self.ranges[overlap_at].end = range.start;
377 4216047 : }
378 4216098 : if overlapped.start >= range.start && overlapped.end > range.end {
379 211 : // Lower part of the range is completely overlapped.
380 211 : removed_accum.add_range(self.ranges[overlap_at].start..range.end);
381 211 : self.ranges[overlap_at].start = range.end;
382 4215887 : }
383 4216098 : if overlapped.start < range.start && overlapped.end > range.end {
384 240038 : // Middle part of the range is overlapped.
385 240038 : removed_accum.add_range(range.clone());
386 240038 : self.ranges[overlap_at].end = range.start;
387 240038 : self.ranges
388 240038 : .insert(overlap_at + 1, range.end..overlapped.end);
389 3976060 : }
390 4216098 : if overlapped.start >= range.start && overlapped.end <= range.end {
391 3975798 : // Whole range is overlapped
392 3975798 : removed_accum.add_range(self.ranges[overlap_at].clone());
393 3975798 : self.ranges.remove(overlap_at);
394 3975798 : }
395 : }
396 : }
397 :
398 8933758 : removed_accum.to_keyspace()
399 10836358 : }
400 :
401 10836472 : pub fn start(&self) -> Option<Key> {
402 10836472 : self.ranges.first().map(|range| range.start)
403 10836472 : }
404 :
405 10837348 : pub fn end(&self) -> Option<Key> {
406 10837348 : self.ranges.last().map(|range| range.end)
407 10837348 : }
408 :
409 : /// The size of the keyspace in pages, before accounting for sharding
410 10137256 : pub fn total_raw_size(&self) -> usize {
411 10137256 : self.ranges
412 10137256 : .iter()
413 10137256 : .map(|range| ShardedRange::raw_size(range) as usize)
414 10137256 : .sum()
415 10137256 : }
416 :
417 10297456 : fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
418 12583915 : match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
419 336 : Ok(0) => None,
420 3762446 : Err(0) => None,
421 366694 : Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
422 6167980 : Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
423 368938 : _ => None,
424 : }
425 10297456 : }
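// Worked example of the binary search above, with self.ranges = [kr(10..20),
// kr(30..40)] (`kr` as in the tests below):
//   overlaps_at(&kr(15..25)): searching for a start key equal to 25 yields
//     Err(1), and ranges[0].end == 20 > 15, so the result is Some(0);
//   overlaps_at(&kr(20..30)): start key 30 is found at index 1 (Ok(1)), but
//     ranges[0].end == 20 is not > 20, so the result is None.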
426 :
427 : ///
428 : /// Check if the key space overlaps the given key range
429 : ///
430 2081758 : pub fn overlaps(&self, range: &Range<Key>) -> bool {
431 2081758 : self.overlaps_at(range).is_some()
432 2081758 : }
433 :
434 : /// Check if the keyspace contains a key
435 2075860 : pub fn contains(&self, key: &Key) -> bool {
436 2075860 : self.overlaps(&(*key..key.next()))
437 2075860 : }
438 : }
439 :
440 : ///
441 : /// Represents a partitioning of the key space.
442 : ///
443 : /// The only kind of partitioning we do is to partition the key space into
444 : /// partitions that are roughly equal in physical size (see KeySpace::partition).
445 : /// But this data structure could represent any partitioning.
446 : ///
447 : #[derive(Clone, Debug, Default)]
448 : pub struct KeyPartitioning {
449 : pub parts: Vec<KeySpace>,
450 : }
451 :
452 : /// Represents a partitioning of the sparse key space.
453 : #[derive(Clone, Debug, Default)]
454 : pub struct SparseKeyPartitioning {
455 : pub parts: Vec<SparseKeySpace>,
456 : }
457 :
458 : impl KeyPartitioning {
459 2472 : pub fn new() -> Self {
460 2472 : KeyPartitioning { parts: Vec::new() }
461 2472 : }
462 :
463 : /// Convert a key partitioning to a sparse partition.
464 1236 : pub fn into_sparse(self) -> SparseKeyPartitioning {
465 1236 : SparseKeyPartitioning {
466 1236 : parts: self.parts.into_iter().map(SparseKeySpace).collect(),
467 1236 : }
468 1236 : }
469 : }
470 :
471 : impl SparseKeyPartitioning {
472 : /// Note: use this function with caution. Attempting to handle a sparse keyspace in the same way as a dense keyspace can
473 : /// cause long or infinite loops.
474 1608 : pub fn into_dense(self) -> KeyPartitioning {
475 1608 : KeyPartitioning {
476 1608 : parts: self.parts.into_iter().map(|x| x.0).collect(),
477 1608 : }
478 1608 : }
479 : }
480 :
481 : ///
482 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
483 : /// object. This takes care of merging adjacent keys and key ranges into
484 : /// contiguous ranges.
485 : ///
486 : #[derive(Clone, Debug, Default)]
487 : pub struct KeySpaceAccum {
488 : accum: Option<Range<Key>>,
489 :
490 : ranges: Vec<Range<Key>>,
491 : size: u64,
492 : }
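// A usage sketch (mirrors the keyspace_consume test below): ranges must be fed
// in key order, and adjacent ranges are coalesced on the fly.
//
//     let mut accum = KeySpaceAccum::new();
//     accum.add_range(kr(0..10));
//     accum.add_range(kr(10..20)); // adjacent: extends the pending range to 0..20
//     accum.add_range(kr(30..40)); // gap: starts a new range
//     assert_eq!(accum.to_keyspace().ranges, vec![kr(0..20), kr(30..40)]);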
493 :
494 : impl KeySpaceAccum {
495 6575126 : pub fn new() -> Self {
496 6575126 : Self {
497 6575126 : accum: None,
498 6575126 : ranges: Vec::new(),
499 6575126 : size: 0,
500 6575126 : }
501 6575126 : }
502 :
503 : #[inline(always)]
504 12243918 : pub fn add_key(&mut self, key: Key) {
505 12243918 : self.add_range(singleton_range(key))
506 12243918 : }
507 :
508 : #[inline(always)]
509 16009055 : pub fn add_range(&mut self, range: Range<Key>) {
510 16009055 : self.size += ShardedRange::raw_size(&range) as u64;
511 16009055 :
512 16009055 : match self.accum.as_mut() {
513 12529119 : Some(accum) => {
514 12529119 : if range.start == accum.end {
515 12200466 : accum.end = range.end;
516 12200466 : } else {
517 : // TODO: to efficiently support small sharding stripe sizes, we should avoid starting
518 : // a new range here if the skipped region was all keys that don't belong on this shard.
519 : // (https://github.com/neondatabase/neon/issues/6247)
520 328653 : assert!(range.start > accum.end);
521 328653 : self.ranges.push(accum.clone());
522 328653 : *accum = range;
523 : }
524 : }
525 3479936 : None => self.accum = Some(range),
526 : }
527 16009055 : }
528 :
529 5923260 : pub fn to_keyspace(mut self) -> KeySpace {
530 5923260 : if let Some(accum) = self.accum.take() {
531 2775872 : self.ranges.push(accum);
532 3147388 : }
533 5923260 : KeySpace {
534 5923260 : ranges: self.ranges,
535 5923260 : }
536 5923260 : }
537 :
538 3366 : pub fn consume_keyspace(&mut self) -> KeySpace {
539 3366 : std::mem::take(self).to_keyspace()
540 3366 : }
541 :
542 : // The total number of keys in this object, ignoring any sharding effects that might cause some of
543 : // the keys to be omitted in storage on this shard.
544 7290 : pub fn raw_size(&self) -> u64 {
545 7290 : self.size
546 7290 : }
547 : }
548 :
549 : ///
550 : /// A helper object, to collect a set of keys and key ranges into a KeySpace
551 : /// object. Key ranges may be inserted in any order and can overlap.
552 : ///
553 : #[derive(Clone, Debug, Default)]
554 : pub struct KeySpaceRandomAccum {
555 : ranges: Vec<Range<Key>>,
556 : }
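// A usage sketch (mirrors the keyspace_add_range test below): insertion order
// does not matter and overlaps are allowed; to_keyspace() sorts and merges.
//
//     let mut accum = KeySpaceRandomAccum::new();
//     accum.add_range(kr(20..30));
//     accum.add_range(kr(0..10));
//     accum.add_range(kr(5..25)); // overlaps both earlier ranges
//     assert_eq!(accum.to_keyspace().ranges, vec![kr(0..30)]);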
557 :
558 : impl KeySpaceRandomAccum {
559 25873342 : pub fn new() -> Self {
560 25873342 : Self { ranges: Vec::new() }
561 25873342 : }
562 :
563 2000652 : pub fn add_key(&mut self, key: Key) {
564 2000652 : self.add_range(singleton_range(key))
565 2000652 : }
566 :
567 9147663 : pub fn add_range(&mut self, range: Range<Key>) {
568 9147663 : self.ranges.push(range);
569 9147663 : }
570 :
571 2649225 : pub fn add_keyspace(&mut self, keyspace: KeySpace) {
572 5298972 : for range in keyspace.ranges {
573 2649747 : self.add_range(range);
574 2649747 : }
575 2649225 : }
576 :
577 18975948 : pub fn to_keyspace(mut self) -> KeySpace {
578 18975948 : let mut ranges = Vec::new();
579 18975948 : if !self.ranges.is_empty() {
580 8126962 : self.ranges.sort_by_key(|r| r.start);
581 8126962 : let mut start = self.ranges.first().unwrap().start;
582 8126962 : let mut end = self.ranges.first().unwrap().end;
583 17274415 : for r in self.ranges {
584 9147453 : assert!(r.start >= start);
585 9147453 : if r.start > end {
586 770801 : ranges.push(start..end);
587 770801 : start = r.start;
588 770801 : end = r.end;
589 8376652 : } else if r.end > end {
590 13464 : end = r.end;
591 8363188 : }
592 : }
593 8126962 : ranges.push(start..end);
594 10848986 : }
595 18975948 : KeySpace { ranges }
596 18975948 : }
597 :
598 10032752 : pub fn consume_keyspace(&mut self) -> KeySpace {
599 10032752 : let mut prev_accum = KeySpaceRandomAccum::new();
600 10032752 : std::mem::swap(self, &mut prev_accum);
601 10032752 :
602 10032752 : prev_accum.to_keyspace()
603 10032752 : }
604 : }
605 :
606 14244570 : pub fn singleton_range(key: Key) -> Range<Key> {
607 14244570 : key..key.next()
608 14244570 : }
609 :
610 : #[cfg(test)]
611 : mod tests {
612 : use rand::{RngCore, SeedableRng};
613 :
614 : use crate::{
615 : models::ShardParameters,
616 : shard::{ShardCount, ShardNumber},
617 : };
618 :
619 : use super::*;
620 : use std::fmt::Write;
621 :
622 : // Helper function to create a key range.
623 : //
624 : // Make the tests below less verbose.
625 276 : fn kr(irange: Range<i128>) -> Range<Key> {
626 276 : Key::from_i128(irange.start)..Key::from_i128(irange.end)
627 276 : }
628 :
629 : #[allow(dead_code)]
630 0 : fn dump_keyspace(ks: &KeySpace) {
631 0 : for r in ks.ranges.iter() {
632 0 : println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
633 0 : }
634 0 : }
635 :
636 66 : fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
637 66 : if actual.ranges != expected {
638 0 : let mut msg = String::new();
639 0 :
640 0 : writeln!(msg, "expected:").unwrap();
641 0 : for r in &expected {
642 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
643 0 : }
644 0 : writeln!(msg, "got:").unwrap();
645 0 : for r in &actual.ranges {
646 0 : writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
647 0 : }
648 0 : panic!("{}", msg);
649 66 : }
650 66 : }
651 :
652 : #[test]
653 6 : fn keyspace_consume() {
654 6 : let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
655 6 :
656 6 : let mut accum = KeySpaceAccum::new();
657 24 : for range in &ranges {
658 18 : accum.add_range(range.clone());
659 18 : }
660 :
661 6 : let expected_size: u64 = ranges
662 6 : .iter()
663 18 : .map(|r| ShardedRange::raw_size(r) as u64)
664 6 : .sum();
665 6 : assert_eq!(accum.raw_size(), expected_size);
666 :
667 6 : assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
668 6 : assert_eq!(accum.raw_size(), 0);
669 :
670 6 : assert_ks_eq(&accum.consume_keyspace(), vec![]);
671 6 : assert_eq!(accum.raw_size(), 0);
672 :
673 24 : for range in &ranges {
674 18 : accum.add_range(range.clone());
675 18 : }
676 6 : assert_ks_eq(&accum.to_keyspace(), ranges);
677 6 : }
678 :
679 : #[test]
680 6 : fn keyspace_add_range() {
681 6 : // two separate ranges
682 6 : //
683 6 : // #####
684 6 : // #####
685 6 : let mut ks = KeySpaceRandomAccum::default();
686 6 : ks.add_range(kr(0..10));
687 6 : ks.add_range(kr(20..30));
688 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
689 6 :
690 6 : // two separate ranges, added in reverse order
691 6 : //
692 6 : // #####
693 6 : // #####
694 6 : let mut ks = KeySpaceRandomAccum::default();
695 6 : ks.add_range(kr(20..30));
696 6 : ks.add_range(kr(0..10));
697 6 :
698 6 : // add range that is adjacent to the end of an existing range
699 6 : //
700 6 : // #####
701 6 : // #####
702 6 : ks.add_range(kr(0..10));
703 6 : ks.add_range(kr(10..30));
704 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
705 6 :
706 6 : // add range that is adjacent to the start of an existing range
707 6 : //
708 6 : // #####
709 6 : // #####
710 6 : let mut ks = KeySpaceRandomAccum::default();
711 6 : ks.add_range(kr(10..30));
712 6 : ks.add_range(kr(0..10));
713 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
714 6 :
715 6 : // add range that overlaps with the end of an existing range
716 6 : //
717 6 : // #####
718 6 : // #####
719 6 : let mut ks = KeySpaceRandomAccum::default();
720 6 : ks.add_range(kr(0..10));
721 6 : ks.add_range(kr(5..30));
722 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
723 6 :
724 6 : // add range that overlaps with the start of an existing range
725 6 : //
726 6 : // #####
727 6 : // #####
728 6 : let mut ks = KeySpaceRandomAccum::default();
729 6 : ks.add_range(kr(5..30));
730 6 : ks.add_range(kr(0..10));
731 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
732 6 :
733 6 : // add range that is fully covered by an existing range
734 6 : //
735 6 : // #########
736 6 : // #####
737 6 : let mut ks = KeySpaceRandomAccum::default();
738 6 : ks.add_range(kr(0..30));
739 6 : ks.add_range(kr(10..20));
740 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
741 6 :
742 6 : // add range that extends an existing range from both ends
743 6 : //
744 6 : // #####
745 6 : // #########
746 6 : let mut ks = KeySpaceRandomAccum::default();
747 6 : ks.add_range(kr(10..20));
748 6 : ks.add_range(kr(0..30));
749 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
750 6 :
751 6 : // add a range that overlaps with two existing ranges, joining them
752 6 : //
753 6 : // ##### #####
754 6 : // #######
755 6 : let mut ks = KeySpaceRandomAccum::default();
756 6 : ks.add_range(kr(0..10));
757 6 : ks.add_range(kr(20..30));
758 6 : ks.add_range(kr(5..25));
759 6 : assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
760 6 : }
761 :
762 : #[test]
763 6 : fn keyspace_overlaps() {
764 6 : let mut ks = KeySpaceRandomAccum::default();
765 6 : ks.add_range(kr(10..20));
766 6 : ks.add_range(kr(30..40));
767 6 : let ks = ks.to_keyspace();
768 6 :
769 6 : // ##### #####
770 6 : // xxxx
771 6 : assert!(!ks.overlaps(&kr(0..5)));
772 :
773 : // ##### #####
774 : // xxxx
775 6 : assert!(!ks.overlaps(&kr(5..9)));
776 :
777 : // ##### #####
778 : // xxxx
779 6 : assert!(!ks.overlaps(&kr(5..10)));
780 :
781 : // ##### #####
782 : // xxxx
783 6 : assert!(ks.overlaps(&kr(5..11)));
784 :
785 : // ##### #####
786 : // xxxx
787 6 : assert!(ks.overlaps(&kr(10..15)));
788 :
789 : // ##### #####
790 : // xxxx
791 6 : assert!(ks.overlaps(&kr(15..20)));
792 :
793 : // ##### #####
794 : // xxxx
795 6 : assert!(ks.overlaps(&kr(15..25)));
796 :
797 : // ##### #####
798 : // xxxx
799 6 : assert!(!ks.overlaps(&kr(22..28)));
800 :
801 : // ##### #####
802 : // xxxx
803 6 : assert!(!ks.overlaps(&kr(25..30)));
804 :
805 : // ##### #####
806 : // xxxx
807 6 : assert!(ks.overlaps(&kr(35..35)));
808 :
809 : // ##### #####
810 : // xxxx
811 6 : assert!(!ks.overlaps(&kr(40..45)));
812 :
813 : // ##### #####
814 : // xxxx
815 6 : assert!(!ks.overlaps(&kr(45..50)));
816 :
817 : // ##### #####
818 : // xxxxxxxxxxx
819 6 : assert!(ks.overlaps(&kr(0..30)));
820 6 : }
821 :
822 : #[test]
823 6 : fn test_remove_full_overlaps() {
824 6 : let mut key_space1 = KeySpace {
825 6 : ranges: vec![
826 6 : Key::from_i128(1)..Key::from_i128(4),
827 6 : Key::from_i128(5)..Key::from_i128(8),
828 6 : Key::from_i128(10)..Key::from_i128(12),
829 6 : ],
830 6 : };
831 6 : let key_space2 = KeySpace {
832 6 : ranges: vec![
833 6 : Key::from_i128(2)..Key::from_i128(3),
834 6 : Key::from_i128(6)..Key::from_i128(7),
835 6 : Key::from_i128(11)..Key::from_i128(13),
836 6 : ],
837 6 : };
838 6 : let removed = key_space1.remove_overlapping_with(&key_space2);
839 6 : let removed_expected = KeySpace {
840 6 : ranges: vec![
841 6 : Key::from_i128(2)..Key::from_i128(3),
842 6 : Key::from_i128(6)..Key::from_i128(7),
843 6 : Key::from_i128(11)..Key::from_i128(12),
844 6 : ],
845 6 : };
846 6 : assert_eq!(removed, removed_expected);
847 :
848 6 : assert_eq!(
849 6 : key_space1.ranges,
850 6 : vec![
851 6 : Key::from_i128(1)..Key::from_i128(2),
852 6 : Key::from_i128(3)..Key::from_i128(4),
853 6 : Key::from_i128(5)..Key::from_i128(6),
854 6 : Key::from_i128(7)..Key::from_i128(8),
855 6 : Key::from_i128(10)..Key::from_i128(11)
856 6 : ]
857 6 : );
858 6 : }
859 :
860 : #[test]
861 6 : fn test_remove_partial_overlaps() {
862 6 : // Test partial overlaps
863 6 : let mut key_space1 = KeySpace {
864 6 : ranges: vec![
865 6 : Key::from_i128(1)..Key::from_i128(5),
866 6 : Key::from_i128(7)..Key::from_i128(10),
867 6 : Key::from_i128(12)..Key::from_i128(15),
868 6 : ],
869 6 : };
870 6 : let key_space2 = KeySpace {
871 6 : ranges: vec![
872 6 : Key::from_i128(3)..Key::from_i128(6),
873 6 : Key::from_i128(8)..Key::from_i128(11),
874 6 : Key::from_i128(14)..Key::from_i128(17),
875 6 : ],
876 6 : };
877 6 :
878 6 : let removed = key_space1.remove_overlapping_with(&key_space2);
879 6 : let removed_expected = KeySpace {
880 6 : ranges: vec![
881 6 : Key::from_i128(3)..Key::from_i128(5),
882 6 : Key::from_i128(8)..Key::from_i128(10),
883 6 : Key::from_i128(14)..Key::from_i128(15),
884 6 : ],
885 6 : };
886 6 : assert_eq!(removed, removed_expected);
887 :
888 6 : assert_eq!(
889 6 : key_space1.ranges,
890 6 : vec![
891 6 : Key::from_i128(1)..Key::from_i128(3),
892 6 : Key::from_i128(7)..Key::from_i128(8),
893 6 : Key::from_i128(12)..Key::from_i128(14),
894 6 : ]
895 6 : );
896 6 : }
897 :
898 : #[test]
899 6 : fn test_remove_no_overlaps() {
900 6 : let mut key_space1 = KeySpace {
901 6 : ranges: vec![
902 6 : Key::from_i128(1)..Key::from_i128(5),
903 6 : Key::from_i128(7)..Key::from_i128(10),
904 6 : Key::from_i128(12)..Key::from_i128(15),
905 6 : ],
906 6 : };
907 6 : let key_space2 = KeySpace {
908 6 : ranges: vec![
909 6 : Key::from_i128(6)..Key::from_i128(7),
910 6 : Key::from_i128(11)..Key::from_i128(12),
911 6 : Key::from_i128(15)..Key::from_i128(17),
912 6 : ],
913 6 : };
914 6 :
915 6 : let removed = key_space1.remove_overlapping_with(&key_space2);
916 6 : let removed_expected = KeySpace::default();
917 6 : assert_eq!(removed, removed_expected);
918 :
919 6 : assert_eq!(
920 6 : key_space1.ranges,
921 6 : vec![
922 6 : Key::from_i128(1)..Key::from_i128(5),
923 6 : Key::from_i128(7)..Key::from_i128(10),
924 6 : Key::from_i128(12)..Key::from_i128(15),
925 6 : ]
926 6 : );
927 6 : }
928 :
929 : #[test]
930 6 : fn test_remove_one_range_overlaps_multiple() {
931 6 : let mut key_space1 = KeySpace {
932 6 : ranges: vec![
933 6 : Key::from_i128(1)..Key::from_i128(3),
934 6 : Key::from_i128(3)..Key::from_i128(6),
935 6 : Key::from_i128(6)..Key::from_i128(10),
936 6 : Key::from_i128(12)..Key::from_i128(15),
937 6 : Key::from_i128(17)..Key::from_i128(20),
938 6 : Key::from_i128(20)..Key::from_i128(30),
939 6 : Key::from_i128(30)..Key::from_i128(40),
940 6 : ],
941 6 : };
942 6 : let key_space2 = KeySpace {
943 6 : ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
944 6 : };
945 6 :
946 6 : let removed = key_space1.remove_overlapping_with(&key_space2);
947 6 : let removed_expected = KeySpace {
948 6 : ranges: vec![
949 6 : Key::from_i128(9)..Key::from_i128(10),
950 6 : Key::from_i128(12)..Key::from_i128(15),
951 6 : Key::from_i128(17)..Key::from_i128(19),
952 6 : ],
953 6 : };
954 6 : assert_eq!(removed, removed_expected);
955 :
956 6 : assert_eq!(
957 6 : key_space1.ranges,
958 6 : vec![
959 6 : Key::from_i128(1)..Key::from_i128(3),
960 6 : Key::from_i128(3)..Key::from_i128(6),
961 6 : Key::from_i128(6)..Key::from_i128(9),
962 6 : Key::from_i128(19)..Key::from_i128(20),
963 6 : Key::from_i128(20)..Key::from_i128(30),
964 6 : Key::from_i128(30)..Key::from_i128(40),
965 6 : ]
966 6 : );
967 6 : }
968 : #[test]
969 6 : fn sharded_range_relation_gap() {
970 6 : let shard_identity = ShardIdentity::new(
971 6 : ShardNumber(0),
972 6 : ShardCount::new(4),
973 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
974 6 : )
975 6 : .unwrap();
976 6 :
977 6 : let range = ShardedRange::new(
978 6 : Range {
979 6 : start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
980 6 : end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
981 6 : },
982 6 : &shard_identity,
983 6 : );
984 6 :
985 6 : // Key range spans relations, expect MAX
986 6 : assert_eq!(range.page_count(), u32::MAX);
987 6 : }
988 :
989 : #[test]
990 6 : fn shard_identity_keyspaces_single_key() {
991 6 : let shard_identity = ShardIdentity::new(
992 6 : ShardNumber(1),
993 6 : ShardCount::new(4),
994 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
995 6 : )
996 6 : .unwrap();
997 6 :
998 6 : let range = ShardedRange::new(
999 6 : Range {
1000 6 : start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
1001 6 : end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
1002 6 : },
1003 6 : &shard_identity,
1004 6 : );
1005 6 : // Single-key range on logical size key
1006 6 : assert_eq!(range.page_count(), 1);
1007 6 : }
1008 :
1009 : /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation
1010 : #[test]
1011 6 : fn contiguous_range_check() {
1012 6 : assert!(!is_contiguous_range(
1013 6 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1014 6 : ..Key::from_hex("000000067f00000001000004df0100000003").unwrap())
1015 6 : ),);
1016 :
1017 : // The range goes all the way up to 0xffffffff, including it: this is
1018 : // not considered a rel block range because 0xffffffff stores logical sizes,
1019 : // not blocks.
1020 6 : assert!(!is_contiguous_range(
1021 6 : &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap()
1022 6 : ..Key::from_hex("000000067f00000001000004df0100000000").unwrap())
1023 6 : ),);
1024 :
1025 : // Keys within the normal data region of a relation
1026 6 : assert!(is_contiguous_range(
1027 6 : &(Key::from_hex("000000067f00000001000004df0000000000").unwrap()
1028 6 : ..Key::from_hex("000000067f00000001000004df0000000080").unwrap())
1029 6 : ),);
1030 :
1031 : // The logical size key of one forkno, then some blocks in the next
1032 6 : assert!(is_contiguous_range(
1033 6 : &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap()
1034 6 : ..Key::from_hex("000000067f00000001000004df0100000080").unwrap())
1035 6 : ),);
1036 6 : }
1037 :
1038 : #[test]
1039 6 : fn shard_identity_keyspaces_forkno_gap() {
1040 6 : let shard_identity = ShardIdentity::new(
1041 6 : ShardNumber(1),
1042 6 : ShardCount::new(4),
1043 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
1044 6 : )
1045 6 : .unwrap();
1046 6 :
1047 6 : let range = ShardedRange::new(
1048 6 : Range {
1049 6 : start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
1050 6 : end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
1051 6 : },
1052 6 : &shard_identity,
1053 6 : );
1054 6 :
1055 6 : // Range spanning the end of one forkno and the start of the next: we do not attempt to
1056 6 : // calculate a valid size, because we have no way to know if the keys between start
1057 6 : // and end are actually in use.
1058 6 : assert_eq!(range.page_count(), u32::MAX);
1059 6 : }
1060 :
1061 : #[test]
1062 6 : fn shard_identity_keyspaces_one_relation() {
1063 30 : for shard_number in 0..4 {
1064 24 : let shard_identity = ShardIdentity::new(
1065 24 : ShardNumber(shard_number),
1066 24 : ShardCount::new(4),
1067 24 : ShardParameters::DEFAULT_STRIPE_SIZE,
1068 24 : )
1069 24 : .unwrap();
1070 24 :
1071 24 : let range = ShardedRange::new(
1072 24 : Range {
1073 24 : start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
1074 24 : end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
1075 24 : },
1076 24 : &shard_identity,
1077 24 : );
1078 24 :
1079 24 : // Very simple case: range covering block zero of one relation, where that block maps to shard zero
1080 24 : if shard_number == 0 {
1081 6 : assert_eq!(range.page_count(), 1);
1082 : } else {
1083 : // Other shards should perceive the range's size as zero
1084 18 : assert_eq!(range.page_count(), 0);
1085 : }
1086 : }
1087 6 : }
1088 :
1089 : /// Test helper: construct a ShardedRange and call fragment() on it, returning
1090 : /// the total page count in the range and the fragments.
1091 6072 : fn do_fragment(
1092 6072 : range_start: Key,
1093 6072 : range_end: Key,
1094 6072 : shard_identity: &ShardIdentity,
1095 6072 : target_nblocks: u32,
1096 6072 : ) -> (u32, Vec<(u32, Range<Key>)>) {
1097 6072 : let range = ShardedRange::new(
1098 6072 : Range {
1099 6072 : start: range_start,
1100 6072 : end: range_end,
1101 6072 : },
1102 6072 : shard_identity,
1103 6072 : );
1104 6072 :
1105 6072 : let page_count = range.page_count();
1106 6072 : let fragments = range.fragment(target_nblocks);
1107 6072 :
1108 6072 : // Invariant: we always get at least one fragment
1109 6072 : assert!(!fragments.is_empty());
1110 :
1111 : // Invariant: the first/last fragment start/end should equal the input start/end
1112 6072 : assert_eq!(fragments.first().unwrap().1.start, range_start);
1113 6072 : assert_eq!(fragments.last().unwrap().1.end, range_end);
1114 :
1115 6072 : if page_count > 0 {
1116 : // Invariant: every fragment must contain at least one shard-local page, if the
1117 : // total range contains at least one shard-local page
1118 4122 : let all_nonzero = fragments.iter().all(|f| f.0 > 0);
1119 3324 : if !all_nonzero {
1120 0 : eprintln!("Found a zero-length fragment: {:?}", fragments);
1121 3324 : }
1122 3324 : assert!(all_nonzero);
1123 : } else {
1124 : // A range with no shard-local pages should always be returned as a single fragment
1125 2748 : assert_eq!(fragments, vec![(0, range_start..range_end)]);
1126 : }
1127 :
1128 : // Invariant: fragments must be ordered and non-overlapping
1129 6072 : let mut last: Option<Range<Key>> = None;
1130 12942 : for frag in &fragments {
1131 6870 : if let Some(last) = last {
1132 798 : assert!(frag.1.start >= last.end);
1133 798 : assert!(frag.1.start > last.start);
1134 6072 : }
1135 6870 : last = Some(frag.1.clone())
1136 : }
1137 :
1138 : // Invariant: fragments respect target_nblocks
1139 12942 : for frag in &fragments {
1140 6870 : assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks);
1141 : }
1142 :
1143 6072 : (page_count, fragments)
1144 6072 : }
1145 :
1146 : /// Really simple tests for fragment(), on a range that just contains a single stripe
1147 : /// for a single tenant.
1148 : #[test]
1149 6 : fn sharded_range_fragment_simple() {
1150 6 : let shard_identity = ShardIdentity::new(
1151 6 : ShardNumber(0),
1152 6 : ShardCount::new(4),
1153 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
1154 6 : )
1155 6 : .unwrap();
1156 6 :
1157 6 : // A range which we happen to know covers exactly one stripe which belongs to this shard
1158 6 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1159 6 : let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
1160 6 :
1161 6 : // Ask for stripe_size blocks, we get the whole stripe
1162 6 : assert_eq!(
1163 6 : do_fragment(input_start, input_end, &shard_identity, 32768),
1164 6 : (32768, vec![(32768, input_start..input_end)])
1165 6 : );
1166 :
1167 : // Ask for more, we still get the whole stripe
1168 6 : assert_eq!(
1169 6 : do_fragment(input_start, input_end, &shard_identity, 10000000),
1170 6 : (32768, vec![(32768, input_start..input_end)])
1171 6 : );
1172 :
1173 : // Ask for target_nblocks of half the stripe size, we get two halves
1174 6 : assert_eq!(
1175 6 : do_fragment(input_start, input_end, &shard_identity, 16384),
1176 6 : (
1177 6 : 32768,
1178 6 : vec![
1179 6 : (16384, input_start..input_start.add(16384)),
1180 6 : (16384, input_start.add(16384)..input_end)
1181 6 : ]
1182 6 : )
1183 6 : );
1184 6 : }
1185 :
1186 : #[test]
1187 6 : fn sharded_range_fragment_multi_stripe() {
1188 6 : let shard_identity = ShardIdentity::new(
1189 6 : ShardNumber(0),
1190 6 : ShardCount::new(4),
1191 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
1192 6 : )
1193 6 : .unwrap();
1194 6 :
1195 6 : // A range which covers multiple stripes, exactly one of which belongs to the current shard.
1196 6 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1197 6 : let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
1198 6 : // Ask for all the blocks, get a fragment that covers the whole range but reports
1199 6 : // its size to be just the blocks belonging to our shard.
1200 6 : assert_eq!(
1201 6 : do_fragment(input_start, input_end, &shard_identity, 131072),
1202 6 : (32768, vec![(32768, input_start..input_end)])
1203 6 : );
1204 :
1205 : // Ask for a sub-stripe quantity
1206 6 : assert_eq!(
1207 6 : do_fragment(input_start, input_end, &shard_identity, 16000),
1208 6 : (
1209 6 : 32768,
1210 6 : vec![
1211 6 : (16000, input_start..input_start.add(16000)),
1212 6 : (16000, input_start.add(16000)..input_start.add(32000)),
1213 6 : (768, input_start.add(32000)..input_end),
1214 6 : ]
1215 6 : )
1216 6 : );
1217 :
1218 : // Try on a range that starts slightly after our owned stripe
1219 6 : assert_eq!(
1220 6 : do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
1221 6 : (32767, vec![(32767, input_start.add(1)..input_end)])
1222 6 : );
1223 6 : }
1224 :
1225 : /// Test our calculations work correctly when we start a range from the logical size key of
1226 : /// a previous relation.
1227 : #[test]
1228 6 : fn sharded_range_fragment_starting_from_logical_size() {
1229 6 : let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
1230 6 : let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
1231 6 :
1232 6 : // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
1233 6 : let shard_identity = ShardIdentity::new(
1234 6 : ShardNumber(0),
1235 6 : ShardCount::new(4),
1236 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
1237 6 : )
1238 6 : .unwrap();
1239 6 : assert_eq!(
1240 6 : do_fragment(input_start, input_end, &shard_identity, 0x10000),
1241 6 : (0x8001, vec![(0x8001, input_start..input_end)])
1242 6 : );
1243 :
1244 : // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
1245 : // store all logical sizes)
1246 6 : let shard_identity = ShardIdentity::new(
1247 6 : ShardNumber(1),
1248 6 : ShardCount::new(4),
1249 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
1250 6 : )
1251 6 : .unwrap();
1252 6 : assert_eq!(
1253 6 : do_fragment(input_start, input_end, &shard_identity, 0x10000),
1254 6 : (0x1, vec![(0x1, input_start..input_end)])
1255 6 : );
1256 6 : }
1257 :
1258 : /// Test that ShardedRange behaves properly when used on un-sharded data
1259 : #[test]
1260 6 : fn sharded_range_fragment_unsharded() {
1261 6 : let shard_identity = ShardIdentity::unsharded();
1262 6 :
1263 6 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1264 6 : let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
1265 6 : assert_eq!(
1266 6 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1267 6 : (
1268 6 : 0x10000,
1269 6 : vec![
1270 6 : (0x8000, input_start..input_start.add(0x8000)),
1271 6 : (0x8000, input_start.add(0x8000)..input_start.add(0x10000))
1272 6 : ]
1273 6 : )
1274 6 : );
1275 6 : }
1276 :
1277 : #[test]
1278 6 : fn sharded_range_fragment_cross_relation() {
1279 6 : let shard_identity = ShardIdentity::unsharded();
1280 6 :
1281 6 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1282 6 : let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
1283 6 : let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
1284 6 : assert_eq!(
1285 6 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1286 6 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1287 6 : );
1288 :
1289 : // Same, but using a sharded identity
1290 6 : let shard_identity = ShardIdentity::new(
1291 6 : ShardNumber(0),
1292 6 : ShardCount::new(4),
1293 6 : ShardParameters::DEFAULT_STRIPE_SIZE,
1294 6 : )
1295 6 : .unwrap();
1296 6 : assert_eq!(
1297 6 : do_fragment(input_start, input_end, &shard_identity, 0x8000),
1298 6 : (u32::MAX, vec![(u32::MAX, input_start..input_end),])
1299 6 : );
1300 6 : }
1301 :
1302 : #[test]
1303 6 : fn sharded_range_fragment_tiny_nblocks() {
1304 6 : let shard_identity = ShardIdentity::unsharded();
1305 6 :
1306 6 : // A small range within a single relation, fragmented with a very small target_nblocks
1307 6 : let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap();
1308 6 : let input_end = Key::from_hex("000000067F00000001000004E10000000038").unwrap();
1309 6 : assert_eq!(
1310 6 : do_fragment(input_start, input_end, &shard_identity, 16),
1311 6 : (
1312 6 : 0x38,
1313 6 : vec![
1314 6 : (16, input_start..input_start.add(16)),
1315 6 : (16, input_start.add(16)..input_start.add(32)),
1316 6 : (16, input_start.add(32)..input_start.add(48)),
1317 6 : (8, input_start.add(48)..input_end),
1318 6 : ]
1319 6 : )
1320 6 : );
1321 6 : }
1322 :
1323 : #[test]
1324 6 : fn sharded_range_fragment_fuzz() {
1325 6 : // Use a fixed seed: we don't want to explicitly pick values, but we do want
1326 6 : // the test to be reproducible.
1327 6 : let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef);
1328 :
1329 6006 : for _i in 0..1000 {
1330 6000 : let shard_identity = if prng.next_u32() % 2 == 0 {
1331 3114 : ShardIdentity::unsharded()
1332 : } else {
1333 2886 : let shard_count = prng.next_u32() % 127 + 1;
1334 2886 : ShardIdentity::new(
1335 2886 : ShardNumber((prng.next_u32() % shard_count) as u8),
1336 2886 : ShardCount::new(shard_count as u8),
1337 2886 : ShardParameters::DEFAULT_STRIPE_SIZE,
1338 2886 : )
1339 2886 : .unwrap()
1340 : };
1341 :
1342 6000 : let target_nblocks = prng.next_u32() % 65536 + 1;
1343 6000 :
1344 6000 : let start_offset = prng.next_u32() % 16384;
1345 6000 :
1346 6000 : // Try ranges up to 4GiB in size, that are always at least 1
1347 6000 : let range_size = prng.next_u32() % 8192 + 1;
1348 6000 :
1349 6000 : // A range that spans relations: expect fragmentation to give up and return a u32::MAX size
1350 6000 : let input_start = Key::from_hex("000000067F00000001000004E10000000000")
1351 6000 : .unwrap()
1352 6000 : .add(start_offset);
1353 6000 : let input_end = input_start.add(range_size);
1354 6000 :
1355 6000 : // This test's main success conditions are the invariants baked into do_fragment
1356 6000 : let (_total_size, fragments) =
1357 6000 : do_fragment(input_start, input_end, &shard_identity, target_nblocks);
1358 6000 :
1359 6000 : // Pick a random key within the range and check it appears in the output
1360 6000 : let example_key = input_start.add(prng.next_u32() % range_size);
1361 6000 :
1362 6000 : // Panic on unwrap if it isn't found
1363 6000 : let example_key_frag = fragments
1364 6000 : .iter()
1365 6438 : .find(|f| f.1.contains(&example_key))
1366 6000 : .unwrap();
1367 6000 :
1368 6000 : // Check that the fragment containing our random key has a nonzero size if
1369 6000 : // that key is shard-local
1370 6000 : let example_key_local = !shard_identity.is_key_disposable(&example_key);
1371 6000 : if example_key_local {
1372 3252 : assert!(example_key_frag.0 > 0);
1373 2748 : }
1374 : }
1375 6 : }
1376 : }