Line data Source code
1 : use std::{ops::RangeInclusive, str::FromStr};
2 :
3 : use crate::{
4 : key::{is_rel_block_key, Key},
5 : models::ShardParameters,
6 : };
7 : use hex::FromHex;
8 : use serde::{Deserialize, Serialize};
9 : use thiserror;
10 : use utils::id::TenantId;
11 :
12 43834135 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
13 : pub struct ShardNumber(pub u8);
14 :
15 112095629 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
16 : pub struct ShardCount(pub u8);
17 :
18 : impl ShardCount {
19 : pub const MAX: Self = Self(u8::MAX);
20 : }
21 :
22 : impl ShardNumber {
23 : pub const MAX: Self = Self(u8::MAX);
24 : }
25 :
26 : /// TenantShardId identify the units of work for the Pageserver.
27 : ///
28 : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
29 : ///
30 : /// # The second shard in a two-shard tenant
31 : /// 072f1291a5310026820b2fe4b2968934-0102
32 : ///
33 : /// Historically, tenants could not have multiple shards, and were identified
34 : /// by TenantId. To support this, TenantShardId has a special legacy
35 : /// mode where `shard_count` is equal to zero: this represents a single-sharded
36 : /// tenant which should be written as a TenantId with no suffix.
37 : ///
38 : /// The human-readable encoding of TenantShardId, such as used in API URLs,
39 : /// is both forward and backward compatible: a legacy TenantId can be
40 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
41 : /// as a TenantId.
42 : ///
43 : /// Note that the binary encoding is _not_ backward compatible, because
44 : /// at the time sharding is introduced, there are no existing binary structures
45 : /// containing TenantId that we need to handle.
46 14370456 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
47 : pub struct TenantShardId {
48 : pub tenant_id: TenantId,
49 : pub shard_number: ShardNumber,
50 : pub shard_count: ShardCount,
51 : }
52 :
53 : impl TenantShardId {
54 692 : pub fn unsharded(tenant_id: TenantId) -> Self {
55 692 : Self {
56 692 : tenant_id,
57 692 : shard_number: ShardNumber(0),
58 692 : shard_count: ShardCount(0),
59 692 : }
60 692 : }
61 :
62 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
63 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
64 23339 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
65 23339 : RangeInclusive::new(
66 23339 : Self {
67 23339 : tenant_id,
68 23339 : shard_number: ShardNumber(0),
69 23339 : shard_count: ShardCount(0),
70 23339 : },
71 23339 : Self {
72 23339 : tenant_id,
73 23339 : shard_number: ShardNumber::MAX,
74 23339 : shard_count: ShardCount::MAX,
75 23339 : },
76 23339 : )
77 23339 : }
78 :
79 7056301 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
80 7056301 : ShardSlug(self)
81 7056301 : }
82 :
83 : /// Convenience for code that has special behavior on the 0th shard.
84 1510131 : pub fn is_zero(&self) -> bool {
85 1510131 : self.shard_number == ShardNumber(0)
86 1510131 : }
87 :
88 1 : pub fn is_unsharded(&self) -> bool {
89 1 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
90 1 : }
91 :
92 : /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
93 : /// is useful when logging from code that is already in a span that includes tenant ID, to
94 : /// keep messages reasonably terse.
95 9872 : pub fn to_index(&self) -> ShardIndex {
96 9872 : ShardIndex {
97 9872 : shard_number: self.shard_number,
98 9872 : shard_count: self.shard_count,
99 9872 : }
100 9872 : }
101 :
102 : /// Calculate the children of this TenantShardId when splitting the overall tenant into
103 : /// the given number of shards.
104 18 : pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
105 18 : let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
106 18 : let mut child_shards = Vec::new();
107 100 : for shard_number in 0..ShardNumber(new_shard_count.0).0 {
108 : // Key mapping is based on a round robin mapping of key hash modulo shard count,
109 : // so our child shards are the ones which the same keys would map to.
110 100 : if shard_number % effective_old_shard_count == self.shard_number.0 {
111 44 : child_shards.push(TenantShardId {
112 44 : tenant_id: self.tenant_id,
113 44 : shard_number: ShardNumber(shard_number),
114 44 : shard_count: new_shard_count,
115 44 : })
116 56 : }
117 : }
118 :
119 18 : child_shards
120 18 : }
121 : }
122 :
123 : /// Formatting helper
124 : struct ShardSlug<'a>(&'a TenantShardId);
125 :
126 : impl<'a> std::fmt::Display for ShardSlug<'a> {
127 7056301 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 7056301 : write!(
129 7056301 : f,
130 7056301 : "{:02x}{:02x}",
131 7056301 : self.0.shard_number.0, self.0.shard_count.0
132 7056301 : )
133 7056301 : }
134 : }
135 :
136 : impl std::fmt::Display for TenantShardId {
137 179205 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 179205 : if self.shard_count != ShardCount(0) {
139 4221 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
140 : } else {
141 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
142 : // is distinct from the normal single shard case (shard count == 1).
143 174984 : self.tenant_id.fmt(f)
144 : }
145 179205 : }
146 : }
147 :
148 : impl std::fmt::Debug for TenantShardId {
149 1049 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 1049 : // Debug is the same as Display: the compact hex representation
151 1049 : write!(f, "{}", self)
152 1049 : }
153 : }
154 :
155 : impl std::str::FromStr for TenantShardId {
156 : type Err = hex::FromHexError;
157 :
158 82462 : fn from_str(s: &str) -> Result<Self, Self::Err> {
159 82462 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
160 82462 : if s.len() == 32 {
161 : // Legacy case: no shard specified
162 : Ok(Self {
163 79626 : tenant_id: TenantId::from_str(s)?,
164 79626 : shard_number: ShardNumber(0),
165 79626 : shard_count: ShardCount(0),
166 : })
167 2836 : } else if s.len() == 37 {
168 2836 : let bytes = s.as_bytes();
169 2836 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
170 2836 : let mut shard_parts: [u8; 2] = [0u8; 2];
171 2836 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
172 2836 : Ok(Self {
173 2836 : tenant_id,
174 2836 : shard_number: ShardNumber(shard_parts[0]),
175 2836 : shard_count: ShardCount(shard_parts[1]),
176 2836 : })
177 : } else {
178 0 : Err(hex::FromHexError::InvalidStringLength)
179 : }
180 82462 : }
181 : }
182 :
183 : impl From<[u8; 18]> for TenantShardId {
184 4 : fn from(b: [u8; 18]) -> Self {
185 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
186 4 :
187 4 : Self {
188 4 : tenant_id: TenantId::from(tenant_id_bytes),
189 4 : shard_number: ShardNumber(b[16]),
190 4 : shard_count: ShardCount(b[17]),
191 4 : }
192 4 : }
193 : }
194 :
195 : /// For use within the context of a particular tenant, when we need to know which
196 : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
197 : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
198 : /// TenantShardId.
199 679147 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
200 : pub struct ShardIndex {
201 : pub shard_number: ShardNumber,
202 : pub shard_count: ShardCount,
203 : }
204 :
205 : impl ShardIndex {
206 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
207 0 : Self {
208 0 : shard_number: number,
209 0 : shard_count: count,
210 0 : }
211 0 : }
212 56092 : pub fn unsharded() -> Self {
213 56092 : Self {
214 56092 : shard_number: ShardNumber(0),
215 56092 : shard_count: ShardCount(0),
216 56092 : }
217 56092 : }
218 :
219 1222599 : pub fn is_unsharded(&self) -> bool {
220 1222599 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
221 1222599 : }
222 :
223 : /// For use in constructing remote storage paths: concatenate this with a TenantId
224 : /// to get a fully qualified TenantShardId.
225 : ///
226 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
227 : /// that the legacy pre-sharding remote key format is preserved.
228 19493 : pub fn get_suffix(&self) -> String {
229 19493 : if self.is_unsharded() {
230 19429 : "".to_string()
231 : } else {
232 64 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
233 : }
234 19493 : }
235 : }
236 :
237 : impl std::fmt::Display for ShardIndex {
238 28261 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 28261 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
240 28261 : }
241 : }
242 :
243 : impl std::fmt::Debug for ShardIndex {
244 22792 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 22792 : // Debug is the same as Display: the compact hex representation
246 22792 : write!(f, "{}", self)
247 22792 : }
248 : }
249 :
250 : impl std::str::FromStr for ShardIndex {
251 : type Err = hex::FromHexError;
252 :
253 106 : fn from_str(s: &str) -> Result<Self, Self::Err> {
254 106 : // Expect format: 1 byte shard number, 1 byte shard count
255 106 : if s.len() == 4 {
256 106 : let bytes = s.as_bytes();
257 106 : let mut shard_parts: [u8; 2] = [0u8; 2];
258 106 : hex::decode_to_slice(bytes, &mut shard_parts)?;
259 106 : Ok(Self {
260 106 : shard_number: ShardNumber(shard_parts[0]),
261 106 : shard_count: ShardCount(shard_parts[1]),
262 106 : })
263 : } else {
264 0 : Err(hex::FromHexError::InvalidStringLength)
265 : }
266 106 : }
267 : }
268 :
269 : impl From<[u8; 2]> for ShardIndex {
270 2 : fn from(b: [u8; 2]) -> Self {
271 2 : Self {
272 2 : shard_number: ShardNumber(b[0]),
273 2 : shard_count: ShardCount(b[1]),
274 2 : }
275 2 : }
276 : }
277 :
278 : impl Serialize for TenantShardId {
279 9751 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
280 9751 : where
281 9751 : S: serde::Serializer,
282 9751 : {
283 9751 : if serializer.is_human_readable() {
284 9743 : serializer.collect_str(self)
285 : } else {
286 8 : let mut packed: [u8; 18] = [0; 18];
287 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
288 8 : packed[16] = self.shard_number.0;
289 8 : packed[17] = self.shard_count.0;
290 8 :
291 8 : packed.serialize(serializer)
292 : }
293 9751 : }
294 : }
295 :
296 : impl<'de> Deserialize<'de> for TenantShardId {
297 5612 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
298 5612 : where
299 5612 : D: serde::Deserializer<'de>,
300 5612 : {
301 5612 : struct IdVisitor {
302 5612 : is_human_readable_deserializer: bool,
303 5612 : }
304 5612 :
305 5612 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
306 5612 : type Value = TenantShardId;
307 5612 :
308 5612 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
309 0 : if self.is_human_readable_deserializer {
310 5612 : formatter.write_str("value in form of hex string")
311 5612 : } else {
312 5612 : formatter.write_str("value in form of integer array([u8; 18])")
313 5612 : }
314 5612 : }
315 5612 :
316 5612 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
317 4 : where
318 4 : A: serde::de::SeqAccess<'de>,
319 4 : {
320 4 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
321 5612 : let id: [u8; 18] = Deserialize::deserialize(s)?;
322 5612 : Ok(TenantShardId::from(id))
323 5612 : }
324 5612 :
325 5612 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
326 5608 : where
327 5608 : E: serde::de::Error,
328 5608 : {
329 5608 : TenantShardId::from_str(v).map_err(E::custom)
330 5608 : }
331 5612 : }
332 5612 :
333 5612 : if deserializer.is_human_readable() {
334 5608 : deserializer.deserialize_str(IdVisitor {
335 5608 : is_human_readable_deserializer: true,
336 5608 : })
337 : } else {
338 4 : deserializer.deserialize_tuple(
339 4 : 18,
340 4 : IdVisitor {
341 4 : is_human_readable_deserializer: false,
342 4 : },
343 4 : )
344 : }
345 5612 : }
346 : }
347 :
348 : /// Stripe size in number of pages
349 940 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
350 : pub struct ShardStripeSize(pub u32);
351 :
352 : /// Layout version: for future upgrades where we might change how the key->shard mapping works
353 88482680 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
354 : pub struct ShardLayout(u8);
355 :
356 : const LAYOUT_V1: ShardLayout = ShardLayout(1);
357 : /// ShardIdentity uses a magic layout value to indicate if it is unusable
358 : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
359 :
360 : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
361 : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
362 :
363 : /// The ShardIdentity contains the information needed for one member of map
364 : /// to resolve a key to a shard, and then check whether that shard is ==self.
365 108 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
366 : pub struct ShardIdentity {
367 : pub number: ShardNumber,
368 : pub count: ShardCount,
369 : pub stripe_size: ShardStripeSize,
370 : layout: ShardLayout,
371 : }
372 :
373 10 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
374 : pub enum ShardConfigError {
375 : #[error("Invalid shard count")]
376 : InvalidCount,
377 : #[error("Invalid shard number")]
378 : InvalidNumber,
379 : #[error("Invalid stripe size")]
380 : InvalidStripeSize,
381 : }
382 :
383 : impl ShardIdentity {
384 : /// An identity with number=0 count=0 is a "none" identity, which represents legacy
385 : /// tenants. Modern single-shard tenants should not use this: they should
386 : /// have number=0 count=1.
387 904 : pub fn unsharded() -> Self {
388 904 : Self {
389 904 : number: ShardNumber(0),
390 904 : count: ShardCount(0),
391 904 : layout: LAYOUT_V1,
392 904 : stripe_size: DEFAULT_STRIPE_SIZE,
393 904 : }
394 904 : }
395 :
396 : /// A broken instance of this type is only used for `TenantState::Broken` tenants,
397 : /// which are constructed in code paths that don't have access to proper configuration.
398 : ///
399 : /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
400 : /// Enforcement is via assertions, to avoid making our interface fallible for this
401 : /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
402 : /// state, and by extension to avoid trying to do any page->shard resolution.
403 0 : pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
404 0 : Self {
405 0 : number,
406 0 : count,
407 0 : layout: LAYOUT_BROKEN,
408 0 : stripe_size: DEFAULT_STRIPE_SIZE,
409 0 : }
410 0 : }
411 :
412 1966 : pub fn is_unsharded(&self) -> bool {
413 1966 : self.number == ShardNumber(0) && self.count == ShardCount(0)
414 1966 : }
415 :
416 : /// Count must be nonzero, and number must be < count. To construct
417 : /// the legacy case (count==0), use Self::unsharded instead.
418 82 : pub fn new(
419 82 : number: ShardNumber,
420 82 : count: ShardCount,
421 82 : stripe_size: ShardStripeSize,
422 82 : ) -> Result<Self, ShardConfigError> {
423 82 : if count.0 == 0 {
424 2 : Err(ShardConfigError::InvalidCount)
425 80 : } else if number.0 > count.0 - 1 {
426 6 : Err(ShardConfigError::InvalidNumber)
427 74 : } else if stripe_size.0 == 0 {
428 2 : Err(ShardConfigError::InvalidStripeSize)
429 : } else {
430 72 : Ok(Self {
431 72 : number,
432 72 : count,
433 72 : layout: LAYOUT_V1,
434 72 : stripe_size,
435 72 : })
436 : }
437 82 : }
438 :
439 : /// For use when creating ShardIdentity instances for new shards, where a creation request
440 : /// specifies the ShardParameters that apply to all shards.
441 777 : pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
442 777 : Self {
443 777 : number,
444 777 : count: params.count,
445 777 : layout: LAYOUT_V1,
446 777 : stripe_size: params.stripe_size,
447 777 : }
448 777 : }
449 :
450 88482680 : fn is_broken(&self) -> bool {
451 88482680 : self.layout == LAYOUT_BROKEN
452 88482680 : }
453 :
454 811936 : pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
455 811936 : assert!(!self.is_broken());
456 811936 : key_to_shard_number(self.count, self.stripe_size, key)
457 811936 : }
458 :
459 : /// Return true if the key should be ingested by this shard
460 87670744 : pub fn is_key_local(&self, key: &Key) -> bool {
461 87670744 : assert!(!self.is_broken());
462 87670744 : if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
463 68547946 : true
464 : } else {
465 19122798 : key_to_shard_number(self.count, self.stripe_size, key) == self.number
466 : }
467 87670744 : }
468 :
469 : /// Return true if the key should be discarded if found in this shard's
470 : /// data store, e.g. during compaction after a split
471 25250858 : pub fn is_key_disposable(&self, key: &Key) -> bool {
472 25250858 : if key_is_shard0(key) {
473 : // Q: Why can't we dispose of shard0 content if we're not shard 0?
474 : // A: because the WAL ingestion logic currently ingests some shard 0
475 : // content on all shards, even though it's only read on shard 0. If we
476 : // dropped it, then subsequent WAL ingest to these keys would encounter
477 : // an error.
478 4821049 : false
479 : } else {
480 20429809 : !self.is_key_local(key)
481 : }
482 25250858 : }
483 :
484 0 : pub fn shard_slug(&self) -> String {
485 0 : if self.count > ShardCount(0) {
486 0 : format!("-{:02x}{:02x}", self.number.0, self.count.0)
487 : } else {
488 0 : String::new()
489 : }
490 0 : }
491 :
492 : /// Convenience for checking if this identity is the 0th shard in a tenant,
493 : /// for special cases on shard 0 such as ingesting relation sizes.
494 13903908 : pub fn is_zero(&self) -> bool {
495 13903908 : self.number == ShardNumber(0)
496 13903908 : }
497 : }
498 :
499 : impl Serialize for ShardIndex {
500 248 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
501 248 : where
502 248 : S: serde::Serializer,
503 248 : {
504 248 : if serializer.is_human_readable() {
505 244 : serializer.collect_str(self)
506 : } else {
507 : // Binary encoding is not used in index_part.json, but is included in anticipation of
508 : // switching various structures (e.g. inter-process communication, remote metadata) to more
509 : // compact binary encodings in future.
510 4 : let mut packed: [u8; 2] = [0; 2];
511 4 : packed[0] = self.shard_number.0;
512 4 : packed[1] = self.shard_count.0;
513 4 : packed.serialize(serializer)
514 : }
515 248 : }
516 : }
517 :
518 : impl<'de> Deserialize<'de> for ShardIndex {
519 106 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
520 106 : where
521 106 : D: serde::Deserializer<'de>,
522 106 : {
523 106 : struct IdVisitor {
524 106 : is_human_readable_deserializer: bool,
525 106 : }
526 106 :
527 106 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
528 106 : type Value = ShardIndex;
529 106 :
530 106 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
531 0 : if self.is_human_readable_deserializer {
532 106 : formatter.write_str("value in form of hex string")
533 106 : } else {
534 106 : formatter.write_str("value in form of integer array([u8; 2])")
535 106 : }
536 106 : }
537 106 :
538 106 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
539 2 : where
540 2 : A: serde::de::SeqAccess<'de>,
541 2 : {
542 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
543 106 : let id: [u8; 2] = Deserialize::deserialize(s)?;
544 106 : Ok(ShardIndex::from(id))
545 106 : }
546 106 :
547 106 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
548 104 : where
549 104 : E: serde::de::Error,
550 104 : {
551 104 : ShardIndex::from_str(v).map_err(E::custom)
552 104 : }
553 106 : }
554 106 :
555 106 : if deserializer.is_human_readable() {
556 104 : deserializer.deserialize_str(IdVisitor {
557 104 : is_human_readable_deserializer: true,
558 104 : })
559 : } else {
560 2 : deserializer.deserialize_tuple(
561 2 : 2,
562 2 : IdVisitor {
563 2 : is_human_readable_deserializer: false,
564 2 : },
565 2 : )
566 : }
567 106 : }
568 : }
569 :
570 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
571 : /// in order to be able to serve basebackup requests without peer communication).
572 64070513 : fn key_is_shard0(key: &Key) -> bool {
573 64070513 : // To decide what to shard out to shards >0, we apply a simple rule that only
574 64070513 : // relation pages are distributed to shards other than shard zero. Everything else gets
575 64070513 : // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
576 64070513 : // requests, and any request other than those for particular blocks in relations.
577 64070513 : !is_rel_block_key(key)
578 64070513 : }
579 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
/// (the murmur3 32-bit finalizer). All multiplications wrap modulo 2^32.
fn murmurhash32(h: u32) -> u32 {
    let h = h ^ (h >> 16);
    let h = h.wrapping_mul(0x85eb_ca6b);
    let h = h ^ (h >> 13);
    let h = h.wrapping_mul(0xc2b2_ae35);
    h ^ (h >> 16)
}

