TLA Line data Source code
1 : use std::{ops::RangeInclusive, str::FromStr};
2 :
3 : use crate::key::{is_rel_block_key, Key};
4 : use hex::FromHex;
5 : use serde::{Deserialize, Serialize};
6 : use thiserror;
7 : use utils::id::TenantId;
8 :
9 CBC 11069924 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
10 : pub struct ShardNumber(pub u8);
11 :
12 69860880 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
13 : pub struct ShardCount(pub u8);
14 :
15 : impl ShardCount {
16 : pub const MAX: Self = Self(u8::MAX);
17 : }
18 :
19 : impl ShardNumber {
20 : pub const MAX: Self = Self(u8::MAX);
21 : }
22 :
23 : /// TenantShardId identify the units of work for the Pageserver.
24 : ///
25 : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
26 : ///
27 : /// # The second shard in a two-shard tenant
28 : /// 072f1291a5310026820b2fe4b2968934-0102
29 : ///
30 : /// Historically, tenants could not have multiple shards, and were identified
31 : /// by TenantId. To support this, TenantShardId has a special legacy
32 : /// mode where `shard_count` is equal to zero: this represents a single-sharded
33 : /// tenant which should be written as a TenantId with no suffix.
34 : ///
35 : /// The human-readable encoding of TenantShardId, such as used in API URLs,
36 : /// is both forward and backward compatible: a legacy TenantId can be
37 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
38 : /// as a TenantId.
39 : ///
40 : /// Note that the binary encoding is _not_ backward compatible, because
41 : /// at the time sharding is introduced, there are no existing binary structures
42 : /// containing TenantId that we need to handle.
43 12041212 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
44 : pub struct TenantShardId {
45 : pub tenant_id: TenantId,
46 : pub shard_number: ShardNumber,
47 : pub shard_count: ShardCount,
48 : }
49 :
50 : impl TenantShardId {
51 798 : pub fn unsharded(tenant_id: TenantId) -> Self {
52 798 : Self {
53 798 : tenant_id,
54 798 : shard_number: ShardNumber(0),
55 798 : shard_count: ShardCount(0),
56 798 : }
57 798 : }
58 :
59 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
60 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
61 8067 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
62 8067 : RangeInclusive::new(
63 8067 : Self {
64 8067 : tenant_id,
65 8067 : shard_number: ShardNumber(0),
66 8067 : shard_count: ShardCount(0),
67 8067 : },
68 8067 : Self {
69 8067 : tenant_id,
70 8067 : shard_number: ShardNumber::MAX,
71 8067 : shard_count: ShardCount::MAX,
72 8067 : },
73 8067 : )
74 8067 : }
75 :
76 2121323 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
77 2121323 : ShardSlug(self)
78 2121323 : }
79 :
80 : /// Convenience for code that has special behavior on the 0th shard.
81 136 : pub fn is_zero(&self) -> bool {
82 136 : self.shard_number == ShardNumber(0)
83 136 : }
84 :
85 UBC 0 : pub fn is_unsharded(&self) -> bool {
86 0 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
87 0 : }
88 : }
89 :
90 : /// Formatting helper
91 : struct ShardSlug<'a>(&'a TenantShardId);
92 :
93 : impl<'a> std::fmt::Display for ShardSlug<'a> {
94 CBC 2121323 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 2121323 : write!(
96 2121323 : f,
97 2121323 : "{:02x}{:02x}",
98 2121323 : self.0.shard_number.0, self.0.shard_count.0
99 2121323 : )
100 2121323 : }
101 : }
102 :
103 : impl std::fmt::Display for TenantShardId {
104 164121 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 164121 : if self.shard_count != ShardCount(0) {
106 1 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
107 : } else {
108 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
109 : // is distinct from the normal single shard case (shard count == 1).
110 164120 : self.tenant_id.fmt(f)
111 : }
112 164121 : }
113 : }
114 :
115 : impl std::fmt::Debug for TenantShardId {
116 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 0 : // Debug is the same as Display: the compact hex representation
118 0 : write!(f, "{}", self)
119 0 : }
120 : }
121 :
122 : impl std::str::FromStr for TenantShardId {
123 : type Err = hex::FromHexError;
124 :
125 CBC 10148 : fn from_str(s: &str) -> Result<Self, Self::Err> {
126 10148 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
127 10148 : if s.len() == 32 {
128 : // Legacy case: no shard specified
129 : Ok(Self {
130 10147 : tenant_id: TenantId::from_str(s)?,
131 10147 : shard_number: ShardNumber(0),
132 10147 : shard_count: ShardCount(0),
133 : })
134 1 : } else if s.len() == 37 {
135 1 : let bytes = s.as_bytes();
136 1 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
137 1 : let mut shard_parts: [u8; 2] = [0u8; 2];
138 1 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
139 1 : Ok(Self {
140 1 : tenant_id,
141 1 : shard_number: ShardNumber(shard_parts[0]),
142 1 : shard_count: ShardCount(shard_parts[1]),
143 1 : })
144 : } else {
145 UBC 0 : Err(hex::FromHexError::InvalidStringLength)
146 : }
147 CBC 10148 : }
148 : }
149 :
150 : impl From<[u8; 18]> for TenantShardId {
151 2 : fn from(b: [u8; 18]) -> Self {
152 2 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
153 2 :
154 2 : Self {
155 2 : tenant_id: TenantId::from(tenant_id_bytes),
156 2 : shard_number: ShardNumber(b[16]),
157 2 : shard_count: ShardCount(b[17]),
158 2 : }
159 2 : }
160 : }
161 :
162 : /// For use within the context of a particular tenant, when we need to know which
163 : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
164 : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
165 : /// TenantShardId.
166 569516 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
167 : pub struct ShardIndex {
168 : pub shard_number: ShardNumber,
169 : pub shard_count: ShardCount,
170 : }
171 :
172 : impl ShardIndex {
173 UBC 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
174 0 : Self {
175 0 : shard_number: number,
176 0 : shard_count: count,
177 0 : }
178 0 : }
179 CBC 61922 : pub fn unsharded() -> Self {
180 61922 : Self {
181 61922 : shard_number: ShardNumber(0),
182 61922 : shard_count: ShardCount(0),
183 61922 : }
184 61922 : }
185 :
186 990479 : pub fn is_unsharded(&self) -> bool {
187 990479 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
188 990479 : }
189 :
190 : /// For use in constructing remote storage paths: concatenate this with a TenantId
191 : /// to get a fully qualified TenantShardId.
192 : ///
193 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
194 : /// that the legacy pre-sharding remote key format is preserved.
195 17953 : pub fn get_suffix(&self) -> String {
196 17953 : if self.is_unsharded() {
197 17953 : "".to_string()
198 : } else {
199 UBC 0 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
200 : }
201 CBC 17953 : }
202 : }
203 :
204 : impl std::fmt::Display for ShardIndex {
205 4934 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 4934 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
207 4934 : }
208 : }
209 :
210 : impl std::fmt::Debug for ShardIndex {
211 633 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 633 : // Debug is the same as Display: the compact hex representation
213 633 : write!(f, "{}", self)
214 633 : }
215 : }
216 :
217 : impl std::str::FromStr for ShardIndex {
218 : type Err = hex::FromHexError;
219 :
220 1 : fn from_str(s: &str) -> Result<Self, Self::Err> {
221 1 : // Expect format: 1 byte shard number, 1 byte shard count
222 1 : if s.len() == 4 {
223 1 : let bytes = s.as_bytes();
224 1 : let mut shard_parts: [u8; 2] = [0u8; 2];
225 1 : hex::decode_to_slice(bytes, &mut shard_parts)?;
226 1 : Ok(Self {
227 1 : shard_number: ShardNumber(shard_parts[0]),
228 1 : shard_count: ShardCount(shard_parts[1]),
229 1 : })
230 : } else {
231 UBC 0 : Err(hex::FromHexError::InvalidStringLength)
232 : }
233 CBC 1 : }
234 : }
235 :
236 : impl From<[u8; 2]> for ShardIndex {
237 1 : fn from(b: [u8; 2]) -> Self {
238 1 : Self {
239 1 : shard_number: ShardNumber(b[0]),
240 1 : shard_count: ShardCount(b[1]),
241 1 : }
242 1 : }
243 : }
244 :
245 : impl Serialize for TenantShardId {
246 5616 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
247 5616 : where
248 5616 : S: serde::Serializer,
249 5616 : {
250 5616 : if serializer.is_human_readable() {
251 5612 : serializer.collect_str(self)
252 : } else {
253 4 : let mut packed: [u8; 18] = [0; 18];
254 4 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
255 4 : packed[16] = self.shard_number.0;
256 4 : packed[17] = self.shard_count.0;
257 4 :
258 4 : packed.serialize(serializer)
259 : }
260 5616 : }
261 : }
262 :
263 : impl<'de> Deserialize<'de> for TenantShardId {
264 2494 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
265 2494 : where
266 2494 : D: serde::Deserializer<'de>,
267 2494 : {
268 2494 : struct IdVisitor {
269 2494 : is_human_readable_deserializer: bool,
270 2494 : }
271 2494 :
272 2494 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
273 2494 : type Value = TenantShardId;
274 2494 :
275 2494 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
276 UBC 0 : if self.is_human_readable_deserializer {
277 CBC 2494 : formatter.write_str("value in form of hex string")
278 2494 : } else {
279 2494 : formatter.write_str("value in form of integer array([u8; 18])")
280 2494 : }
281 2494 : }
282 2494 :
283 2494 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
284 2 : where
285 2 : A: serde::de::SeqAccess<'de>,
286 2 : {
287 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
288 2494 : let id: [u8; 18] = Deserialize::deserialize(s)?;
289 2494 : Ok(TenantShardId::from(id))
290 2494 : }
291 2494 :
292 2494 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
293 2492 : where
294 2492 : E: serde::de::Error,
295 2492 : {
296 2492 : TenantShardId::from_str(v).map_err(E::custom)
297 2492 : }
298 2494 : }
299 2494 :
300 2494 : if deserializer.is_human_readable() {
301 2492 : deserializer.deserialize_str(IdVisitor {
302 2492 : is_human_readable_deserializer: true,
303 2492 : })
304 : } else {
305 2 : deserializer.deserialize_tuple(
306 2 : 18,
307 2 : IdVisitor {
308 2 : is_human_readable_deserializer: false,
309 2 : },
310 2 : )
311 : }
312 2494 : }
313 : }
314 :
315 : /// Stripe size in number of pages
316 UBC 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
317 : pub struct ShardStripeSize(pub u32);
318 :
319 : /// Layout version: for future upgrades where we might change how the key->shard mapping works
320 CBC 69860879 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
321 : pub struct ShardLayout(u8);
322 :
323 : const LAYOUT_V1: ShardLayout = ShardLayout(1);
324 : /// ShardIdentity uses a magic layout value to indicate if it is unusable
325 : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
326 :
327 : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
328 : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
329 :
330 : /// The ShardIdentity contains the information needed for one member of map
331 : /// to resolve a key to a shard, and then check whether that shard is ==self.
332 20 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
333 : pub struct ShardIdentity {
334 : pub number: ShardNumber,
335 : pub count: ShardCount,
336 : stripe_size: ShardStripeSize,
337 : layout: ShardLayout,
338 : }
339 :
340 5 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
341 : pub enum ShardConfigError {
342 : #[error("Invalid shard count")]
343 : InvalidCount,
344 : #[error("Invalid shard number")]
345 : InvalidNumber,
346 : #[error("Invalid stripe size")]
347 : InvalidStripeSize,
348 : }
349 :
350 : impl ShardIdentity {
351 : /// An identity with number=0 count=0 is a "none" identity, which represents legacy
352 : /// tenants. Modern single-shard tenants should not use this: they should
353 : /// have number=0 count=1.
354 917 : pub fn unsharded() -> Self {
355 917 : Self {
356 917 : number: ShardNumber(0),
357 917 : count: ShardCount(0),
358 917 : layout: LAYOUT_V1,
359 917 : stripe_size: DEFAULT_STRIPE_SIZE,
360 917 : }
361 917 : }
362 :
363 : /// A broken instance of this type is only used for `TenantState::Broken` tenants,
364 : /// which are constructed in code paths that don't have access to proper configuration.
365 : ///
366 : /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
367 : /// Enforcement is via assertions, to avoid making our interface fallible for this
368 : /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
369 : /// state, and by extension to avoid trying to do any page->shard resolution.
370 UBC 0 : pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
371 0 : Self {
372 0 : number,
373 0 : count,
374 0 : layout: LAYOUT_BROKEN,
375 0 : stripe_size: DEFAULT_STRIPE_SIZE,
376 0 : }
377 0 : }
378 :
379 CBC 1624 : pub fn is_unsharded(&self) -> bool {
380 1624 : self.number == ShardNumber(0) && self.count == ShardCount(0)
381 1624 : }
382 :
383 : /// Count must be nonzero, and number must be < count. To construct
384 : /// the legacy case (count==0), use Self::unsharded instead.
385 8 : pub fn new(
386 8 : number: ShardNumber,
387 8 : count: ShardCount,
388 8 : stripe_size: ShardStripeSize,
389 8 : ) -> Result<Self, ShardConfigError> {
390 8 : if count.0 == 0 {
391 1 : Err(ShardConfigError::InvalidCount)
392 7 : } else if number.0 > count.0 - 1 {
393 3 : Err(ShardConfigError::InvalidNumber)
394 4 : } else if stripe_size.0 == 0 {
395 1 : Err(ShardConfigError::InvalidStripeSize)
396 : } else {
397 3 : Ok(Self {
398 3 : number,
399 3 : count,
400 3 : layout: LAYOUT_V1,
401 3 : stripe_size,
402 3 : })
403 : }
404 8 : }
405 :
406 69860879 : fn is_broken(&self) -> bool {
407 69860879 : self.layout == LAYOUT_BROKEN
408 69860879 : }
409 :
410 228408 : pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
411 228408 : assert!(!self.is_broken());
412 228408 : key_to_shard_number(self.count, self.stripe_size, key)
413 228408 : }
414 :
415 : /// Return true if the key should be ingested by this shard
416 69632471 : pub fn is_key_local(&self, key: &Key) -> bool {
417 69632471 : assert!(!self.is_broken());
418 69632471 : if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
419 69632471 : true
420 : } else {
421 UBC 0 : key_to_shard_number(self.count, self.stripe_size, key) == self.number
422 : }
423 CBC 69632471 : }
424 :
425 : /// Return true if the key should be discarded if found in this shard's
426 : /// data store, e.g. during compaction after a split
427 22558332 : pub fn is_key_disposable(&self, key: &Key) -> bool {
428 22558332 : if key_is_shard0(key) {
429 : // Q: Why can't we dispose of shard0 content if we're not shard 0?
430 : // A: because the WAL ingestion logic currently ingests some shard 0
431 : // content on all shards, even though it's only read on shard 0. If we
432 : // dropped it, then subsequent WAL ingest to these keys would encounter
433 : // an error.
434 3510804 : false
435 : } else {
436 19047528 : !self.is_key_local(key)
437 : }
438 22558332 : }
439 :
440 UBC 0 : pub fn shard_slug(&self) -> String {
441 0 : if self.count > ShardCount(0) {
442 0 : format!("-{:02x}{:02x}", self.number.0, self.count.0)
443 : } else {
444 0 : String::new()
445 : }
446 0 : }
447 :
448 : /// Convenience for checking if this identity is the 0th shard in a tenant,
449 : /// for special cases on shard 0 such as ingesting relation sizes.
450 0 : pub fn is_zero(&self) -> bool {
451 0 : self.number == ShardNumber(0)
452 0 : }
453 : }
454 :
455 : impl Serialize for ShardIndex {
456 CBC 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
457 2 : where
458 2 : S: serde::Serializer,
459 2 : {
460 2 : if serializer.is_human_readable() {
461 UBC 0 : serializer.collect_str(self)
462 : } else {
463 : // Binary encoding is not used in index_part.json, but is included in anticipation of
464 : // switching various structures (e.g. inter-process communication, remote metadata) to more
465 : // compact binary encodings in future.
466 CBC 2 : let mut packed: [u8; 2] = [0; 2];
467 2 : packed[0] = self.shard_number.0;
468 2 : packed[1] = self.shard_count.0;
469 2 : packed.serialize(serializer)
470 : }
471 2 : }
472 : }
473 :
474 : impl<'de> Deserialize<'de> for ShardIndex {
475 1 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
476 1 : where
477 1 : D: serde::Deserializer<'de>,
478 1 : {
479 1 : struct IdVisitor {
480 1 : is_human_readable_deserializer: bool,
481 1 : }
482 1 :
483 1 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
484 1 : type Value = ShardIndex;
485 1 :
486 1 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
487 UBC 0 : if self.is_human_readable_deserializer {
488 CBC 1 : formatter.write_str("value in form of hex string")
489 1 : } else {
490 1 : formatter.write_str("value in form of integer array([u8; 2])")
491 1 : }
492 1 : }
493 1 :
494 1 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
495 1 : where
496 1 : A: serde::de::SeqAccess<'de>,
497 1 : {
498 1 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
499 1 : let id: [u8; 2] = Deserialize::deserialize(s)?;
500 1 : Ok(ShardIndex::from(id))
501 1 : }
502 1 :
503 1 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
504 UBC 0 : where
505 0 : E: serde::de::Error,
506 0 : {
507 0 : ShardIndex::from_str(v).map_err(E::custom)
508 0 : }
509 CBC 1 : }
510 1 :
511 1 : if deserializer.is_human_readable() {
512 UBC 0 : deserializer.deserialize_str(IdVisitor {
513 0 : is_human_readable_deserializer: true,
514 0 : })
515 : } else {
516 CBC 1 : deserializer.deserialize_tuple(
517 1 : 2,
518 1 : IdVisitor {
519 1 : is_human_readable_deserializer: false,
520 1 : },
521 1 : )
522 : }
523 1 : }
524 : }
525 :
526 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
527 : /// in order to be able to serve basebackup requests without peer communication).
528 22558333 : fn key_is_shard0(key: &Key) -> bool {
529 22558333 : // To decide what to shard out to shards >0, we apply a simple rule that only
530 22558333 : // relation pages are distributed to shards other than shard zero. Everything else gets
531 22558333 : // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
532 22558333 : // requests, and any request other than those for particular blocks in relations.
533 22558333 : !is_rel_block_key(key)
534 22558333 : }
535 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Three xor-shift steps (by 16, 13 and 16 bits) with a wrapping multiplication
/// after each of the first two.
fn murmurhash32(h: u32) -> u32 {
    let h = (h ^ (h >> 16)).wrapping_mul(0x85ebca6b);
    let h = (h ^ (h >> 13)).wrapping_mul(0xc2b2ae35);
    h ^ (h >> 16)
}
545 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// All additions wrap on overflow; the shifts of `a` discard the bits shifted out.
fn hash_combine(a: u32, b: u32) -> u32 {
    let mixed = b
        .wrapping_add(0x9e3779b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);

    a ^ mixed
}
555 :
556 : /// Where a Key is to be distributed across shards, select the shard. This function
557 : /// does not account for keys that should be broadcast across shards.
558 : ///
559 : /// The hashing in this function must exactly match what we do in postgres smgr
560 : /// code. The resulting distribution of pages is intended to preserve locality within
561 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
562 : /// distributing data pseudo-randomly.
563 : ///
564 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
565 : /// and will be handled at higher levels when shards are split.
566 228409 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
567 228409 : // Fast path for un-sharded tenants or broadcast keys
568 228409 : if count < ShardCount(2) || key_is_shard0(key) {
569 228408 : return ShardNumber(0);
570 1 : }
571 1 :
572 1 : // relNode
573 1 : let mut hash = murmurhash32(key.field4);
574 1 : // blockNum/stripe size
575 1 : hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
576 1 :
577 1 : ShardNumber((hash % count.0 as u32) as u8)
578 228409 : }
579 :
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use bincode;
    use utils::{id::TenantId, Hex};

    use super::*;

    const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";

    /// `EXAMPLE_TENANT_ID` parsed into a `TenantId`.
    fn example_tenant_id() -> TenantId {
        TenantId::from_str(EXAMPLE_TENANT_ID).unwrap()
    }

    /// Shard 7 of 10 for the example tenant.
    fn example_sharded_id() -> TenantShardId {
        TenantShardId {
            tenant_id: example_tenant_id(),
            shard_number: ShardNumber(7),
            shard_count: ShardCount(10),
        }
    }

    /// Shard 13 of 17, used by the ShardIndex encoding tests.
    fn example_shard_index() -> ShardIndex {
        ShardIndex {
            shard_number: ShardNumber(13),
            shard_count: ShardCount(17),
        }
    }

    #[test]
    fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
        let example = example_sharded_id();

        let encoded = example.to_string();
        let expected = format!("{EXAMPLE_TENANT_ID}-070a");
        assert_eq!(&encoded, &expected);

        let decoded: TenantShardId = encoded.parse()?;
        assert_eq!(example, decoded);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
        let example = example_sharded_id();

        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x07, 0x0a,
        ];
        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
        // A TenantId in human readable form must be decodable as a TenantShardId
        let example = example_tenant_id();
        let encoded = example.to_string();
        assert_eq!(&encoded, EXAMPLE_TENANT_ID);

        let decoded: TenantShardId = encoded.parse()?;
        assert_eq!(example, decoded.tenant_id);
        assert_eq!(decoded.shard_count, ShardCount(0));
        assert_eq!(decoded.shard_number, ShardNumber(0));

        Ok(())
    }

    #[test]
    fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
        // A legacy TenantShardId must encode into a form that can be decoded as TenantId
        let example_tenant_id = example_tenant_id();
        let example = TenantShardId::unsharded(example_tenant_id);
        let encoded = example.to_string();
        assert_eq!(&encoded, EXAMPLE_TENANT_ID);

        let decoded = TenantId::from_str(&encoded)?;
        assert_eq!(example_tenant_id, decoded);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
        // Unlike the human readable encoding, the binary encoding does no special
        // handling of legacy unsharded TenantIds: this test is equivalent to the main
        // test for binary encoding, and only verifies that the same behavior applies
        // when the TenantShardId was constructed with `unsharded()`.
        let example = TenantShardId::unsharded(example_tenant_id());

        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x00, 0x00,
        ];
        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
        assert_eq!(example, decoded);

        Ok(())
    }

    #[test]
    fn shard_identity_validation() -> Result<(), ShardConfigError> {
        // Happy cases
        ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
        ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
        ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;

        // A zero count is rejected
        assert_eq!(
            ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
            Err(ShardConfigError::InvalidCount)
        );

        // The number must be strictly below the count
        for (number, count) in [(10, 10), (11, 10), (255, 255)] {
            assert_eq!(
                ShardIdentity::new(ShardNumber(number), ShardCount(count), DEFAULT_STRIPE_SIZE),
                Err(ShardConfigError::InvalidNumber)
            );
        }

        // A zero stripe size is rejected
        assert_eq!(
            ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
            Err(ShardConfigError::InvalidStripeSize)
        );

        Ok(())
    }

    #[test]
    fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
        let example = example_shard_index();

        let expected: String = "0d11".to_string();
        let encoded = example.to_string();
        assert_eq!(&encoded, &expected);

        let decoded: ShardIndex = encoded.parse()?;
        assert_eq!(example, decoded);

        Ok(())
    }

    #[test]
    fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
        let example = example_shard_index();
        let expected: [u8; 2] = [0x0d, 0x11];

        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);

        Ok(())
    }

    // These are only smoke tests to spot check that our implementation doesn't
    // deviate from a few example values: not aiming to validate the overall
    // hashing algorithm.
    #[test]
    fn murmur_hash() {
        assert_eq!(murmurhash32(0), 0);

        assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
    }

    #[test]
    fn shard_mapping() {
        let key = Key {
            field1: 0x00,
            field2: 0x67f,
            field3: 0x5,
            field4: 0x400c,
            field5: 0x00,
            field6: 0x7d06,
        };

        let expected = ShardNumber(8);
        let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
        assert_eq!(shard, expected);
    }
}
|