Line data Source code
1 : use std::{ops::RangeInclusive, str::FromStr};
2 :
3 : use crate::{
4 : key::{is_rel_block_key, Key},
5 : models::ShardParameters,
6 : };
7 : use hex::FromHex;
8 : use serde::{Deserialize, Serialize};
9 : use utils::id::TenantId;
10 :
11 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
12 : pub struct ShardNumber(pub u8);
13 :
14 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
15 : pub struct ShardCount(u8);
16 :
17 : impl ShardCount {
18 : pub const MAX: Self = Self(u8::MAX);
19 :
20 : /// The internal value of a ShardCount may be zero, which means "1 shard, but use
21 : /// legacy format for TenantShardId that excludes the shard suffix", also known
22 : /// as `TenantShardId::unsharded`.
23 : ///
24 : /// This method returns the actual number of shards, i.e. if our internal value is
25 : /// zero, we return 1 (unsharded tenants have 1 shard).
26 756 : pub fn count(&self) -> u8 {
27 756 : if self.0 > 0 {
28 10 : self.0
29 : } else {
30 746 : 1
31 : }
32 756 : }
33 :
34 : /// The literal internal value: this is **not** the number of shards in the
35 : /// tenant, as we have a special zero value for legacy unsharded tenants. Use
36 : /// [`Self::count`] if you want to know the cardinality of shards.
37 4 : pub fn literal(&self) -> u8 {
38 4 : self.0
39 4 : }
40 :
41 0 : pub fn is_unsharded(&self) -> bool {
42 0 : self.0 == 0
43 0 : }
44 :
45 : /// `v` may be zero, or the number of shards in the tenant. `v` is what
46 : /// [`Self::literal`] would return.
47 128 : pub fn new(val: u8) -> Self {
48 128 : Self(val)
49 128 : }
50 : }
51 :
52 : impl ShardNumber {
53 : pub const MAX: Self = Self(u8::MAX);
54 : }
55 :
56 : /// TenantShardId identify the units of work for the Pageserver.
57 : ///
58 : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
59 : ///
60 : /// # The second shard in a two-shard tenant
61 : /// 072f1291a5310026820b2fe4b2968934-0102
62 : ///
63 : /// Historically, tenants could not have multiple shards, and were identified
64 : /// by TenantId. To support this, TenantShardId has a special legacy
65 : /// mode where `shard_count` is equal to zero: this represents a single-sharded
66 : /// tenant which should be written as a TenantId with no suffix.
67 : ///
68 : /// The human-readable encoding of TenantShardId, such as used in API URLs,
69 : /// is both forward and backward compatible: a legacy TenantId can be
70 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
71 : /// as a TenantId.
72 : ///
73 : /// Note that the binary encoding is _not_ backward compatible, because
74 : /// at the time sharding is introduced, there are no existing binary structures
75 : /// containing TenantId that we need to handle.
76 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
77 : pub struct TenantShardId {
78 : pub tenant_id: TenantId,
79 : pub shard_number: ShardNumber,
80 : pub shard_count: ShardCount,
81 : }
82 :
83 : impl TenantShardId {
84 140 : pub fn unsharded(tenant_id: TenantId) -> Self {
85 140 : Self {
86 140 : tenant_id,
87 140 : shard_number: ShardNumber(0),
88 140 : shard_count: ShardCount(0),
89 140 : }
90 140 : }
91 :
92 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
93 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
94 0 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
95 0 : RangeInclusive::new(
96 0 : Self {
97 0 : tenant_id,
98 0 : shard_number: ShardNumber(0),
99 0 : shard_count: ShardCount(0),
100 0 : },
101 0 : Self {
102 0 : tenant_id,
103 0 : shard_number: ShardNumber::MAX,
104 0 : shard_count: ShardCount::MAX,
105 0 : },
106 0 : )
107 0 : }
108 :
109 7263 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
110 7263 : ShardSlug(self)
111 7263 : }
112 :
113 : /// Convenience for code that has special behavior on the 0th shard.
114 6 : pub fn is_zero(&self) -> bool {
115 6 : self.shard_number == ShardNumber(0)
116 6 : }
117 :
118 0 : pub fn is_unsharded(&self) -> bool {
119 0 : self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
120 0 : }
121 :
122 : /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
123 : /// is useful when logging from code that is already in a span that includes tenant ID, to
124 : /// keep messages reasonably terse.
125 0 : pub fn to_index(&self) -> ShardIndex {
126 0 : ShardIndex {
127 0 : shard_number: self.shard_number,
128 0 : shard_count: self.shard_count,
129 0 : }
130 0 : }
131 :
132 : /// Calculate the children of this TenantShardId when splitting the overall tenant into
133 : /// the given number of shards.
134 8 : pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
135 8 : let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
136 8 : let mut child_shards = Vec::new();
137 32 : for shard_number in 0..ShardNumber(new_shard_count.0).0 {
138 : // Key mapping is based on a round robin mapping of key hash modulo shard count,
139 : // so our child shards are the ones which the same keys would map to.
140 32 : if shard_number % effective_old_shard_count == self.shard_number.0 {
141 24 : child_shards.push(TenantShardId {
142 24 : tenant_id: self.tenant_id,
143 24 : shard_number: ShardNumber(shard_number),
144 24 : shard_count: new_shard_count,
145 24 : })
146 8 : }
147 : }
148 :
149 8 : child_shards
150 8 : }
151 : }
152 :
153 : /// Formatting helper
154 : struct ShardSlug<'a>(&'a TenantShardId);
155 :
156 : impl<'a> std::fmt::Display for ShardSlug<'a> {
157 7263 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 7263 : write!(
159 7263 : f,
160 7263 : "{:02x}{:02x}",
161 7263 : self.0.shard_number.0, self.0.shard_count.0
162 7263 : )
163 7263 : }
164 : }
165 :
166 : impl std::fmt::Display for TenantShardId {
167 5597 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 5597 : if self.shard_count != ShardCount(0) {
169 2 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
170 : } else {
171 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
172 : // is distinct from the normal single shard case (shard count == 1).
173 5595 : self.tenant_id.fmt(f)
174 : }
175 5597 : }
176 : }
177 :
178 : impl std::fmt::Debug for TenantShardId {
179 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 0 : // Debug is the same as Display: the compact hex representation
181 0 : write!(f, "{}", self)
182 0 : }
183 : }
184 :
185 : impl std::str::FromStr for TenantShardId {
186 : type Err = hex::FromHexError;
187 :
188 2385 : fn from_str(s: &str) -> Result<Self, Self::Err> {
189 2385 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
190 2385 : if s.len() == 32 {
191 : // Legacy case: no shard specified
192 : Ok(Self {
193 2383 : tenant_id: TenantId::from_str(s)?,
194 2383 : shard_number: ShardNumber(0),
195 2383 : shard_count: ShardCount(0),
196 : })
197 2 : } else if s.len() == 37 {
198 2 : let bytes = s.as_bytes();
199 2 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
200 2 : let mut shard_parts: [u8; 2] = [0u8; 2];
201 2 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
202 2 : Ok(Self {
203 2 : tenant_id,
204 2 : shard_number: ShardNumber(shard_parts[0]),
205 2 : shard_count: ShardCount(shard_parts[1]),
206 2 : })
207 : } else {
208 0 : Err(hex::FromHexError::InvalidStringLength)
209 : }
210 2385 : }
211 : }
212 :
213 : impl From<[u8; 18]> for TenantShardId {
214 4 : fn from(b: [u8; 18]) -> Self {
215 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
216 4 :
217 4 : Self {
218 4 : tenant_id: TenantId::from(tenant_id_bytes),
219 4 : shard_number: ShardNumber(b[16]),
220 4 : shard_count: ShardCount(b[17]),
221 4 : }
222 4 : }
223 : }
224 :
225 : /// For use within the context of a particular tenant, when we need to know which
226 : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
227 : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
228 : /// TenantShardId.
229 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
230 : pub struct ShardIndex {
231 : pub shard_number: ShardNumber,
232 : pub shard_count: ShardCount,
233 : }
234 :
235 : impl ShardIndex {
236 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
237 0 : Self {
238 0 : shard_number: number,
239 0 : shard_count: count,
240 0 : }
241 0 : }
242 166 : pub fn unsharded() -> Self {
243 166 : Self {
244 166 : shard_number: ShardNumber(0),
245 166 : shard_count: ShardCount(0),
246 166 : }
247 166 : }
248 :
249 8608 : pub fn is_unsharded(&self) -> bool {
250 8608 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
251 8608 : }
252 :
253 : /// For use in constructing remote storage paths: concatenate this with a TenantId
254 : /// to get a fully qualified TenantShardId.
255 : ///
256 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
257 : /// that the legacy pre-sharding remote key format is preserved.
258 18 : pub fn get_suffix(&self) -> String {
259 18 : if self.is_unsharded() {
260 18 : "".to_string()
261 : } else {
262 0 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
263 : }
264 18 : }
265 : }
266 :
267 : impl std::fmt::Display for ShardIndex {
268 1398 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 1398 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
270 1398 : }
271 : }
272 :
273 : impl std::fmt::Debug for ShardIndex {
274 948 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 948 : // Debug is the same as Display: the compact hex representation
276 948 : write!(f, "{}", self)
277 948 : }
278 : }
279 :
280 : impl std::str::FromStr for ShardIndex {
281 : type Err = hex::FromHexError;
282 :
283 2 : fn from_str(s: &str) -> Result<Self, Self::Err> {
284 2 : // Expect format: 1 byte shard number, 1 byte shard count
285 2 : if s.len() == 4 {
286 2 : let bytes = s.as_bytes();
287 2 : let mut shard_parts: [u8; 2] = [0u8; 2];
288 2 : hex::decode_to_slice(bytes, &mut shard_parts)?;
289 2 : Ok(Self {
290 2 : shard_number: ShardNumber(shard_parts[0]),
291 2 : shard_count: ShardCount(shard_parts[1]),
292 2 : })
293 : } else {
294 0 : Err(hex::FromHexError::InvalidStringLength)
295 : }
296 2 : }
297 : }
298 :
299 : impl From<[u8; 2]> for ShardIndex {
300 2 : fn from(b: [u8; 2]) -> Self {
301 2 : Self {
302 2 : shard_number: ShardNumber(b[0]),
303 2 : shard_count: ShardCount(b[1]),
304 2 : }
305 2 : }
306 : }
307 :
308 : impl Serialize for TenantShardId {
309 66 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
310 66 : where
311 66 : S: serde::Serializer,
312 66 : {
313 66 : if serializer.is_human_readable() {
314 58 : serializer.collect_str(self)
315 : } else {
316 8 : let mut packed: [u8; 18] = [0; 18];
317 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
318 8 : packed[16] = self.shard_number.0;
319 8 : packed[17] = self.shard_count.0;
320 8 :
321 8 : packed.serialize(serializer)
322 : }
323 66 : }
324 : }
325 :
326 : impl<'de> Deserialize<'de> for TenantShardId {
327 12 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
328 12 : where
329 12 : D: serde::Deserializer<'de>,
330 12 : {
331 12 : struct IdVisitor {
332 12 : is_human_readable_deserializer: bool,
333 12 : }
334 12 :
335 12 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
336 12 : type Value = TenantShardId;
337 12 :
338 12 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
339 0 : if self.is_human_readable_deserializer {
340 12 : formatter.write_str("value in form of hex string")
341 12 : } else {
342 12 : formatter.write_str("value in form of integer array([u8; 18])")
343 12 : }
344 12 : }
345 12 :
346 12 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
347 4 : where
348 4 : A: serde::de::SeqAccess<'de>,
349 4 : {
350 4 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
351 12 : let id: [u8; 18] = Deserialize::deserialize(s)?;
352 12 : Ok(TenantShardId::from(id))
353 12 : }
354 12 :
355 12 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
356 8 : where
357 8 : E: serde::de::Error,
358 8 : {
359 8 : TenantShardId::from_str(v).map_err(E::custom)
360 8 : }
361 12 : }
362 12 :
363 12 : if deserializer.is_human_readable() {
364 8 : deserializer.deserialize_str(IdVisitor {
365 8 : is_human_readable_deserializer: true,
366 8 : })
367 : } else {
368 4 : deserializer.deserialize_tuple(
369 4 : 18,
370 4 : IdVisitor {
371 4 : is_human_readable_deserializer: false,
372 4 : },
373 4 : )
374 : }
375 12 : }
376 : }
377 :
378 : /// Stripe size in number of pages
379 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
380 : pub struct ShardStripeSize(pub u32);
381 :
382 : /// Layout version: for future upgrades where we might change how the key->shard mapping works
383 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
384 : pub struct ShardLayout(u8);
385 :
386 : const LAYOUT_V1: ShardLayout = ShardLayout(1);
387 : /// ShardIdentity uses a magic layout value to indicate if it is unusable
388 : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
389 :
390 : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
391 : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
392 :
393 : /// The ShardIdentity contains the information needed for one member of map
394 : /// to resolve a key to a shard, and then check whether that shard is ==self.
395 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
396 : pub struct ShardIdentity {
397 : pub number: ShardNumber,
398 : pub count: ShardCount,
399 : pub stripe_size: ShardStripeSize,
400 : layout: ShardLayout,
401 : }
402 :
403 0 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
404 : pub enum ShardConfigError {
405 : #[error("Invalid shard count")]
406 : InvalidCount,
407 : #[error("Invalid shard number")]
408 : InvalidNumber,
409 : #[error("Invalid stripe size")]
410 : InvalidStripeSize,
411 : }
412 :
413 : impl ShardIdentity {
414 : /// An identity with number=0 count=0 is a "none" identity, which represents legacy
415 : /// tenants. Modern single-shard tenants should not use this: they should
416 : /// have number=0 count=1.
417 104 : pub fn unsharded() -> Self {
418 104 : Self {
419 104 : number: ShardNumber(0),
420 104 : count: ShardCount(0),
421 104 : layout: LAYOUT_V1,
422 104 : stripe_size: DEFAULT_STRIPE_SIZE,
423 104 : }
424 104 : }
425 :
426 : /// A broken instance of this type is only used for `TenantState::Broken` tenants,
427 : /// which are constructed in code paths that don't have access to proper configuration.
428 : ///
429 : /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
430 : /// Enforcement is via assertions, to avoid making our interface fallible for this
431 : /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
432 : /// state, and by extension to avoid trying to do any page->shard resolution.
433 0 : pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
434 0 : Self {
435 0 : number,
436 0 : count,
437 0 : layout: LAYOUT_BROKEN,
438 0 : stripe_size: DEFAULT_STRIPE_SIZE,
439 0 : }
440 0 : }
441 :
442 0 : pub fn is_unsharded(&self) -> bool {
443 0 : self.number == ShardNumber(0) && self.count == ShardCount(0)
444 0 : }
445 :
446 : /// Count must be nonzero, and number must be < count. To construct
447 : /// the legacy case (count==0), use Self::unsharded instead.
448 38 : pub fn new(
449 38 : number: ShardNumber,
450 38 : count: ShardCount,
451 38 : stripe_size: ShardStripeSize,
452 38 : ) -> Result<Self, ShardConfigError> {
453 38 : if count.0 == 0 {
454 2 : Err(ShardConfigError::InvalidCount)
455 36 : } else if number.0 > count.0 - 1 {
456 6 : Err(ShardConfigError::InvalidNumber)
457 30 : } else if stripe_size.0 == 0 {
458 2 : Err(ShardConfigError::InvalidStripeSize)
459 : } else {
460 28 : Ok(Self {
461 28 : number,
462 28 : count,
463 28 : layout: LAYOUT_V1,
464 28 : stripe_size,
465 28 : })
466 : }
467 38 : }
468 :
469 : /// For use when creating ShardIdentity instances for new shards, where a creation request
470 : /// specifies the ShardParameters that apply to all shards.
471 104 : pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
472 104 : Self {
473 104 : number,
474 104 : count: params.count,
475 104 : layout: LAYOUT_V1,
476 104 : stripe_size: params.stripe_size,
477 104 : }
478 104 : }
479 :
480 172470 : fn is_broken(&self) -> bool {
481 172470 : self.layout == LAYOUT_BROKEN
482 172470 : }
483 :
484 3004 : pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
485 3004 : assert!(!self.is_broken());
486 3004 : key_to_shard_number(self.count, self.stripe_size, key)
487 3004 : }
488 :
489 : /// Return true if the key should be ingested by this shard
490 169466 : pub fn is_key_local(&self, key: &Key) -> bool {
491 169466 : assert!(!self.is_broken());
492 169466 : if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
493 169466 : true
494 : } else {
495 0 : key_to_shard_number(self.count, self.stripe_size, key) == self.number
496 : }
497 169466 : }
498 :
499 : /// Return true if the key should be discarded if found in this shard's
500 : /// data store, e.g. during compaction after a split
501 3626211 : pub fn is_key_disposable(&self, key: &Key) -> bool {
502 3626211 : if key_is_shard0(key) {
503 : // Q: Why can't we dispose of shard0 content if we're not shard 0?
504 : // A1: because the WAL ingestion logic currently ingests some shard 0
505 : // content on all shards, even though it's only read on shard 0. If we
506 : // dropped it, then subsequent WAL ingest to these keys would encounter
507 : // an error.
508 : // A2: because key_is_shard0 also covers relation size keys, which are written
509 : // on all shards even though they're only maintained accurately on shard 0.
510 3607827 : false
511 : } else {
512 18384 : !self.is_key_local(key)
513 : }
514 3626211 : }
515 :
516 0 : pub fn shard_slug(&self) -> String {
517 0 : if self.count > ShardCount(0) {
518 0 : format!("-{:02x}{:02x}", self.number.0, self.count.0)
519 : } else {
520 0 : String::new()
521 : }
522 0 : }
523 :
524 : /// Convenience for checking if this identity is the 0th shard in a tenant,
525 : /// for special cases on shard 0 such as ingesting relation sizes.
526 0 : pub fn is_zero(&self) -> bool {
527 0 : self.number == ShardNumber(0)
528 0 : }
529 : }
530 :
531 : impl Serialize for ShardIndex {
532 4 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
533 4 : where
534 4 : S: serde::Serializer,
535 4 : {
536 4 : if serializer.is_human_readable() {
537 0 : serializer.collect_str(self)
538 : } else {
539 : // Binary encoding is not used in index_part.json, but is included in anticipation of
540 : // switching various structures (e.g. inter-process communication, remote metadata) to more
541 : // compact binary encodings in future.
542 4 : let mut packed: [u8; 2] = [0; 2];
543 4 : packed[0] = self.shard_number.0;
544 4 : packed[1] = self.shard_count.0;
545 4 : packed.serialize(serializer)
546 : }
547 4 : }
548 : }
549 :
550 : impl<'de> Deserialize<'de> for ShardIndex {
551 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
552 2 : where
553 2 : D: serde::Deserializer<'de>,
554 2 : {
555 2 : struct IdVisitor {
556 2 : is_human_readable_deserializer: bool,
557 2 : }
558 2 :
559 2 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
560 2 : type Value = ShardIndex;
561 2 :
562 2 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
563 0 : if self.is_human_readable_deserializer {
564 2 : formatter.write_str("value in form of hex string")
565 2 : } else {
566 2 : formatter.write_str("value in form of integer array([u8; 2])")
567 2 : }
568 2 : }
569 2 :
570 2 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
571 2 : where
572 2 : A: serde::de::SeqAccess<'de>,
573 2 : {
574 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
575 2 : let id: [u8; 2] = Deserialize::deserialize(s)?;
576 2 : Ok(ShardIndex::from(id))
577 2 : }
578 2 :
579 2 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
580 0 : where
581 0 : E: serde::de::Error,
582 0 : {
583 0 : ShardIndex::from_str(v).map_err(E::custom)
584 0 : }
585 2 : }
586 2 :
587 2 : if deserializer.is_human_readable() {
588 0 : deserializer.deserialize_str(IdVisitor {
589 0 : is_human_readable_deserializer: true,
590 0 : })
591 : } else {
592 2 : deserializer.deserialize_tuple(
593 2 : 2,
594 2 : IdVisitor {
595 2 : is_human_readable_deserializer: false,
596 2 : },
597 2 : )
598 : }
599 2 : }
600 : }
601 :
602 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
603 : /// in order to be able to serve basebackup requests without peer communication).
604 3626213 : fn key_is_shard0(key: &Key) -> bool {
605 3626213 : // To decide what to shard out to shards >0, we apply a simple rule that only
606 3626213 : // relation pages are distributed to shards other than shard zero. Everything else gets
607 3626213 : // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
608 3626213 : // requests, and any request other than those for particular blocks in relations.
609 3626213 : !is_rel_block_key(key)
610 3626213 : }
611 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name:
/// the 32-bit MurmurHash3 finalizer (xor-shift / multiply mixing, wrapping on overflow).
fn murmurhash32(h: u32) -> u32 {
    const M1: u32 = 0x85eb_ca6b;
    const M2: u32 = 0xc2b2_ae35;

    let h = (h ^ (h >> 16)).wrapping_mul(M1);
    let h = (h ^ (h >> 13)).wrapping_mul(M2);
    h ^ (h >> 16)
}
621 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name.
fn hash_combine(a: u32, b: u32) -> u32 {
    // Postgres: a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2), with wrapping arithmetic.
    let mixed = b
        .wrapping_add(0x9e37_79b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);
    a ^ mixed
}
631 :
632 : /// Where a Key is to be distributed across shards, select the shard. This function
633 : /// does not account for keys that should be broadcast across shards.
634 : ///
635 : /// The hashing in this function must exactly match what we do in postgres smgr
636 : /// code. The resulting distribution of pages is intended to preserve locality within
637 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
638 : /// distributing data pseudo-randomly.
639 : ///
640 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
641 : /// and will be handled at higher levels when shards are split.
642 3006 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
643 3006 : // Fast path for un-sharded tenants or broadcast keys
644 3006 : if count < ShardCount(2) || key_is_shard0(key) {
645 3004 : return ShardNumber(0);
646 2 : }
647 2 :
648 2 : // relNode
649 2 : let mut hash = murmurhash32(key.field4);
650 2 : // blockNum/stripe size
651 2 : hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
652 2 :
653 2 : ShardNumber((hash % count.0 as u32) as u8)
654 3006 : }
655 :
#[cfg(test)]
mod tests {
    use utils::Hex;