/// Provide the same result as the function in postgres `hashfn.h` with the same name:
/// `a ^ (b + 0x9e3779b9 + (a << 6) + (a >> 2))`, with wrapping u32 addition.
fn hash_combine(a: u32, b: u32) -> u32 {
    let mixed = b
        .wrapping_add(0x9e37_79b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);
    a ^ mixed
}
599 :
600 : /// Where a Key is to be distributed across shards, select the shard. This function
601 : /// does not account for keys that should be broadcast across shards.
602 : ///
603 : /// The hashing in this function must exactly match what we do in postgres smgr
604 : /// code. The resulting distribution of pages is intended to preserve locality within
605 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
606 : /// distributing data pseudo-randomly.
607 : ///
608 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
609 : /// and will be handled at higher levels when shards are split.
610 19934736 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
611 19934736 : // Fast path for un-sharded tenants or broadcast keys
612 19934736 : if count < ShardCount(2) || key_is_shard0(key) {
613 237879 : return ShardNumber(0);
614 19696857 : }
615 19696857 :
616 19696857 : // relNode
617 19696857 : let mut hash = murmurhash32(key.field4);
618 19696857 : // blockNum/stripe size
619 19696857 : hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
620 19696857 :
621 19696857 : ShardNumber((hash % count.0 as u32) as u8)
622 19934736 : }
623 :
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use bincode;
    use utils::{id::TenantId, Hex};

    use super::*;

    const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";

    /// Parse `EXAMPLE_TENANT_ID`, which is a valid literal.
    fn example_tenant_id() -> TenantId {
        TenantId::from_str(EXAMPLE_TENANT_ID).unwrap()
    }

    /// Shorthand for a TenantShardId with raw shard number and count.
    fn tenant_shard(tenant_id: TenantId, number: u8, count: u8) -> TenantShardId {
        TenantShardId {
            tenant_id,
            shard_number: ShardNumber(number),
            shard_count: ShardCount(count),
        }
    }

    #[test]
    fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
        let example = tenant_shard(example_tenant_id(), 7, 10);

        let encoded = example.to_string();
        assert_eq!(encoded, format!("{EXAMPLE_TENANT_ID}-070a"));

        assert_eq!(TenantShardId::from_str(&encoded)?, example);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
        let example = tenant_shard(example_tenant_id(), 7, 10);

        // 16 tenant ID bytes, then shard number 0x07, then shard count 0x0a.
        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x07, 0x0a,
        ];
        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: TenantShardId = bincode::deserialize(&encoded).unwrap();
        assert_eq!(decoded, example);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
        // A TenantId in human-readable form must decode as a legacy TenantShardId.
        let tenant_id = example_tenant_id();
        let encoded = tenant_id.to_string();
        assert_eq!(encoded, EXAMPLE_TENANT_ID);

        let decoded = TenantShardId::from_str(&encoded)?;
        assert_eq!(decoded.tenant_id, tenant_id);
        assert_eq!(decoded.shard_count, ShardCount(0));
        assert_eq!(decoded.shard_number, ShardNumber(0));
        Ok(())
    }

    #[test]
    fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
        // A legacy TenantShardId must encode into a form that decodes as a TenantId.
        let tenant_id = example_tenant_id();
        let encoded = TenantShardId::unsharded(tenant_id).to_string();
        assert_eq!(encoded, EXAMPLE_TENANT_ID);

        assert_eq!(TenantId::from_str(&encoded)?, tenant_id);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
        // Unlike the human-readable encoding, the binary encoding gives legacy unsharded ids
        // no special treatment. This mirrors tenant_shard_id_binary for an id built with
        // `unsharded()`.
        let example = TenantShardId::unsharded(example_tenant_id());

        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x00, 0x00,
        ];
        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        assert_eq!(bincode::deserialize::<TenantShardId>(&encoded).unwrap(), example);
        Ok(())
    }

    #[test]
    fn shard_identity_validation() -> Result<(), ShardConfigError> {
        // Happy cases: (number, count, stripe size)
        let accepted = [(0, 1, DEFAULT_STRIPE_SIZE.0), (0, 1, 1), (254, 255, 1)];
        for (number, count, stripe) in accepted {
            ShardIdentity::new(ShardNumber(number), ShardCount(count), ShardStripeSize(stripe))?;
        }

        // Rejected cases: (number, count, stripe size, expected error)
        let rejected = [
            (0, 0, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidCount),
            (10, 10, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidNumber),
            (11, 10, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidNumber),
            (255, 255, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidNumber),
            (0, 1, 0, ShardConfigError::InvalidStripeSize),
        ];
        for (number, count, stripe, error) in rejected {
            assert_eq!(
                ShardIdentity::new(ShardNumber(number), ShardCount(count), ShardStripeSize(stripe)),
                Err(error)
            );
        }

        Ok(())
    }

    #[test]
    fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
        let example = ShardIndex {
            shard_number: ShardNumber(13),
            shard_count: ShardCount(17),
        };

        let encoded = example.to_string();
        assert_eq!(encoded, "0d11");

        assert_eq!(ShardIndex::from_str(&encoded)?, example);
        Ok(())
    }

    #[test]
    fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
        let example = ShardIndex {
            shard_number: ShardNumber(13),
            shard_count: ShardCount(17),
        };
        let expected: [u8; 2] = [0x0d, 0x11];

        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: ShardIndex = bincode::deserialize(&encoded).unwrap();
        assert_eq!(decoded, example);
        Ok(())
    }

    // These are only smoke tests to spot check that our implementation doesn't
    // deviate from a few example values: not aiming to validate the overall
    // hashing algorithm.
    #[test]
    fn murmur_hash() {
        assert_eq!(murmurhash32(0), 0);

        assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
    }

    #[test]
    fn shard_mapping() {
        let key = Key {
            field1: 0x00,
            field2: 0x67f,
            field3: 0x5,
            field4: 0x400c,
            field5: 0x00,
            field6: 0x7d06,
        };

        assert_eq!(
            key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key),
            ShardNumber(8)
        );
    }

    #[test]
    fn shard_id_split() {
        let tenant_id = TenantId::generate();
        // Expected children: the listed shard numbers, all with the given new count.
        let expected = |count: u8, numbers: &[u8]| -> Vec<TenantShardId> {
            numbers
                .iter()
                .map(|&number| tenant_shard(tenant_id, number, count))
                .collect()
        };

        let unsharded = TenantShardId::unsharded(tenant_id);

        // Unsharded into 2
        assert_eq!(unsharded.split(ShardCount(2)), expected(2, &[0, 1]));

        // Unsharded into 4
        assert_eq!(unsharded.split(ShardCount(4)), expected(4, &[0, 1, 2, 3]));

        // count=1 into 2 (check this works the same as unsharded.)
        assert_eq!(
            tenant_shard(tenant_id, 0, 1).split(ShardCount(2)),
            expected(2, &[0, 1])
        );

        // count=2 into count=8
        assert_eq!(
            tenant_shard(tenant_id, 1, 2).split(ShardCount(8)),
            expected(8, &[1, 3, 5, 7])
        );
    }
}
|