LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - shard.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 95.5 % 512 489
Test Date: 2024-02-07 07:37:29 Functions: 47.8 % 230 110

            Line data    Source code
       1              : use std::{ops::RangeInclusive, str::FromStr};
       2              : 
       3              : use crate::{
       4              :     key::{is_rel_block_key, Key},
       5              :     models::ShardParameters,
       6              : };
       7              : use hex::FromHex;
       8              : use serde::{Deserialize, Serialize};
       9              : use thiserror;
      10              : use utils::id::TenantId;
      11              : 
      12     44290764 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      13              : pub struct ShardNumber(pub u8);
      14              : 
      15    114246878 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      16              : pub struct ShardCount(pub u8);
      17              : 
      18              : impl ShardCount {
      19              :     pub const MAX: Self = Self(u8::MAX);
      20              : }
      21              : 
      22              : impl ShardNumber {
      23              :     pub const MAX: Self = Self(u8::MAX);
      24              : }
      25              : 
      26              : /// TenantShardId identify the units of work for the Pageserver.
      27              : ///
      28              : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
      29              : ///
      30              : ///   # The second shard in a two-shard tenant
      31              : ///   072f1291a5310026820b2fe4b2968934-0102
      32              : ///
      33              : /// Historically, tenants could not have multiple shards, and were identified
      34              : /// by TenantId.  To support this, TenantShardId has a special legacy
      35              : /// mode where `shard_count` is equal to zero: this represents a single-sharded
      36              : /// tenant which should be written as a TenantId with no suffix.
      37              : ///
      38              : /// The human-readable encoding of TenantShardId, such as used in API URLs,
      39              : /// is both forward and backward compatible: a legacy TenantId can be
      40              : /// decoded as a TenantShardId, and when re-encoded it will be parseable
      41              : /// as a TenantId.
      42              : ///
      43              : /// Note that the binary encoding is _not_ backward compatible, because
      44              : /// at the time sharding is introduced, there are no existing binary structures
      45              : /// containing TenantId that we need to handle.
      46     17098954 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      47              : pub struct TenantShardId {
      48              :     pub tenant_id: TenantId,
      49              :     pub shard_number: ShardNumber,
      50              :     pub shard_count: ShardCount,
      51              : }
      52              : 
      53              : impl TenantShardId {
      54          690 :     pub fn unsharded(tenant_id: TenantId) -> Self {
      55          690 :         Self {
      56          690 :             tenant_id,
      57          690 :             shard_number: ShardNumber(0),
      58          690 :             shard_count: ShardCount(0),
      59          690 :         }
      60          690 :     }
      61              : 
      62              :     /// The range of all TenantShardId that belong to a particular TenantId.  This is useful when
      63              :     /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
      64        23602 :     pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
      65        23602 :         RangeInclusive::new(
      66        23602 :             Self {
      67        23602 :                 tenant_id,
      68        23602 :                 shard_number: ShardNumber(0),
      69        23602 :                 shard_count: ShardCount(0),
      70        23602 :             },
      71        23602 :             Self {
      72        23602 :                 tenant_id,
      73        23602 :                 shard_number: ShardNumber::MAX,
      74        23602 :                 shard_count: ShardCount::MAX,
      75        23602 :             },
      76        23602 :         )
      77        23602 :     }
      78              : 
      79      8554581 :     pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
      80      8554581 :         ShardSlug(self)
      81      8554581 :     }
      82              : 
      83              :     /// Convenience for code that has special behavior on the 0th shard.
      84      1508636 :     pub fn is_zero(&self) -> bool {
      85      1508636 :         self.shard_number == ShardNumber(0)
      86      1508636 :     }
      87              : 
      88            1 :     pub fn is_unsharded(&self) -> bool {
      89            1 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
      90            1 :     }
      91         9985 :     pub fn to_index(&self) -> ShardIndex {
      92         9985 :         ShardIndex {
      93         9985 :             shard_number: self.shard_number,
      94         9985 :             shard_count: self.shard_count,
      95         9985 :         }
      96         9985 :     }
      97              : }
      98              : 
      99              : /// Formatting helper
     100              : struct ShardSlug<'a>(&'a TenantShardId);
     101              : 
     102              : impl<'a> std::fmt::Display for ShardSlug<'a> {
     103      8554581 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     104      8554581 :         write!(
     105      8554581 :             f,
     106      8554581 :             "{:02x}{:02x}",
     107      8554581 :             self.0.shard_number.0, self.0.shard_count.0
     108      8554581 :         )
     109      8554581 :     }
     110              : }
     111              : 
     112              : impl std::fmt::Display for TenantShardId {
     113       180873 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     114       180873 :         if self.shard_count != ShardCount(0) {
     115         2489 :             write!(f, "{}-{}", self.tenant_id, self.shard_slug())
     116              :         } else {
     117              :             // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
     118              :             // is distinct from the normal single shard case (shard count == 1).
     119       178384 :             self.tenant_id.fmt(f)
     120              :         }
     121       180873 :     }
     122              : }
     123              : 
     124              : impl std::fmt::Debug for TenantShardId {
     125          892 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     126          892 :         // Debug is the same as Display: the compact hex representation
     127          892 :         write!(f, "{}", self)
     128          892 :     }
     129              : }
     130              : 
     131              : impl std::str::FromStr for TenantShardId {
     132              :     type Err = hex::FromHexError;
     133              : 
     134        82608 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     135        82608 :         // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
     136        82608 :         if s.len() == 32 {
     137              :             // Legacy case: no shard specified
     138              :             Ok(Self {
     139        80368 :                 tenant_id: TenantId::from_str(s)?,
     140        80368 :                 shard_number: ShardNumber(0),
     141        80368 :                 shard_count: ShardCount(0),
     142              :             })
     143         2240 :         } else if s.len() == 37 {
     144         2240 :             let bytes = s.as_bytes();
     145         2240 :             let tenant_id = TenantId::from_hex(&bytes[0..32])?;
     146         2240 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     147         2240 :             hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
     148         2240 :             Ok(Self {
     149         2240 :                 tenant_id,
     150         2240 :                 shard_number: ShardNumber(shard_parts[0]),
     151         2240 :                 shard_count: ShardCount(shard_parts[1]),
     152         2240 :             })
     153              :         } else {
     154            0 :             Err(hex::FromHexError::InvalidStringLength)
     155              :         }
     156        82608 :     }
     157              : }
     158              : 
     159              : impl From<[u8; 18]> for TenantShardId {
     160            4 :     fn from(b: [u8; 18]) -> Self {
     161            4 :         let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
     162            4 : 
     163            4 :         Self {
     164            4 :             tenant_id: TenantId::from(tenant_id_bytes),
     165            4 :             shard_number: ShardNumber(b[16]),
     166            4 :             shard_count: ShardCount(b[17]),
     167            4 :         }
     168            4 :     }
     169              : }
     170              : 
     171              : /// For use within the context of a particular tenant, when we need to know which
     172              : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
     173              : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
     174              : /// TenantShardId.
     175       703686 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
     176              : pub struct ShardIndex {
     177              :     pub shard_number: ShardNumber,
     178              :     pub shard_count: ShardCount,
     179              : }
     180              : 
     181              : impl ShardIndex {
     182            0 :     pub fn new(number: ShardNumber, count: ShardCount) -> Self {
     183            0 :         Self {
     184            0 :             shard_number: number,
     185            0 :             shard_count: count,
     186            0 :         }
     187            0 :     }
     188        59834 :     pub fn unsharded() -> Self {
     189        59834 :         Self {
     190        59834 :             shard_number: ShardNumber(0),
     191        59834 :             shard_count: ShardCount(0),
     192        59834 :         }
     193        59834 :     }
     194              : 
     195      1259277 :     pub fn is_unsharded(&self) -> bool {
     196      1259277 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     197      1259277 :     }
     198              : 
     199              :     /// For use in constructing remote storage paths: concatenate this with a TenantId
     200              :     /// to get a fully qualified TenantShardId.
     201              :     ///
     202              :     /// Backward compat: this function returns an empty string if Self::is_unsharded, such
     203              :     /// that the legacy pre-sharding remote key format is preserved.
     204        19331 :     pub fn get_suffix(&self) -> String {
     205        19331 :         if self.is_unsharded() {
     206        19314 :             "".to_string()
     207              :         } else {
     208           17 :             format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     209              :         }
     210        19331 :     }
     211              : }
     212              : 
     213              : impl std::fmt::Display for ShardIndex {
     214        28102 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     215        28102 :         write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     216        28102 :     }
     217              : }
     218              : 
     219              : impl std::fmt::Debug for ShardIndex {
     220        22625 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     221        22625 :         // Debug is the same as Display: the compact hex representation
     222        22625 :         write!(f, "{}", self)
     223        22625 :     }
     224              : }
     225              : 
     226              : impl std::str::FromStr for ShardIndex {
     227              :     type Err = hex::FromHexError;
     228              : 
     229            6 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     230            6 :         // Expect format: 1 byte shard number, 1 byte shard count
     231            6 :         if s.len() == 4 {
     232            6 :             let bytes = s.as_bytes();
     233            6 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     234            6 :             hex::decode_to_slice(bytes, &mut shard_parts)?;
     235            6 :             Ok(Self {
     236            6 :                 shard_number: ShardNumber(shard_parts[0]),
     237            6 :                 shard_count: ShardCount(shard_parts[1]),
     238            6 :             })
     239              :         } else {
     240            0 :             Err(hex::FromHexError::InvalidStringLength)
     241              :         }
     242            6 :     }
     243              : }
     244              : 
     245              : impl From<[u8; 2]> for ShardIndex {
     246            2 :     fn from(b: [u8; 2]) -> Self {
     247            2 :         Self {
     248            2 :             shard_number: ShardNumber(b[0]),
     249            2 :             shard_count: ShardCount(b[1]),
     250            2 :         }
     251            2 :     }
     252              : }
     253              : 
     254              : impl Serialize for TenantShardId {
     255         9484 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     256         9484 :     where
     257         9484 :         S: serde::Serializer,
     258         9484 :     {
     259         9484 :         if serializer.is_human_readable() {
     260         9476 :             serializer.collect_str(self)
     261              :         } else {
     262            8 :             let mut packed: [u8; 18] = [0; 18];
     263            8 :             packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
     264            8 :             packed[16] = self.shard_number.0;
     265            8 :             packed[17] = self.shard_count.0;
     266            8 : 
     267            8 :             packed.serialize(serializer)
     268              :         }
     269         9484 :     }
     270              : }
     271              : 
     272              : impl<'de> Deserialize<'de> for TenantShardId {
     273         5519 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     274         5519 :     where
     275         5519 :         D: serde::Deserializer<'de>,
     276         5519 :     {
     277         5519 :         struct IdVisitor {
     278         5519 :             is_human_readable_deserializer: bool,
     279         5519 :         }
     280         5519 : 
     281         5519 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     282         5519 :             type Value = TenantShardId;
     283         5519 : 
     284         5519 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     285            0 :                 if self.is_human_readable_deserializer {
     286         5519 :                     formatter.write_str("value in form of hex string")
     287         5519 :                 } else {
     288         5519 :                     formatter.write_str("value in form of integer array([u8; 18])")
     289         5519 :                 }
     290         5519 :             }
     291         5519 : 
     292         5519 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     293            4 :             where
     294            4 :                 A: serde::de::SeqAccess<'de>,
     295            4 :             {
     296            4 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     297         5519 :                 let id: [u8; 18] = Deserialize::deserialize(s)?;
     298         5519 :                 Ok(TenantShardId::from(id))
     299         5519 :             }
     300         5519 : 
     301         5519 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     302         5515 :             where
     303         5515 :                 E: serde::de::Error,
     304         5515 :             {
     305         5515 :                 TenantShardId::from_str(v).map_err(E::custom)
     306         5515 :             }
     307         5519 :         }
     308         5519 : 
     309         5519 :         if deserializer.is_human_readable() {
     310         5515 :             deserializer.deserialize_str(IdVisitor {
     311         5515 :                 is_human_readable_deserializer: true,
     312         5515 :             })
     313              :         } else {
     314            4 :             deserializer.deserialize_tuple(
     315            4 :                 18,
     316            4 :                 IdVisitor {
     317            4 :                     is_human_readable_deserializer: false,
     318            4 :                 },
     319            4 :             )
     320              :         }
     321         5519 :     }
     322              : }
     323              : 
     324              : /// Stripe size in number of pages
     325          883 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     326              : pub struct ShardStripeSize(pub u32);
     327              : 
     328              : /// Layout version: for future upgrades where we might change how the key->shard mapping works
     329     89659452 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     330              : pub struct ShardLayout(u8);
     331              : 
     332              : const LAYOUT_V1: ShardLayout = ShardLayout(1);
     333              : /// ShardIdentity uses a magic layout value to indicate if it is unusable
     334              : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
     335              : 
     336              : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
     337              : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
     338              : 
     339              : /// The ShardIdentity contains the information needed for one member of map
     340              : /// to resolve a key to a shard, and then check whether that shard is ==self.
     341           44 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     342              : pub struct ShardIdentity {
     343              :     pub number: ShardNumber,
     344              :     pub count: ShardCount,
     345              :     pub stripe_size: ShardStripeSize,
     346              :     layout: ShardLayout,
     347              : }
     348              : 
     349           10 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
     350              : pub enum ShardConfigError {
     351              :     #[error("Invalid shard count")]
     352              :     InvalidCount,
     353              :     #[error("Invalid shard number")]
     354              :     InvalidNumber,
     355              :     #[error("Invalid stripe size")]
     356              :     InvalidStripeSize,
     357              : }
     358              : 
     359              : impl ShardIdentity {
     360              :     /// An identity with number=0 count=0 is a "none" identity, which represents legacy
     361              :     /// tenants.  Modern single-shard tenants should not use this: they should
     362              :     /// have number=0 count=1.
     363          906 :     pub fn unsharded() -> Self {
     364          906 :         Self {
     365          906 :             number: ShardNumber(0),
     366          906 :             count: ShardCount(0),
     367          906 :             layout: LAYOUT_V1,
     368          906 :             stripe_size: DEFAULT_STRIPE_SIZE,
     369          906 :         }
     370          906 :     }
     371              : 
     372              :     /// A broken instance of this type is only used for `TenantState::Broken` tenants,
     373              :     /// which are constructed in code paths that don't have access to proper configuration.
     374              :     ///
     375              :     /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
     376              :     /// Enforcement is via assertions, to avoid making our interface fallible for this
     377              :     /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
     378              :     /// state, and by extension to avoid trying to do any page->shard resolution.
     379            0 :     pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
     380            0 :         Self {
     381            0 :             number,
     382            0 :             count,
     383            0 :             layout: LAYOUT_BROKEN,
     384            0 :             stripe_size: DEFAULT_STRIPE_SIZE,
     385            0 :         }
     386            0 :     }
     387              : 
     388         1890 :     pub fn is_unsharded(&self) -> bool {
     389         1890 :         self.number == ShardNumber(0) && self.count == ShardCount(0)
     390         1890 :     }
     391              : 
     392              :     /// Count must be nonzero, and number must be < count. To construct
     393              :     /// the legacy case (count==0), use Self::unsharded instead.
     394           60 :     pub fn new(
     395           60 :         number: ShardNumber,
     396           60 :         count: ShardCount,
     397           60 :         stripe_size: ShardStripeSize,
     398           60 :     ) -> Result<Self, ShardConfigError> {
     399           60 :         if count.0 == 0 {
     400            2 :             Err(ShardConfigError::InvalidCount)
     401           58 :         } else if number.0 > count.0 - 1 {
     402            6 :             Err(ShardConfigError::InvalidNumber)
     403           52 :         } else if stripe_size.0 == 0 {
     404            2 :             Err(ShardConfigError::InvalidStripeSize)
     405              :         } else {
     406           50 :             Ok(Self {
     407           50 :                 number,
     408           50 :                 count,
     409           50 :                 layout: LAYOUT_V1,
     410           50 :                 stripe_size,
     411           50 :             })
     412              :         }
     413           60 :     }
     414              : 
     415              :     /// For use when creating ShardIdentity instances for new shards, where a creation request
     416              :     /// specifies the ShardParameters that apply to all shards.
     417          759 :     pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
     418          759 :         Self {
     419          759 :             number,
     420          759 :             count: params.count,
     421          759 :             layout: LAYOUT_V1,
     422          759 :             stripe_size: params.stripe_size,
     423          759 :         }
     424          759 :     }
     425              : 
     426     89659452 :     fn is_broken(&self) -> bool {
     427     89659452 :         self.layout == LAYOUT_BROKEN
     428     89659452 :     }
     429              : 
     430       783934 :     pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
     431       783934 :         assert!(!self.is_broken());
     432       783934 :         key_to_shard_number(self.count, self.stripe_size, key)
     433       783934 :     }
     434              : 
     435              :     /// Return true if the key should be ingested by this shard
     436     88875518 :     pub fn is_key_local(&self, key: &Key) -> bool {
     437     88875518 :         assert!(!self.is_broken());
     438     88875518 :         if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
     439     69823070 :             true
     440              :         } else {
     441     19052448 :             key_to_shard_number(self.count, self.stripe_size, key) == self.number
     442              :         }
     443     88875518 :     }
     444              : 
     445              :     /// Return true if the key should be discarded if found in this shard's
     446              :     /// data store, e.g. during compaction after a split
     447     26590132 :     pub fn is_key_disposable(&self, key: &Key) -> bool {
     448     26590132 :         if key_is_shard0(key) {
     449              :             // Q: Why can't we dispose of shard0 content if we're not shard 0?
     450              :             // A: because the WAL ingestion logic currently ingests some shard 0
     451              :             //    content on all shards, even though it's only read on shard 0.  If we
     452              :             //    dropped it, then subsequent WAL ingest to these keys would encounter
     453              :             //    an error.
     454      4955851 :             false
     455              :         } else {
     456     21634281 :             !self.is_key_local(key)
     457              :         }
     458     26590132 :     }
     459              : 
     460            0 :     pub fn shard_slug(&self) -> String {
     461            0 :         if self.count > ShardCount(0) {
     462            0 :             format!("-{:02x}{:02x}", self.number.0, self.count.0)
     463              :         } else {
     464            0 :             String::new()
     465              :         }
     466            0 :     }
     467              : 
     468              :     /// Convenience for checking if this identity is the 0th shard in a tenant,
     469              :     /// for special cases on shard 0 such as ingesting relation sizes.
     470     13873379 :     pub fn is_zero(&self) -> bool {
     471     13873379 :         self.number == ShardNumber(0)
     472     13873379 :     }
     473              : }
     474              : 
     475              : impl Serialize for ShardIndex {
     476           80 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     477           80 :     where
     478           80 :         S: serde::Serializer,
     479           80 :     {
     480           80 :         if serializer.is_human_readable() {
     481           76 :             serializer.collect_str(self)
     482              :         } else {
     483              :             // Binary encoding is not used in index_part.json, but is included in anticipation of
     484              :             // switching various structures (e.g. inter-process communication, remote metadata) to more
     485              :             // compact binary encodings in future.
     486            4 :             let mut packed: [u8; 2] = [0; 2];
     487            4 :             packed[0] = self.shard_number.0;
     488            4 :             packed[1] = self.shard_count.0;
     489            4 :             packed.serialize(serializer)
     490              :         }
     491           80 :     }
     492              : }
     493              : 
     494              : impl<'de> Deserialize<'de> for ShardIndex {
     495            6 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     496            6 :     where
     497            6 :         D: serde::Deserializer<'de>,
     498            6 :     {
     499            6 :         struct IdVisitor {
     500            6 :             is_human_readable_deserializer: bool,
     501            6 :         }
     502            6 : 
     503            6 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     504            6 :             type Value = ShardIndex;
     505            6 : 
     506            6 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     507            0 :                 if self.is_human_readable_deserializer {
     508            6 :                     formatter.write_str("value in form of hex string")
     509            6 :                 } else {
     510            6 :                     formatter.write_str("value in form of integer array([u8; 2])")
     511            6 :                 }
     512            6 :             }
     513            6 : 
     514            6 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     515            2 :             where
     516            2 :                 A: serde::de::SeqAccess<'de>,
     517            2 :             {
     518            2 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     519            6 :                 let id: [u8; 2] = Deserialize::deserialize(s)?;
     520            6 :                 Ok(ShardIndex::from(id))
     521            6 :             }
     522            6 : 
     523            6 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     524            4 :             where
     525            4 :                 E: serde::de::Error,
     526            4 :             {
     527            4 :                 ShardIndex::from_str(v).map_err(E::custom)
     528            4 :             }
     529            6 :         }
     530            6 : 
     531            6 :         if deserializer.is_human_readable() {
     532            4 :             deserializer.deserialize_str(IdVisitor {
     533            4 :                 is_human_readable_deserializer: true,
     534            4 :             })
     535              :         } else {
     536            2 :             deserializer.deserialize_tuple(
     537            2 :                 2,
     538            2 :                 IdVisitor {
     539            2 :                     is_human_readable_deserializer: false,
     540            2 :                 },
     541            2 :             )
     542              :         }
     543            6 :     }
     544              : }
     545              : 
     546              : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
     547              : /// in order to be able to serve basebackup requests without peer communication).
     548     65237179 : fn key_is_shard0(key: &Key) -> bool {
     549     65237179 :     // To decide what to shard out to shards >0, we apply a simple rule that only
     550     65237179 :     // relation pages are distributed to shards other than shard zero. Everything else gets
     551     65237179 :     // stored on shard 0.  This guarantees that shard 0 can independently serve basebackup
     552     65237179 :     // requests, and any request other than those for particular blocks in relations.
     553     65237179 :     !is_rel_block_key(key)
     554     65237179 : }
     555              : 
     556              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     557     39189200 : fn murmurhash32(mut h: u32) -> u32 {
     558     39189200 :     h ^= h >> 16;
     559     39189200 :     h = h.wrapping_mul(0x85ebca6b);
     560     39189200 :     h ^= h >> 13;
     561     39189200 :     h = h.wrapping_mul(0xc2b2ae35);
     562     39189200 :     h ^= h >> 16;
     563     39189200 :     h
     564     39189200 : }
     565              : 
     566              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     567     19594601 : fn hash_combine(mut a: u32, mut b: u32) -> u32 {
     568     19594601 :     b = b.wrapping_add(0x9e3779b9);
     569     19594601 :     b = b.wrapping_add(a << 6);
     570     19594601 :     b = b.wrapping_add(a >> 2);
     571     19594601 : 
     572     19594601 :     a ^= b;
     573     19594601 :     a
     574     19594601 : }
     575              : 
     576              : /// Where a Key is to be distributed across shards, select the shard.  This function
     577              : /// does not account for keys that should be broadcast across shards.
     578              : ///
     579              : /// The hashing in this function must exactly match what we do in postgres smgr
     580              : /// code.  The resulting distribution of pages is intended to preserve locality within
     581              : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
     582              : /// distributing data pseudo-randomly.
     583              : ///
     584              : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
     585              : /// and will be handled at higher levels when shards are split.
     586     19836384 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
     587     19836384 :     // Fast path for un-sharded tenants or broadcast keys
     588     19836384 :     if count < ShardCount(2) || key_is_shard0(key) {
     589       241785 :         return ShardNumber(0);
     590     19594599 :     }
     591     19594599 : 
     592     19594599 :     // relNode
     593     19594599 :     let mut hash = murmurhash32(key.field4);
     594     19594599 :     // blockNum/stripe size
     595     19594599 :     hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
     596     19594599 : 
     597     19594599 :     ShardNumber((hash % count.0 as u32) as u8)
     598     19836384 : }
     599              : 
     600              : #[cfg(test)]
     601              : mod tests {
     602              :     use std::str::FromStr;
     603              : 
     604              :     use bincode;
     605              :     use utils::{id::TenantId, Hex};
     606              : 
     607              :     use super::*;
     608              : 
     609              :     const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
     610              : 
     611            2 :     #[test]
     612            2 :     fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
     613            2 :         let example = TenantShardId {
     614            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     615            2 :             shard_count: ShardCount(10),
     616            2 :             shard_number: ShardNumber(7),
     617            2 :         };
     618            2 : 
     619            2 :         let encoded = format!("{example}");
     620            2 : 
     621            2 :         let expected = format!("{EXAMPLE_TENANT_ID}-070a");
     622            2 :         assert_eq!(&encoded, &expected);
     623              : 
     624            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     625              : 
     626            2 :         assert_eq!(example, decoded);
     627              : 
     628            2 :         Ok(())
     629            2 :     }
     630              : 
     631            2 :     #[test]
     632            2 :     fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
     633            2 :         let example = TenantShardId {
     634            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     635            2 :             shard_count: ShardCount(10),
     636            2 :             shard_number: ShardNumber(7),
     637            2 :         };
     638            2 : 
     639            2 :         let encoded = bincode::serialize(&example).unwrap();
     640            2 :         let expected: [u8; 18] = [
     641            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     642            2 :             0xf6, 0xfc, 0x07, 0x0a,
     643            2 :         ];
     644            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     645              : 
     646            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     647            2 : 
     648            2 :         assert_eq!(example, decoded);
     649              : 
     650            2 :         Ok(())
     651            2 :     }
     652              : 
     653            2 :     #[test]
     654            2 :     fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
     655            2 :         // Test that TenantShardId can decode a TenantId in human
     656            2 :         // readable form
     657            2 :         let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     658            2 :         let encoded = format!("{example}");
     659            2 : 
     660            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     661              : 
     662            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     663              : 
     664            2 :         assert_eq!(example, decoded.tenant_id);
     665            2 :         assert_eq!(decoded.shard_count, ShardCount(0));
     666            2 :         assert_eq!(decoded.shard_number, ShardNumber(0));
     667              : 
     668            2 :         Ok(())
     669            2 :     }
     670              : 
     671            2 :     #[test]
     672            2 :     fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
     673            2 :         // Test that a legacy TenantShardId encodes into a form that
     674            2 :         // can be decoded as TenantId
     675            2 :         let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     676            2 :         let example = TenantShardId::unsharded(example_tenant_id);
     677            2 :         let encoded = format!("{example}");
     678            2 : 
     679            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     680              : 
     681            2 :         let decoded = TenantId::from_str(&encoded)?;
     682              : 
     683            2 :         assert_eq!(example_tenant_id, decoded);
     684              : 
     685            2 :         Ok(())
     686            2 :     }
     687              : 
     688            2 :     #[test]
     689            2 :     fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
     690            2 :         // Unlike in human readable encoding, binary encoding does not
     691            2 :         // do any special handling of legacy unsharded TenantIds: this test
     692            2 :         // is equivalent to the main test for binary encoding, just verifying
     693            2 :         // that the same behavior applies when we have used `unsharded()` to
     694            2 :         // construct a TenantShardId.
     695            2 :         let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
     696            2 :         let encoded = bincode::serialize(&example).unwrap();
     697            2 : 
     698            2 :         let expected: [u8; 18] = [
     699            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     700            2 :             0xf6, 0xfc, 0x00, 0x00,
     701            2 :         ];
     702            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     703              : 
     704            2 :         let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
     705            2 :         assert_eq!(example, decoded);
     706              : 
     707            2 :         Ok(())
     708            2 :     }
     709              : 
     710            2 :     #[test]
     711            2 :     fn shard_identity_validation() -> Result<(), ShardConfigError> {
     712            2 :         // Happy cases
     713            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
     714            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
     715            2 :         ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
     716              : 
     717            2 :         assert_eq!(
     718            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
     719            2 :             Err(ShardConfigError::InvalidCount)
     720            2 :         );
     721            2 :         assert_eq!(
     722            2 :             ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
     723            2 :             Err(ShardConfigError::InvalidNumber)
     724            2 :         );
     725            2 :         assert_eq!(
     726            2 :             ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
     727            2 :             Err(ShardConfigError::InvalidNumber)
     728            2 :         );
     729            2 :         assert_eq!(
     730            2 :             ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
     731            2 :             Err(ShardConfigError::InvalidNumber)
     732            2 :         );
     733            2 :         assert_eq!(
     734            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
     735            2 :             Err(ShardConfigError::InvalidStripeSize)
     736            2 :         );
     737              : 
     738            2 :         Ok(())
     739            2 :     }
     740              : 
     741            2 :     #[test]
     742            2 :     fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
     743            2 :         let example = ShardIndex {
     744            2 :             shard_number: ShardNumber(13),
     745            2 :             shard_count: ShardCount(17),
     746            2 :         };
     747            2 :         let expected: String = "0d11".to_string();
     748            2 :         let encoded = format!("{example}");
     749            2 :         assert_eq!(&encoded, &expected);
     750              : 
     751            2 :         let decoded = ShardIndex::from_str(&encoded)?;
     752            2 :         assert_eq!(example, decoded);
     753            2 :         Ok(())
     754            2 :     }
     755              : 
     756            2 :     #[test]
     757            2 :     fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
     758            2 :         let example = ShardIndex {
     759            2 :             shard_number: ShardNumber(13),
     760            2 :             shard_count: ShardCount(17),
     761            2 :         };
     762            2 :         let expected: [u8; 2] = [0x0d, 0x11];
     763            2 : 
     764            2 :         let encoded = bincode::serialize(&example).unwrap();
     765            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     766            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     767            2 :         assert_eq!(example, decoded);
     768              : 
     769            2 :         Ok(())
     770            2 :     }
     771              : 
     772              :     // These are only smoke tests to spot check that our implementation doesn't
     773              :     // deviate from a few examples values: not aiming to validate the overall
     774              :     // hashing algorithm.
     775            2 :     #[test]
     776            2 :     fn murmur_hash() {
     777            2 :         assert_eq!(murmurhash32(0), 0);
     778              : 
     779            2 :         assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
     780            2 :     }
     781              : 
     782            2 :     #[test]
     783            2 :     fn shard_mapping() {
     784            2 :         let key = Key {
     785            2 :             field1: 0x00,
     786            2 :             field2: 0x67f,
     787            2 :             field3: 0x5,
     788            2 :             field4: 0x400c,
     789            2 :             field5: 0x00,
     790            2 :             field6: 0x7d06,
     791            2 :         };
     792            2 : 
     793            2 :         let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
     794            2 :         assert_eq!(shard, ShardNumber(8));
     795            2 :     }
     796              : }
        

Generated by: LCOV version 2.1-beta