    use super::*;

    const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";

    /// Raw bytes of EXAMPLE_TENANT_ID, as they appear in the binary encoding.
    const EXAMPLE_TENANT_ID_BYTES: [u8; 16] = [
        0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
        0xf6, 0xfc,
    ];

    fn example_tenant_id() -> TenantId {
        TenantId::from_str(EXAMPLE_TENANT_ID).unwrap()
    }

    /// The sharded example used by several tests: shard 7 of 10.
    fn example_sharded() -> TenantShardId {
        TenantShardId {
            tenant_id: example_tenant_id(),
            shard_count: ShardCount(10),
            shard_number: ShardNumber(7),
        }
    }

    /// Expected binary encoding of the example tenant with the given shard number and count.
    fn packed_example(number: u8, count: u8) -> [u8; 18] {
        let mut out = [0u8; 18];
        out[..16].copy_from_slice(&EXAMPLE_TENANT_ID_BYTES);
        out[16] = number;
        out[17] = count;
        out
    }

    /// Expected children of a split: the listed shard numbers, each with `count` shards.
    fn shards(tenant_id: TenantId, count: u8, numbers: &[u8]) -> Vec<TenantShardId> {
        numbers
            .iter()
            .map(|&number| TenantShardId {
                tenant_id,
                shard_count: ShardCount(count),
                shard_number: ShardNumber(number),
            })
            .collect()
    }

