Line data Source code
1 : use std::{ops::RangeInclusive, str::FromStr};
2 :
3 : use crate::{
4 : key::{is_rel_block_key, Key},
5 : models::ShardParameters,
6 : };
7 : use hex::FromHex;
8 : use serde::{Deserialize, Serialize};
9 : use thiserror;
10 : use utils::id::TenantId;
11 :
12 44290764 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
13 : pub struct ShardNumber(pub u8);
14 :
15 114246878 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
16 : pub struct ShardCount(pub u8);
17 :
18 : impl ShardCount {
19 : pub const MAX: Self = Self(u8::MAX);
20 : }
21 :
22 : impl ShardNumber {
23 : pub const MAX: Self = Self(u8::MAX);
24 : }
25 :
26 : /// TenantShardId identify the units of work for the Pageserver.
27 : ///
28 : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
29 : ///
30 : /// # The second shard in a two-shard tenant
31 : /// 072f1291a5310026820b2fe4b2968934-0102
32 : ///
33 : /// Historically, tenants could not have multiple shards, and were identified
34 : /// by TenantId. To support this, TenantShardId has a special legacy
35 : /// mode where `shard_count` is equal to zero: this represents a single-sharded
36 : /// tenant which should be written as a TenantId with no suffix.
37 : ///
38 : /// The human-readable encoding of TenantShardId, such as used in API URLs,
39 : /// is both forward and backward compatible: a legacy TenantId can be
40 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
41 : /// as a TenantId.
42 : ///
43 : /// Note that the binary encoding is _not_ backward compatible, because
44 : /// at the time sharding is introduced, there are no existing binary structures
45 : /// containing TenantId that we need to handle.
46 17098954 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
47 : pub struct TenantShardId {
48 : pub tenant_id: TenantId,
49 : pub shard_number: ShardNumber,
50 : pub shard_count: ShardCount,
51 : }
52 :
53 : impl TenantShardId {
54 690 : pub fn unsharded(tenant_id: TenantId) -> Self {
55 690 : Self {
56 690 : tenant_id,
57 690 : shard_number: ShardNumber(0),
58 690 : shard_count: ShardCount(0),
59 690 : }
60 690 : }
61 :
62 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
63 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
64 23602 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
65 23602 : RangeInclusive::new(
66 23602 : Self {
67 23602 : tenant_id,
68 23602 : shard_number: ShardNumber(0),
69 23602 : shard_count: ShardCount(0),
70 23602 : },
71 23602 : Self {
72 23602 : tenant_id,
73 23602 : shard_number: ShardNumber::MAX,
74 23602 : shard_count: ShardCount::MAX,
75 23602 : },
76 23602 : )
77 23602 : }
78 :
79 8554581 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
80 8554581 : ShardSlug(self)
81 8554581 : }
82 :
83 : /// Convenience for code that has special behavior on the 0th shard.
84 1508636 : pub fn is_zero(&self) -> bool {
85 1508636 : self.shard_number == ShardNumber(0)
86 1508636 : }
87 :
88 1 : pub fn is_unsharded(&self) -> bool {
89 1 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
90 1 : }
91 9985 : pub fn to_index(&self) -> ShardIndex {
92 9985 : ShardIndex {
93 9985 : shard_number: self.shard_number,
94 9985 : shard_count: self.shard_count,
95 9985 : }
96 9985 : }
97 : }
98 :
99 : /// Formatting helper
100 : struct ShardSlug<'a>(&'a TenantShardId);
101 :
102 : impl<'a> std::fmt::Display for ShardSlug<'a> {
103 8554581 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 8554581 : write!(
105 8554581 : f,
106 8554581 : "{:02x}{:02x}",
107 8554581 : self.0.shard_number.0, self.0.shard_count.0
108 8554581 : )
109 8554581 : }
110 : }
111 :
112 : impl std::fmt::Display for TenantShardId {
113 180873 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 180873 : if self.shard_count != ShardCount(0) {
115 2489 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
116 : } else {
117 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
118 : // is distinct from the normal single shard case (shard count == 1).
119 178384 : self.tenant_id.fmt(f)
120 : }
121 180873 : }
122 : }
123 :
124 : impl std::fmt::Debug for TenantShardId {
125 892 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 892 : // Debug is the same as Display: the compact hex representation
127 892 : write!(f, "{}", self)
128 892 : }
129 : }
130 :
131 : impl std::str::FromStr for TenantShardId {
132 : type Err = hex::FromHexError;
133 :
134 82608 : fn from_str(s: &str) -> Result<Self, Self::Err> {
135 82608 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
136 82608 : if s.len() == 32 {
137 : // Legacy case: no shard specified
138 : Ok(Self {
139 80368 : tenant_id: TenantId::from_str(s)?,
140 80368 : shard_number: ShardNumber(0),
141 80368 : shard_count: ShardCount(0),
142 : })
143 2240 : } else if s.len() == 37 {
144 2240 : let bytes = s.as_bytes();
145 2240 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
146 2240 : let mut shard_parts: [u8; 2] = [0u8; 2];
147 2240 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
148 2240 : Ok(Self {
149 2240 : tenant_id,
150 2240 : shard_number: ShardNumber(shard_parts[0]),
151 2240 : shard_count: ShardCount(shard_parts[1]),
152 2240 : })
153 : } else {
154 0 : Err(hex::FromHexError::InvalidStringLength)
155 : }
156 82608 : }
157 : }
158 :
159 : impl From<[u8; 18]> for TenantShardId {
160 4 : fn from(b: [u8; 18]) -> Self {
161 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
162 4 :
163 4 : Self {
164 4 : tenant_id: TenantId::from(tenant_id_bytes),
165 4 : shard_number: ShardNumber(b[16]),
166 4 : shard_count: ShardCount(b[17]),
167 4 : }
168 4 : }
169 : }
170 :
171 : /// For use within the context of a particular tenant, when we need to know which
172 : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
173 : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
174 : /// TenantShardId.
175 703686 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
176 : pub struct ShardIndex {
177 : pub shard_number: ShardNumber,
178 : pub shard_count: ShardCount,
179 : }
180 :
181 : impl ShardIndex {
182 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
183 0 : Self {
184 0 : shard_number: number,
185 0 : shard_count: count,
186 0 : }
187 0 : }
188 59834 : pub fn unsharded() -> Self {
189 59834 : Self {
190 59834 : shard_number: ShardNumber(0),
191 59834 : shard_count: ShardCount(0),
192 59834 : }
193 59834 : }
194 :
195 1259277 : pub fn is_unsharded(&self) -> bool {
196 1259277 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
197 1259277 : }
198 :
199 : /// For use in constructing remote storage paths: concatenate this with a TenantId
200 : /// to get a fully qualified TenantShardId.
201 : ///
202 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
203 : /// that the legacy pre-sharding remote key format is preserved.
204 19331 : pub fn get_suffix(&self) -> String {
205 19331 : if self.is_unsharded() {
206 19314 : "".to_string()
207 : } else {
208 17 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
209 : }
210 19331 : }
211 : }
212 :
213 : impl std::fmt::Display for ShardIndex {
214 28102 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 28102 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
216 28102 : }
217 : }
218 :
219 : impl std::fmt::Debug for ShardIndex {
220 22625 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 22625 : // Debug is the same as Display: the compact hex representation
222 22625 : write!(f, "{}", self)
223 22625 : }
224 : }
225 :
226 : impl std::str::FromStr for ShardIndex {
227 : type Err = hex::FromHexError;
228 :
229 6 : fn from_str(s: &str) -> Result<Self, Self::Err> {
230 6 : // Expect format: 1 byte shard number, 1 byte shard count
231 6 : if s.len() == 4 {
232 6 : let bytes = s.as_bytes();
233 6 : let mut shard_parts: [u8; 2] = [0u8; 2];
234 6 : hex::decode_to_slice(bytes, &mut shard_parts)?;
235 6 : Ok(Self {
236 6 : shard_number: ShardNumber(shard_parts[0]),
237 6 : shard_count: ShardCount(shard_parts[1]),
238 6 : })
239 : } else {
240 0 : Err(hex::FromHexError::InvalidStringLength)
241 : }
242 6 : }
243 : }
244 :
245 : impl From<[u8; 2]> for ShardIndex {
246 2 : fn from(b: [u8; 2]) -> Self {
247 2 : Self {
248 2 : shard_number: ShardNumber(b[0]),
249 2 : shard_count: ShardCount(b[1]),
250 2 : }
251 2 : }
252 : }
253 :
254 : impl Serialize for TenantShardId {
255 9484 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
256 9484 : where
257 9484 : S: serde::Serializer,
258 9484 : {
259 9484 : if serializer.is_human_readable() {
260 9476 : serializer.collect_str(self)
261 : } else {
262 8 : let mut packed: [u8; 18] = [0; 18];
263 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
264 8 : packed[16] = self.shard_number.0;
265 8 : packed[17] = self.shard_count.0;
266 8 :
267 8 : packed.serialize(serializer)
268 : }
269 9484 : }
270 : }
271 :
272 : impl<'de> Deserialize<'de> for TenantShardId {
273 5519 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
274 5519 : where
275 5519 : D: serde::Deserializer<'de>,
276 5519 : {
277 5519 : struct IdVisitor {
278 5519 : is_human_readable_deserializer: bool,
279 5519 : }
280 5519 :
281 5519 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
282 5519 : type Value = TenantShardId;
283 5519 :
284 5519 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
285 0 : if self.is_human_readable_deserializer {
286 5519 : formatter.write_str("value in form of hex string")
287 5519 : } else {
288 5519 : formatter.write_str("value in form of integer array([u8; 18])")
289 5519 : }
290 5519 : }
291 5519 :
292 5519 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
293 4 : where
294 4 : A: serde::de::SeqAccess<'de>,
295 4 : {
296 4 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
297 5519 : let id: [u8; 18] = Deserialize::deserialize(s)?;
298 5519 : Ok(TenantShardId::from(id))
299 5519 : }
300 5519 :
301 5519 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
302 5515 : where
303 5515 : E: serde::de::Error,
304 5515 : {
305 5515 : TenantShardId::from_str(v).map_err(E::custom)
306 5515 : }
307 5519 : }
308 5519 :
309 5519 : if deserializer.is_human_readable() {
310 5515 : deserializer.deserialize_str(IdVisitor {
311 5515 : is_human_readable_deserializer: true,
312 5515 : })
313 : } else {
314 4 : deserializer.deserialize_tuple(
315 4 : 18,
316 4 : IdVisitor {
317 4 : is_human_readable_deserializer: false,
318 4 : },
319 4 : )
320 : }
321 5519 : }
322 : }
323 :
324 : /// Stripe size in number of pages
325 883 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
326 : pub struct ShardStripeSize(pub u32);
327 :
328 : /// Layout version: for future upgrades where we might change how the key->shard mapping works
329 89659452 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
330 : pub struct ShardLayout(u8);
331 :
332 : const LAYOUT_V1: ShardLayout = ShardLayout(1);
333 : /// ShardIdentity uses a magic layout value to indicate if it is unusable
334 : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
335 :
336 : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
337 : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
338 :
339 : /// The ShardIdentity contains the information needed for one member of map
340 : /// to resolve a key to a shard, and then check whether that shard is ==self.
341 44 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
342 : pub struct ShardIdentity {
343 : pub number: ShardNumber,
344 : pub count: ShardCount,
345 : pub stripe_size: ShardStripeSize,
346 : layout: ShardLayout,
347 : }
348 :
349 10 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
350 : pub enum ShardConfigError {
351 : #[error("Invalid shard count")]
352 : InvalidCount,
353 : #[error("Invalid shard number")]
354 : InvalidNumber,
355 : #[error("Invalid stripe size")]
356 : InvalidStripeSize,
357 : }
358 :
359 : impl ShardIdentity {
360 : /// An identity with number=0 count=0 is a "none" identity, which represents legacy
361 : /// tenants. Modern single-shard tenants should not use this: they should
362 : /// have number=0 count=1.
363 906 : pub fn unsharded() -> Self {
364 906 : Self {
365 906 : number: ShardNumber(0),
366 906 : count: ShardCount(0),
367 906 : layout: LAYOUT_V1,
368 906 : stripe_size: DEFAULT_STRIPE_SIZE,
369 906 : }
370 906 : }
371 :
372 : /// A broken instance of this type is only used for `TenantState::Broken` tenants,
373 : /// which are constructed in code paths that don't have access to proper configuration.
374 : ///
375 : /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
376 : /// Enforcement is via assertions, to avoid making our interface fallible for this
377 : /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
378 : /// state, and by extension to avoid trying to do any page->shard resolution.
379 0 : pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
380 0 : Self {
381 0 : number,
382 0 : count,
383 0 : layout: LAYOUT_BROKEN,
384 0 : stripe_size: DEFAULT_STRIPE_SIZE,
385 0 : }
386 0 : }
387 :
388 1890 : pub fn is_unsharded(&self) -> bool {
389 1890 : self.number == ShardNumber(0) && self.count == ShardCount(0)
390 1890 : }
391 :
392 : /// Count must be nonzero, and number must be < count. To construct
393 : /// the legacy case (count==0), use Self::unsharded instead.
394 60 : pub fn new(
395 60 : number: ShardNumber,
396 60 : count: ShardCount,
397 60 : stripe_size: ShardStripeSize,
398 60 : ) -> Result<Self, ShardConfigError> {
399 60 : if count.0 == 0 {
400 2 : Err(ShardConfigError::InvalidCount)
401 58 : } else if number.0 > count.0 - 1 {
402 6 : Err(ShardConfigError::InvalidNumber)
403 52 : } else if stripe_size.0 == 0 {
404 2 : Err(ShardConfigError::InvalidStripeSize)
405 : } else {
406 50 : Ok(Self {
407 50 : number,
408 50 : count,
409 50 : layout: LAYOUT_V1,
410 50 : stripe_size,
411 50 : })
412 : }
413 60 : }
414 :
415 : /// For use when creating ShardIdentity instances for new shards, where a creation request
416 : /// specifies the ShardParameters that apply to all shards.
417 759 : pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
418 759 : Self {
419 759 : number,
420 759 : count: params.count,
421 759 : layout: LAYOUT_V1,
422 759 : stripe_size: params.stripe_size,
423 759 : }
424 759 : }
425 :
426 89659452 : fn is_broken(&self) -> bool {
427 89659452 : self.layout == LAYOUT_BROKEN
428 89659452 : }
429 :
430 783934 : pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
431 783934 : assert!(!self.is_broken());
432 783934 : key_to_shard_number(self.count, self.stripe_size, key)
433 783934 : }
434 :
435 : /// Return true if the key should be ingested by this shard
436 88875518 : pub fn is_key_local(&self, key: &Key) -> bool {
437 88875518 : assert!(!self.is_broken());
438 88875518 : if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
439 69823070 : true
440 : } else {
441 19052448 : key_to_shard_number(self.count, self.stripe_size, key) == self.number
442 : }
443 88875518 : }
444 :
445 : /// Return true if the key should be discarded if found in this shard's
446 : /// data store, e.g. during compaction after a split
447 26590132 : pub fn is_key_disposable(&self, key: &Key) -> bool {
448 26590132 : if key_is_shard0(key) {
449 : // Q: Why can't we dispose of shard0 content if we're not shard 0?
450 : // A: because the WAL ingestion logic currently ingests some shard 0
451 : // content on all shards, even though it's only read on shard 0. If we
452 : // dropped it, then subsequent WAL ingest to these keys would encounter
453 : // an error.
454 4955851 : false
455 : } else {
456 21634281 : !self.is_key_local(key)
457 : }
458 26590132 : }
459 :
460 0 : pub fn shard_slug(&self) -> String {
461 0 : if self.count > ShardCount(0) {
462 0 : format!("-{:02x}{:02x}", self.number.0, self.count.0)
463 : } else {
464 0 : String::new()
465 : }
466 0 : }
467 :
468 : /// Convenience for checking if this identity is the 0th shard in a tenant,
469 : /// for special cases on shard 0 such as ingesting relation sizes.
470 13873379 : pub fn is_zero(&self) -> bool {
471 13873379 : self.number == ShardNumber(0)
472 13873379 : }
473 : }
474 :
475 : impl Serialize for ShardIndex {
476 80 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
477 80 : where
478 80 : S: serde::Serializer,
479 80 : {
480 80 : if serializer.is_human_readable() {
481 76 : serializer.collect_str(self)
482 : } else {
483 : // Binary encoding is not used in index_part.json, but is included in anticipation of
484 : // switching various structures (e.g. inter-process communication, remote metadata) to more
485 : // compact binary encodings in future.
486 4 : let mut packed: [u8; 2] = [0; 2];
487 4 : packed[0] = self.shard_number.0;
488 4 : packed[1] = self.shard_count.0;
489 4 : packed.serialize(serializer)
490 : }
491 80 : }
492 : }
493 :
494 : impl<'de> Deserialize<'de> for ShardIndex {
495 6 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
496 6 : where
497 6 : D: serde::Deserializer<'de>,
498 6 : {
499 6 : struct IdVisitor {
500 6 : is_human_readable_deserializer: bool,
501 6 : }
502 6 :
503 6 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
504 6 : type Value = ShardIndex;
505 6 :
506 6 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
507 0 : if self.is_human_readable_deserializer {
508 6 : formatter.write_str("value in form of hex string")
509 6 : } else {
510 6 : formatter.write_str("value in form of integer array([u8; 2])")
511 6 : }
512 6 : }
513 6 :
514 6 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
515 2 : where
516 2 : A: serde::de::SeqAccess<'de>,
517 2 : {
518 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
519 6 : let id: [u8; 2] = Deserialize::deserialize(s)?;
520 6 : Ok(ShardIndex::from(id))
521 6 : }
522 6 :
523 6 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
524 4 : where
525 4 : E: serde::de::Error,
526 4 : {
527 4 : ShardIndex::from_str(v).map_err(E::custom)
528 4 : }
529 6 : }
530 6 :
531 6 : if deserializer.is_human_readable() {
532 4 : deserializer.deserialize_str(IdVisitor {
533 4 : is_human_readable_deserializer: true,
534 4 : })
535 : } else {
536 2 : deserializer.deserialize_tuple(
537 2 : 2,
538 2 : IdVisitor {
539 2 : is_human_readable_deserializer: false,
540 2 : },
541 2 : )
542 : }
543 6 : }
544 : }
545 :
546 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
547 : /// in order to be able to serve basebackup requests without peer communication).
548 65237179 : fn key_is_shard0(key: &Key) -> bool {
549 65237179 : // To decide what to shard out to shards >0, we apply a simple rule that only
550 65237179 : // relation pages are distributed to shards other than shard zero. Everything else gets
551 65237179 : // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
552 65237179 : // requests, and any request other than those for particular blocks in relations.
553 65237179 : !is_rel_block_key(key)
554 65237179 : }
555 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Each step xors the value with a right-shifted copy of itself; the first two steps are
/// followed by a wrapping multiplication with a fixed constant.
fn murmurhash32(h: u32) -> u32 {
    let h = (h ^ (h >> 16)).wrapping_mul(0x85eb_ca6b);
    let h = (h ^ (h >> 13)).wrapping_mul(0xc2b2_ae35);
    h ^ (h >> 16)
}
565 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// All additions wrap on overflow.
fn hash_combine(a: u32, b: u32) -> u32 {
    let mixed = b
        .wrapping_add(0x9e37_79b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);
    a ^ mixed
}
575 :
576 : /// Where a Key is to be distributed across shards, select the shard. This function
577 : /// does not account for keys that should be broadcast across shards.
578 : ///
579 : /// The hashing in this function must exactly match what we do in postgres smgr
580 : /// code. The resulting distribution of pages is intended to preserve locality within
581 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
582 : /// distributing data pseudo-randomly.
583 : ///
584 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
585 : /// and will be handled at higher levels when shards are split.
586 19836384 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
587 19836384 : // Fast path for un-sharded tenants or broadcast keys
588 19836384 : if count < ShardCount(2) || key_is_shard0(key) {
589 241785 : return ShardNumber(0);
590 19594599 : }
591 19594599 :
592 19594599 : // relNode
593 19594599 : let mut hash = murmurhash32(key.field4);
594 19594599 : // blockNum/stripe size
595 19594599 : hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
596 19594599 :
597 19594599 : ShardNumber((hash % count.0 as u32) as u8)
598 19836384 : }
599 :
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use bincode;
    use utils::{id::TenantId, Hex};

    use super::*;

    const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";

    /// The example tenant id, parsed.
    fn example_tenant_id() -> TenantId {
        TenantId::from_str(EXAMPLE_TENANT_ID).unwrap()
    }

    /// Shard 7 of 10 of the example tenant.
    fn example_tenant_shard_id() -> TenantShardId {
        TenantShardId {
            tenant_id: example_tenant_id(),
            shard_number: ShardNumber(7),
            shard_count: ShardCount(10),
        }
    }

    /// Shard 13 of 17, without a tenant.
    fn example_shard_index() -> ShardIndex {
        ShardIndex {
            shard_number: ShardNumber(13),
            shard_count: ShardCount(17),
        }
    }

    #[test]
    fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
        let example = example_tenant_shard_id();

        let encoded = example.to_string();
        assert_eq!(&encoded, &format!("{EXAMPLE_TENANT_ID}-070a"));

        let decoded = TenantShardId::from_str(&encoded)?;
        assert_eq!(example, decoded);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
        let example = example_tenant_shard_id();

        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x07, 0x0a,
        ];
        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: TenantShardId = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
        // Test that TenantShardId can decode a TenantId in human
        // readable form
        let example = example_tenant_id();
        let encoded = example.to_string();
        assert_eq!(&encoded, EXAMPLE_TENANT_ID);

        let decoded = TenantShardId::from_str(&encoded)?;
        assert_eq!(example, decoded.tenant_id);
        assert_eq!(decoded.shard_count, ShardCount(0));
        assert_eq!(decoded.shard_number, ShardNumber(0));

        Ok(())
    }

    #[test]
    fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
        // Test that a legacy TenantShardId encodes into a form that
        // can be decoded as TenantId
        let example_tenant_id = example_tenant_id();
        let example = TenantShardId::unsharded(example_tenant_id);
        let encoded = example.to_string();
        assert_eq!(&encoded, EXAMPLE_TENANT_ID);

        let decoded = TenantId::from_str(&encoded)?;
        assert_eq!(example_tenant_id, decoded);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
        // Unlike in human readable encoding, binary encoding does not
        // do any special handling of legacy unsharded TenantIds: this test
        // is equivalent to the main test for binary encoding, just verifying
        // that the same behavior applies when we have used `unsharded()` to
        // construct a TenantShardId.
        let example = TenantShardId::unsharded(example_tenant_id());

        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x00, 0x00,
        ];
        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
        assert_eq!(example, decoded);

        Ok(())
    }

    #[test]
    fn shard_identity_validation() -> Result<(), ShardConfigError> {
        // Happy cases
        ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
        ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
        ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;

        // Rejected combinations: (number, count, stripe size, expected error)
        let rejected = [
            (0, 0, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidCount),
            (10, 10, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (11, 10, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (255, 255, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (0, 1, ShardStripeSize(0), ShardConfigError::InvalidStripeSize),
        ];
        for (number, count, stripe_size, expected) in rejected {
            assert_eq!(
                ShardIdentity::new(ShardNumber(number), ShardCount(count), stripe_size),
                Err(expected)
            );
        }

        Ok(())
    }

    #[test]
    fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
        let example = example_shard_index();

        let encoded = example.to_string();
        assert_eq!(&encoded, "0d11");

        let decoded = ShardIndex::from_str(&encoded)?;
        assert_eq!(example, decoded);
        Ok(())
    }

    #[test]
    fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
        let example = example_shard_index();
        let expected: [u8; 2] = [0x0d, 0x11];

        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: ShardIndex = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);

        Ok(())
    }

    // These are only smoke tests to spot check that our implementation doesn't
    // deviate from a few examples values: not aiming to validate the overall
    // hashing algorithm.
    #[test]
    fn murmur_hash() {
        assert_eq!(murmurhash32(0), 0);

        assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
    }

    #[test]
    fn shard_mapping() {
        let key = Key {
            field1: 0x00,
            field2: 0x67f,
            field3: 0x5,
            field4: 0x400c,
            field5: 0x00,
            field6: 0x7d06,
        };

        let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
        assert_eq!(shard, ShardNumber(8));
    }
}
|