    #[test]
    fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
        let example = example_sharded();

        let encoded = example.to_string();
        assert_eq!(encoded, format!("{EXAMPLE_TENANT_ID}-070a"));

        assert_eq!(TenantShardId::from_str(&encoded)?, example);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
        let example = example_sharded();

        let encoded = bincode::serialize(&example).unwrap();
        let expected = packed_example(0x07, 0x0a);
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: TenantShardId = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
        // A TenantId in human readable form must decode as a legacy TenantShardId.
        let tenant_id = example_tenant_id();
        let encoded = tenant_id.to_string();
        assert_eq!(encoded, EXAMPLE_TENANT_ID);

        let decoded = TenantShardId::from_str(&encoded)?;
        assert_eq!(decoded.tenant_id, tenant_id);
        assert_eq!(decoded.shard_count, ShardCount(0));
        assert_eq!(decoded.shard_number, ShardNumber(0));
        Ok(())
    }

    #[test]
    fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
        // A legacy TenantShardId must encode into a form that decodes as a TenantId.
        let tenant_id = example_tenant_id();
        let encoded = TenantShardId::unsharded(tenant_id).to_string();
        assert_eq!(encoded, EXAMPLE_TENANT_ID);

        assert_eq!(TenantId::from_str(&encoded)?, tenant_id);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
        // Binary encoding does no special handling of legacy unsharded ids: an id built with
        // `unsharded()` encodes like any other, with zero shard number and count bytes.
        let example = TenantShardId::unsharded(example_tenant_id());

        let encoded = bincode::serialize(&example).unwrap();
        let expected = packed_example(0x00, 0x00);
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
        assert_eq!(example, decoded);
        Ok(())
    }

    #[test]
    fn shard_identity_validation() -> Result<(), ShardConfigError> {
        // Happy cases: (number, count, stripe size)
        let valid = [(0, 1, DEFAULT_STRIPE_SIZE.0), (0, 1, 1), (254, 255, 1)];
        for (number, count, stripe) in valid {
            ShardIdentity::new(ShardNumber(number), ShardCount(count), ShardStripeSize(stripe))?;
        }

        let invalid = [
            (0, 0, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidCount),
            (10, 10, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidNumber),
            (11, 10, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidNumber),
            (255, 255, DEFAULT_STRIPE_SIZE.0, ShardConfigError::InvalidNumber),
            (0, 1, 0, ShardConfigError::InvalidStripeSize),
        ];
        for (number, count, stripe, expected) in invalid {
            assert_eq!(
                ShardIdentity::new(ShardNumber(number), ShardCount(count), ShardStripeSize(stripe)),
                Err(expected)
            );
        }

        Ok(())
    }

    #[test]
    fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
        let example = ShardIndex::new(ShardNumber(13), ShardCount(17));

        let encoded = example.to_string();
        assert_eq!(encoded, "0d11");

        assert_eq!(ShardIndex::from_str(&encoded)?, example);
        Ok(())
    }

    #[test]
    fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
        let example = ShardIndex::new(ShardNumber(13), ShardCount(17));
        let expected: [u8; 2] = [0x0d, 0x11];

        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: ShardIndex = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);
        Ok(())
    }

    // These are only smoke tests to spot check that our implementation doesn't
    // deviate from a few examples values: not aiming to validate the overall
    // hashing algorithm.
    #[test]
    fn murmur_hash() {
        assert_eq!(murmurhash32(0), 0);
        assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
    }

    #[test]
    fn shard_mapping() {
        let key = Key {
            field1: 0x00,
            field2: 0x67f,
            field3: 0x5,
            field4: 0x400c,
            field5: 0x00,
            field6: 0x7d06,
        };

        assert_eq!(
            key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key),
            ShardNumber(8)
        );
    }

    #[test]
    fn shard_id_split() {
        let tenant_id = TenantId::generate();
        let unsharded = TenantShardId::unsharded(tenant_id);

        // Unsharded into 2
        assert_eq!(unsharded.split(ShardCount(2)), shards(tenant_id, 2, &[0, 1]));

        // Unsharded into 4
        assert_eq!(
            unsharded.split(ShardCount(4)),
            shards(tenant_id, 4, &[0, 1, 2, 3])
        );

        // count=1 into 2 (check this works the same as unsharded.)
        let single = TenantShardId {
            tenant_id,
            shard_count: ShardCount(1),
            shard_number: ShardNumber(0),
        };
        assert_eq!(single.split(ShardCount(2)), shards(tenant_id, 2, &[0, 1]));

        // count=2 into count=8
        let second_of_two = TenantShardId {
            tenant_id,
            shard_count: ShardCount(2),
            shard_number: ShardNumber(1),
        };
        assert_eq!(
            second_of_two.split(ShardCount(8)),
            shards(tenant_id, 8, &[1, 3, 5, 7])
        );
    }
}
|