LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - shard.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 96.3 % 622 599
Test Date: 2024-02-12 20:26:03 Functions: 53.6 % 233 125

            Line data    Source code
       1              : use std::{ops::RangeInclusive, str::FromStr};
       2              : 
       3              : use crate::{
       4              :     key::{is_rel_block_key, Key},
       5              :     models::ShardParameters,
       6              : };
       7              : use hex::FromHex;
       8              : use serde::{Deserialize, Serialize};
       9              : use thiserror;
      10              : use utils::id::TenantId;
      11              : 
      12     43834135 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      13              : pub struct ShardNumber(pub u8);
      14              : 
      15    112095629 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      16              : pub struct ShardCount(pub u8);
      17              : 
      18              : impl ShardCount {
      19              :     pub const MAX: Self = Self(u8::MAX);
      20              : }
      21              : 
      22              : impl ShardNumber {
      23              :     pub const MAX: Self = Self(u8::MAX);
      24              : }
      25              : 
      26              : /// TenantShardId identify the units of work for the Pageserver.
      27              : ///
      28              : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
      29              : ///
      30              : ///   # The second shard in a two-shard tenant
      31              : ///   072f1291a5310026820b2fe4b2968934-0102
      32              : ///
      33              : /// Historically, tenants could not have multiple shards, and were identified
      34              : /// by TenantId.  To support this, TenantShardId has a special legacy
      35              : /// mode where `shard_count` is equal to zero: this represents a single-sharded
      36              : /// tenant which should be written as a TenantId with no suffix.
      37              : ///
      38              : /// The human-readable encoding of TenantShardId, such as used in API URLs,
      39              : /// is both forward and backward compatible: a legacy TenantId can be
      40              : /// decoded as a TenantShardId, and when re-encoded it will be parseable
      41              : /// as a TenantId.
      42              : ///
      43              : /// Note that the binary encoding is _not_ backward compatible, because
      44              : /// at the time sharding is introduced, there are no existing binary structures
      45              : /// containing TenantId that we need to handle.
      46     14370456 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      47              : pub struct TenantShardId {
      48              :     pub tenant_id: TenantId,
      49              :     pub shard_number: ShardNumber,
      50              :     pub shard_count: ShardCount,
      51              : }
      52              : 
      53              : impl TenantShardId {
      54          692 :     pub fn unsharded(tenant_id: TenantId) -> Self {
      55          692 :         Self {
      56          692 :             tenant_id,
      57          692 :             shard_number: ShardNumber(0),
      58          692 :             shard_count: ShardCount(0),
      59          692 :         }
      60          692 :     }
      61              : 
      62              :     /// The range of all TenantShardId that belong to a particular TenantId.  This is useful when
      63              :     /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
      64        23339 :     pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
      65        23339 :         RangeInclusive::new(
      66        23339 :             Self {
      67        23339 :                 tenant_id,
      68        23339 :                 shard_number: ShardNumber(0),
      69        23339 :                 shard_count: ShardCount(0),
      70        23339 :             },
      71        23339 :             Self {
      72        23339 :                 tenant_id,
      73        23339 :                 shard_number: ShardNumber::MAX,
      74        23339 :                 shard_count: ShardCount::MAX,
      75        23339 :             },
      76        23339 :         )
      77        23339 :     }
      78              : 
      79      7056301 :     pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
      80      7056301 :         ShardSlug(self)
      81      7056301 :     }
      82              : 
      83              :     /// Convenience for code that has special behavior on the 0th shard.
      84      1510131 :     pub fn is_zero(&self) -> bool {
      85      1510131 :         self.shard_number == ShardNumber(0)
      86      1510131 :     }
      87              : 
      88            1 :     pub fn is_unsharded(&self) -> bool {
      89            1 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
      90            1 :     }
      91              : 
      92              :     /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
      93              :     /// is useful when logging from code that is already in a span that includes tenant ID, to
      94              :     /// keep messages reasonably terse.
      95         9872 :     pub fn to_index(&self) -> ShardIndex {
      96         9872 :         ShardIndex {
      97         9872 :             shard_number: self.shard_number,
      98         9872 :             shard_count: self.shard_count,
      99         9872 :         }
     100         9872 :     }
     101              : 
     102              :     /// Calculate the children of this TenantShardId when splitting the overall tenant into
     103              :     /// the given number of shards.
     104           18 :     pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
     105           18 :         let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
     106           18 :         let mut child_shards = Vec::new();
     107          100 :         for shard_number in 0..ShardNumber(new_shard_count.0).0 {
     108              :             // Key mapping is based on a round robin mapping of key hash modulo shard count,
     109              :             // so our child shards are the ones which the same keys would map to.
     110          100 :             if shard_number % effective_old_shard_count == self.shard_number.0 {
     111           44 :                 child_shards.push(TenantShardId {
     112           44 :                     tenant_id: self.tenant_id,
     113           44 :                     shard_number: ShardNumber(shard_number),
     114           44 :                     shard_count: new_shard_count,
     115           44 :                 })
     116           56 :             }
     117              :         }
     118              : 
     119           18 :         child_shards
     120           18 :     }
     121              : }
     122              : 
     123              : /// Formatting helper
     124              : struct ShardSlug<'a>(&'a TenantShardId);
     125              : 
     126              : impl<'a> std::fmt::Display for ShardSlug<'a> {
     127      7056301 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     128      7056301 :         write!(
     129      7056301 :             f,
     130      7056301 :             "{:02x}{:02x}",
     131      7056301 :             self.0.shard_number.0, self.0.shard_count.0
     132      7056301 :         )
     133      7056301 :     }
     134              : }
     135              : 
     136              : impl std::fmt::Display for TenantShardId {
     137       179205 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     138       179205 :         if self.shard_count != ShardCount(0) {
     139         4221 :             write!(f, "{}-{}", self.tenant_id, self.shard_slug())
     140              :         } else {
     141              :             // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
     142              :             // is distinct from the normal single shard case (shard count == 1).
     143       174984 :             self.tenant_id.fmt(f)
     144              :         }
     145       179205 :     }
     146              : }
     147              : 
     148              : impl std::fmt::Debug for TenantShardId {
     149         1049 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     150         1049 :         // Debug is the same as Display: the compact hex representation
     151         1049 :         write!(f, "{}", self)
     152         1049 :     }
     153              : }
     154              : 
     155              : impl std::str::FromStr for TenantShardId {
     156              :     type Err = hex::FromHexError;
     157              : 
     158        82462 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     159        82462 :         // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
     160        82462 :         if s.len() == 32 {
     161              :             // Legacy case: no shard specified
     162              :             Ok(Self {
     163        79626 :                 tenant_id: TenantId::from_str(s)?,
     164        79626 :                 shard_number: ShardNumber(0),
     165        79626 :                 shard_count: ShardCount(0),
     166              :             })
     167         2836 :         } else if s.len() == 37 {
     168         2836 :             let bytes = s.as_bytes();
     169         2836 :             let tenant_id = TenantId::from_hex(&bytes[0..32])?;
     170         2836 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     171         2836 :             hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
     172         2836 :             Ok(Self {
     173         2836 :                 tenant_id,
     174         2836 :                 shard_number: ShardNumber(shard_parts[0]),
     175         2836 :                 shard_count: ShardCount(shard_parts[1]),
     176         2836 :             })
     177              :         } else {
     178            0 :             Err(hex::FromHexError::InvalidStringLength)
     179              :         }
     180        82462 :     }
     181              : }
     182              : 
     183              : impl From<[u8; 18]> for TenantShardId {
     184            4 :     fn from(b: [u8; 18]) -> Self {
     185            4 :         let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
     186            4 : 
     187            4 :         Self {
     188            4 :             tenant_id: TenantId::from(tenant_id_bytes),
     189            4 :             shard_number: ShardNumber(b[16]),
     190            4 :             shard_count: ShardCount(b[17]),
     191            4 :         }
     192            4 :     }
     193              : }
     194              : 
     195              : /// For use within the context of a particular tenant, when we need to know which
     196              : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
     197              : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
     198              : /// TenantShardId.
     199       679147 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
     200              : pub struct ShardIndex {
     201              :     pub shard_number: ShardNumber,
     202              :     pub shard_count: ShardCount,
     203              : }
     204              : 
     205              : impl ShardIndex {
     206            0 :     pub fn new(number: ShardNumber, count: ShardCount) -> Self {
     207            0 :         Self {
     208            0 :             shard_number: number,
     209            0 :             shard_count: count,
     210            0 :         }
     211            0 :     }
     212        56092 :     pub fn unsharded() -> Self {
     213        56092 :         Self {
     214        56092 :             shard_number: ShardNumber(0),
     215        56092 :             shard_count: ShardCount(0),
     216        56092 :         }
     217        56092 :     }
     218              : 
     219      1222599 :     pub fn is_unsharded(&self) -> bool {
     220      1222599 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     221      1222599 :     }
     222              : 
     223              :     /// For use in constructing remote storage paths: concatenate this with a TenantId
     224              :     /// to get a fully qualified TenantShardId.
     225              :     ///
     226              :     /// Backward compat: this function returns an empty string if Self::is_unsharded, such
     227              :     /// that the legacy pre-sharding remote key format is preserved.
     228        19493 :     pub fn get_suffix(&self) -> String {
     229        19493 :         if self.is_unsharded() {
     230        19429 :             "".to_string()
     231              :         } else {
     232           64 :             format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     233              :         }
     234        19493 :     }
     235              : }
     236              : 
     237              : impl std::fmt::Display for ShardIndex {
     238        28261 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     239        28261 :         write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     240        28261 :     }
     241              : }
     242              : 
     243              : impl std::fmt::Debug for ShardIndex {
     244        22792 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     245        22792 :         // Debug is the same as Display: the compact hex representation
     246        22792 :         write!(f, "{}", self)
     247        22792 :     }
     248              : }
     249              : 
     250              : impl std::str::FromStr for ShardIndex {
     251              :     type Err = hex::FromHexError;
     252              : 
     253          106 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     254          106 :         // Expect format: 1 byte shard number, 1 byte shard count
     255          106 :         if s.len() == 4 {
     256          106 :             let bytes = s.as_bytes();
     257          106 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     258          106 :             hex::decode_to_slice(bytes, &mut shard_parts)?;
     259          106 :             Ok(Self {
     260          106 :                 shard_number: ShardNumber(shard_parts[0]),
     261          106 :                 shard_count: ShardCount(shard_parts[1]),
     262          106 :             })
     263              :         } else {
     264            0 :             Err(hex::FromHexError::InvalidStringLength)
     265              :         }
     266          106 :     }
     267              : }
     268              : 
     269              : impl From<[u8; 2]> for ShardIndex {
     270            2 :     fn from(b: [u8; 2]) -> Self {
     271            2 :         Self {
     272            2 :             shard_number: ShardNumber(b[0]),
     273            2 :             shard_count: ShardCount(b[1]),
     274            2 :         }
     275            2 :     }
     276              : }
     277              : 
     278              : impl Serialize for TenantShardId {
     279         9751 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     280         9751 :     where
     281         9751 :         S: serde::Serializer,
     282         9751 :     {
     283         9751 :         if serializer.is_human_readable() {
     284         9743 :             serializer.collect_str(self)
     285              :         } else {
     286            8 :             let mut packed: [u8; 18] = [0; 18];
     287            8 :             packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
     288            8 :             packed[16] = self.shard_number.0;
     289            8 :             packed[17] = self.shard_count.0;
     290            8 : 
     291            8 :             packed.serialize(serializer)
     292              :         }
     293         9751 :     }
     294              : }
     295              : 
     296              : impl<'de> Deserialize<'de> for TenantShardId {
     297         5612 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     298         5612 :     where
     299         5612 :         D: serde::Deserializer<'de>,
     300         5612 :     {
     301         5612 :         struct IdVisitor {
     302         5612 :             is_human_readable_deserializer: bool,
     303         5612 :         }
     304         5612 : 
     305         5612 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     306         5612 :             type Value = TenantShardId;
     307         5612 : 
     308         5612 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     309            0 :                 if self.is_human_readable_deserializer {
     310         5612 :                     formatter.write_str("value in form of hex string")
     311         5612 :                 } else {
     312         5612 :                     formatter.write_str("value in form of integer array([u8; 18])")
     313         5612 :                 }
     314         5612 :             }
     315         5612 : 
     316         5612 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     317            4 :             where
     318            4 :                 A: serde::de::SeqAccess<'de>,
     319            4 :             {
     320            4 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     321         5612 :                 let id: [u8; 18] = Deserialize::deserialize(s)?;
     322         5612 :                 Ok(TenantShardId::from(id))
     323         5612 :             }
     324         5612 : 
     325         5612 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     326         5608 :             where
     327         5608 :                 E: serde::de::Error,
     328         5608 :             {
     329         5608 :                 TenantShardId::from_str(v).map_err(E::custom)
     330         5608 :             }
     331         5612 :         }
     332         5612 : 
     333         5612 :         if deserializer.is_human_readable() {
     334         5608 :             deserializer.deserialize_str(IdVisitor {
     335         5608 :                 is_human_readable_deserializer: true,
     336         5608 :             })
     337              :         } else {
     338            4 :             deserializer.deserialize_tuple(
     339            4 :                 18,
     340            4 :                 IdVisitor {
     341            4 :                     is_human_readable_deserializer: false,
     342            4 :                 },
     343            4 :             )
     344              :         }
     345         5612 :     }
     346              : }
     347              : 
     348              : /// Stripe size in number of pages
     349          940 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     350              : pub struct ShardStripeSize(pub u32);
     351              : 
     352              : /// Layout version: for future upgrades where we might change how the key->shard mapping works
     353     88482680 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     354              : pub struct ShardLayout(u8);
     355              : 
     356              : const LAYOUT_V1: ShardLayout = ShardLayout(1);
     357              : /// ShardIdentity uses a magic layout value to indicate if it is unusable
     358              : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
     359              : 
     360              : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
     361              : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
     362              : 
     363              : /// The ShardIdentity contains the information needed for one member of map
     364              : /// to resolve a key to a shard, and then check whether that shard is ==self.
     365          108 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     366              : pub struct ShardIdentity {
     367              :     pub number: ShardNumber,
     368              :     pub count: ShardCount,
     369              :     pub stripe_size: ShardStripeSize,
     370              :     layout: ShardLayout,
     371              : }
     372              : 
     373           10 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
     374              : pub enum ShardConfigError {
     375              :     #[error("Invalid shard count")]
     376              :     InvalidCount,
     377              :     #[error("Invalid shard number")]
     378              :     InvalidNumber,
     379              :     #[error("Invalid stripe size")]
     380              :     InvalidStripeSize,
     381              : }
     382              : 
     383              : impl ShardIdentity {
     384              :     /// An identity with number=0 count=0 is a "none" identity, which represents legacy
     385              :     /// tenants.  Modern single-shard tenants should not use this: they should
     386              :     /// have number=0 count=1.
     387          904 :     pub fn unsharded() -> Self {
     388          904 :         Self {
     389          904 :             number: ShardNumber(0),
     390          904 :             count: ShardCount(0),
     391          904 :             layout: LAYOUT_V1,
     392          904 :             stripe_size: DEFAULT_STRIPE_SIZE,
     393          904 :         }
     394          904 :     }
     395              : 
     396              :     /// A broken instance of this type is only used for `TenantState::Broken` tenants,
     397              :     /// which are constructed in code paths that don't have access to proper configuration.
     398              :     ///
     399              :     /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
     400              :     /// Enforcement is via assertions, to avoid making our interface fallible for this
     401              :     /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
     402              :     /// state, and by extension to avoid trying to do any page->shard resolution.
     403            0 :     pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
     404            0 :         Self {
     405            0 :             number,
     406            0 :             count,
     407            0 :             layout: LAYOUT_BROKEN,
     408            0 :             stripe_size: DEFAULT_STRIPE_SIZE,
     409            0 :         }
     410            0 :     }
     411              : 
     412         1966 :     pub fn is_unsharded(&self) -> bool {
     413         1966 :         self.number == ShardNumber(0) && self.count == ShardCount(0)
     414         1966 :     }
     415              : 
     416              :     /// Count must be nonzero, and number must be < count. To construct
     417              :     /// the legacy case (count==0), use Self::unsharded instead.
     418           82 :     pub fn new(
     419           82 :         number: ShardNumber,
     420           82 :         count: ShardCount,
     421           82 :         stripe_size: ShardStripeSize,
     422           82 :     ) -> Result<Self, ShardConfigError> {
     423           82 :         if count.0 == 0 {
     424            2 :             Err(ShardConfigError::InvalidCount)
     425           80 :         } else if number.0 > count.0 - 1 {
     426            6 :             Err(ShardConfigError::InvalidNumber)
     427           74 :         } else if stripe_size.0 == 0 {
     428            2 :             Err(ShardConfigError::InvalidStripeSize)
     429              :         } else {
     430           72 :             Ok(Self {
     431           72 :                 number,
     432           72 :                 count,
     433           72 :                 layout: LAYOUT_V1,
     434           72 :                 stripe_size,
     435           72 :             })
     436              :         }
     437           82 :     }
     438              : 
     439              :     /// For use when creating ShardIdentity instances for new shards, where a creation request
     440              :     /// specifies the ShardParameters that apply to all shards.
     441          777 :     pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
     442          777 :         Self {
     443          777 :             number,
     444          777 :             count: params.count,
     445          777 :             layout: LAYOUT_V1,
     446          777 :             stripe_size: params.stripe_size,
     447          777 :         }
     448          777 :     }
     449              : 
     450     88482680 :     fn is_broken(&self) -> bool {
     451     88482680 :         self.layout == LAYOUT_BROKEN
     452     88482680 :     }
     453              : 
     454       811936 :     pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
     455       811936 :         assert!(!self.is_broken());
     456       811936 :         key_to_shard_number(self.count, self.stripe_size, key)
     457       811936 :     }
     458              : 
     459              :     /// Return true if the key should be ingested by this shard
     460     87670744 :     pub fn is_key_local(&self, key: &Key) -> bool {
     461     87670744 :         assert!(!self.is_broken());
     462     87670744 :         if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
     463     68547946 :             true
     464              :         } else {
     465     19122798 :             key_to_shard_number(self.count, self.stripe_size, key) == self.number
     466              :         }
     467     87670744 :     }
     468              : 
     469              :     /// Return true if the key should be discarded if found in this shard's
     470              :     /// data store, e.g. during compaction after a split
     471     25250858 :     pub fn is_key_disposable(&self, key: &Key) -> bool {
     472     25250858 :         if key_is_shard0(key) {
     473              :             // Q: Why can't we dispose of shard0 content if we're not shard 0?
     474              :             // A: because the WAL ingestion logic currently ingests some shard 0
     475              :             //    content on all shards, even though it's only read on shard 0.  If we
     476              :             //    dropped it, then subsequent WAL ingest to these keys would encounter
     477              :             //    an error.
     478      4821049 :             false
     479              :         } else {
     480     20429809 :             !self.is_key_local(key)
     481              :         }
     482     25250858 :     }
     483              : 
     484            0 :     pub fn shard_slug(&self) -> String {
     485            0 :         if self.count > ShardCount(0) {
     486            0 :             format!("-{:02x}{:02x}", self.number.0, self.count.0)
     487              :         } else {
     488            0 :             String::new()
     489              :         }
     490            0 :     }
     491              : 
     492              :     /// Convenience for checking if this identity is the 0th shard in a tenant,
     493              :     /// for special cases on shard 0 such as ingesting relation sizes.
     494     13903908 :     pub fn is_zero(&self) -> bool {
     495     13903908 :         self.number == ShardNumber(0)
     496     13903908 :     }
     497              : }
     498              : 
     499              : impl Serialize for ShardIndex {
     500          248 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     501          248 :     where
     502          248 :         S: serde::Serializer,
     503          248 :     {
     504          248 :         if serializer.is_human_readable() {
     505          244 :             serializer.collect_str(self)
     506              :         } else {
     507              :             // Binary encoding is not used in index_part.json, but is included in anticipation of
     508              :             // switching various structures (e.g. inter-process communication, remote metadata) to more
     509              :             // compact binary encodings in future.
     510            4 :             let mut packed: [u8; 2] = [0; 2];
     511            4 :             packed[0] = self.shard_number.0;
     512            4 :             packed[1] = self.shard_count.0;
     513            4 :             packed.serialize(serializer)
     514              :         }
     515          248 :     }
     516              : }
     517              : 
     518              : impl<'de> Deserialize<'de> for ShardIndex {
     519          106 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     520          106 :     where
     521          106 :         D: serde::Deserializer<'de>,
     522          106 :     {
     523          106 :         struct IdVisitor {
     524          106 :             is_human_readable_deserializer: bool,
     525          106 :         }
     526          106 : 
     527          106 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     528          106 :             type Value = ShardIndex;
     529          106 : 
     530          106 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     531            0 :                 if self.is_human_readable_deserializer {
     532          106 :                     formatter.write_str("value in form of hex string")
     533          106 :                 } else {
     534          106 :                     formatter.write_str("value in form of integer array([u8; 2])")
     535          106 :                 }
     536          106 :             }
     537          106 : 
     538          106 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     539            2 :             where
     540            2 :                 A: serde::de::SeqAccess<'de>,
     541            2 :             {
     542            2 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     543          106 :                 let id: [u8; 2] = Deserialize::deserialize(s)?;
     544          106 :                 Ok(ShardIndex::from(id))
     545          106 :             }
     546          106 : 
     547          106 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     548          104 :             where
     549          104 :                 E: serde::de::Error,
     550          104 :             {
     551          104 :                 ShardIndex::from_str(v).map_err(E::custom)
     552          104 :             }
     553          106 :         }
     554          106 : 
     555          106 :         if deserializer.is_human_readable() {
     556          104 :             deserializer.deserialize_str(IdVisitor {
     557          104 :                 is_human_readable_deserializer: true,
     558          104 :             })
     559              :         } else {
     560            2 :             deserializer.deserialize_tuple(
     561            2 :                 2,
     562            2 :                 IdVisitor {
     563            2 :                     is_human_readable_deserializer: false,
     564            2 :                 },
     565            2 :             )
     566              :         }
     567          106 :     }
     568              : }
     569              : 
     570              : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
     571              : /// in order to be able to serve basebackup requests without peer communication).
     572     64070513 : fn key_is_shard0(key: &Key) -> bool {
     573     64070513 :     // To decide what to shard out to shards >0, we apply a simple rule that only
     574     64070513 :     // relation pages are distributed to shards other than shard zero. Everything else gets
     575     64070513 :     // stored on shard 0.  This guarantees that shard 0 can independently serve basebackup
     576     64070513 :     // requests, and any request other than those for particular blocks in relations.
     577     64070513 :     !is_rel_block_key(key)
     578     64070513 : }
     579              : 
     580              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     581     39393716 : fn murmurhash32(mut h: u32) -> u32 {
     582     39393716 :     h ^= h >> 16;
     583     39393716 :     h = h.wrapping_mul(0x85ebca6b);
     584     39393716 :     h ^= h >> 13;
     585     39393716 :     h = h.wrapping_mul(0xc2b2ae35);
     586     39393716 :     h ^= h >> 16;
     587     39393716 :     h
     588     39393716 : }
     589              : 
     590              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     591     19696859 : fn hash_combine(mut a: u32, mut b: u32) -> u32 {
     592     19696859 :     b = b.wrapping_add(0x9e3779b9);
     593     19696859 :     b = b.wrapping_add(a << 6);
     594     19696859 :     b = b.wrapping_add(a >> 2);
     595     19696859 : 
     596     19696859 :     a ^= b;
     597     19696859 :     a
     598     19696859 : }
     599              : 
     600              : /// Where a Key is to be distributed across shards, select the shard.  This function
     601              : /// does not account for keys that should be broadcast across shards.
     602              : ///
     603              : /// The hashing in this function must exactly match what we do in postgres smgr
     604              : /// code.  The resulting distribution of pages is intended to preserve locality within
     605              : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
     606              : /// distributing data pseudo-randomly.
     607              : ///
     608              : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
     609              : /// and will be handled at higher levels when shards are split.
     610     19934736 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
     611     19934736 :     // Fast path for un-sharded tenants or broadcast keys
     612     19934736 :     if count < ShardCount(2) || key_is_shard0(key) {
     613       237879 :         return ShardNumber(0);
     614     19696857 :     }
     615     19696857 : 
     616     19696857 :     // relNode
     617     19696857 :     let mut hash = murmurhash32(key.field4);
     618     19696857 :     // blockNum/stripe size
     619     19696857 :     hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
     620     19696857 : 
     621     19696857 :     ShardNumber((hash % count.0 as u32) as u8)
     622     19934736 : }
     623              : 
     624              : #[cfg(test)]
     625              : mod tests {
     626              :     use std::str::FromStr;
     627              : 
     628              :     use bincode;
     629              :     use utils::{id::TenantId, Hex};
     630              : 
     631              :     use super::*;
     632              : 
     633              :     const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
     634              : 
     635            2 :     #[test]
     636            2 :     fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
     637            2 :         let example = TenantShardId {
     638            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     639            2 :             shard_count: ShardCount(10),
     640            2 :             shard_number: ShardNumber(7),
     641            2 :         };
     642            2 : 
     643            2 :         let encoded = format!("{example}");
     644            2 : 
     645            2 :         let expected = format!("{EXAMPLE_TENANT_ID}-070a");
     646            2 :         assert_eq!(&encoded, &expected);
     647              : 
     648            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     649              : 
     650            2 :         assert_eq!(example, decoded);
     651              : 
     652            2 :         Ok(())
     653            2 :     }
     654              : 
     655            2 :     #[test]
     656            2 :     fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
     657            2 :         let example = TenantShardId {
     658            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     659            2 :             shard_count: ShardCount(10),
     660            2 :             shard_number: ShardNumber(7),
     661            2 :         };
     662            2 : 
     663            2 :         let encoded = bincode::serialize(&example).unwrap();
     664            2 :         let expected: [u8; 18] = [
     665            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     666            2 :             0xf6, 0xfc, 0x07, 0x0a,
     667            2 :         ];
     668            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     669              : 
     670            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     671            2 : 
     672            2 :         assert_eq!(example, decoded);
     673              : 
     674            2 :         Ok(())
     675            2 :     }
     676              : 
     677            2 :     #[test]
     678            2 :     fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
     679            2 :         // Test that TenantShardId can decode a TenantId in human
     680            2 :         // readable form
     681            2 :         let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     682            2 :         let encoded = format!("{example}");
     683            2 : 
     684            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     685              : 
     686            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     687              : 
     688            2 :         assert_eq!(example, decoded.tenant_id);
     689            2 :         assert_eq!(decoded.shard_count, ShardCount(0));
     690            2 :         assert_eq!(decoded.shard_number, ShardNumber(0));
     691              : 
     692            2 :         Ok(())
     693            2 :     }
     694              : 
     695            2 :     #[test]
     696            2 :     fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
     697            2 :         // Test that a legacy TenantShardId encodes into a form that
     698            2 :         // can be decoded as TenantId
     699            2 :         let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     700            2 :         let example = TenantShardId::unsharded(example_tenant_id);
     701            2 :         let encoded = format!("{example}");
     702            2 : 
     703            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     704              : 
     705            2 :         let decoded = TenantId::from_str(&encoded)?;
     706              : 
     707            2 :         assert_eq!(example_tenant_id, decoded);
     708              : 
     709            2 :         Ok(())
     710            2 :     }
     711              : 
     712            2 :     #[test]
     713            2 :     fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
     714            2 :         // Unlike in human readable encoding, binary encoding does not
     715            2 :         // do any special handling of legacy unsharded TenantIds: this test
     716            2 :         // is equivalent to the main test for binary encoding, just verifying
     717            2 :         // that the same behavior applies when we have used `unsharded()` to
     718            2 :         // construct a TenantShardId.
     719            2 :         let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
     720            2 :         let encoded = bincode::serialize(&example).unwrap();
     721            2 : 
     722            2 :         let expected: [u8; 18] = [
     723            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     724            2 :             0xf6, 0xfc, 0x00, 0x00,
     725            2 :         ];
     726            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     727              : 
     728            2 :         let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
     729            2 :         assert_eq!(example, decoded);
     730              : 
     731            2 :         Ok(())
     732            2 :     }
     733              : 
     734            2 :     #[test]
     735            2 :     fn shard_identity_validation() -> Result<(), ShardConfigError> {
     736            2 :         // Happy cases
     737            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
     738            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
     739            2 :         ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
     740              : 
     741            2 :         assert_eq!(
     742            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
     743            2 :             Err(ShardConfigError::InvalidCount)
     744            2 :         );
     745            2 :         assert_eq!(
     746            2 :             ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
     747            2 :             Err(ShardConfigError::InvalidNumber)
     748            2 :         );
     749            2 :         assert_eq!(
     750            2 :             ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
     751            2 :             Err(ShardConfigError::InvalidNumber)
     752            2 :         );
     753            2 :         assert_eq!(
     754            2 :             ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
     755            2 :             Err(ShardConfigError::InvalidNumber)
     756            2 :         );
     757            2 :         assert_eq!(
     758            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
     759            2 :             Err(ShardConfigError::InvalidStripeSize)
     760            2 :         );
     761              : 
     762            2 :         Ok(())
     763            2 :     }
     764              : 
     765            2 :     #[test]
     766            2 :     fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
     767            2 :         let example = ShardIndex {
     768            2 :             shard_number: ShardNumber(13),
     769            2 :             shard_count: ShardCount(17),
     770            2 :         };
     771            2 :         let expected: String = "0d11".to_string();
     772            2 :         let encoded = format!("{example}");
     773            2 :         assert_eq!(&encoded, &expected);
     774              : 
     775            2 :         let decoded = ShardIndex::from_str(&encoded)?;
     776            2 :         assert_eq!(example, decoded);
     777            2 :         Ok(())
     778            2 :     }
     779              : 
     780            2 :     #[test]
     781            2 :     fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
     782            2 :         let example = ShardIndex {
     783            2 :             shard_number: ShardNumber(13),
     784            2 :             shard_count: ShardCount(17),
     785            2 :         };
     786            2 :         let expected: [u8; 2] = [0x0d, 0x11];
     787            2 : 
     788            2 :         let encoded = bincode::serialize(&example).unwrap();
     789            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     790            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     791            2 :         assert_eq!(example, decoded);
     792              : 
     793            2 :         Ok(())
     794            2 :     }
     795              : 
     796              :     // These are only smoke tests to spot check that our implementation doesn't
     797              :     // deviate from a few examples values: not aiming to validate the overall
     798              :     // hashing algorithm.
     799            2 :     #[test]
     800            2 :     fn murmur_hash() {
     801            2 :         assert_eq!(murmurhash32(0), 0);
     802              : 
     803            2 :         assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
     804            2 :     }
     805              : 
     806            2 :     #[test]
     807            2 :     fn shard_mapping() {
     808            2 :         let key = Key {
     809            2 :             field1: 0x00,
     810            2 :             field2: 0x67f,
     811            2 :             field3: 0x5,
     812            2 :             field4: 0x400c,
     813            2 :             field5: 0x00,
     814            2 :             field6: 0x7d06,
     815            2 :         };
     816            2 : 
     817            2 :         let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
     818            2 :         assert_eq!(shard, ShardNumber(8));
     819            2 :     }
     820              : 
     821            2 :     #[test]
     822            2 :     fn shard_id_split() {
     823            2 :         let tenant_id = TenantId::generate();
     824            2 :         let parent = TenantShardId::unsharded(tenant_id);
     825            2 : 
     826            2 :         // Unsharded into 2
     827            2 :         assert_eq!(
     828            2 :             parent.split(ShardCount(2)),
     829            2 :             vec![
     830            2 :                 TenantShardId {
     831            2 :                     tenant_id,
     832            2 :                     shard_count: ShardCount(2),
     833            2 :                     shard_number: ShardNumber(0)
     834            2 :                 },
     835            2 :                 TenantShardId {
     836            2 :                     tenant_id,
     837            2 :                     shard_count: ShardCount(2),
     838            2 :                     shard_number: ShardNumber(1)
     839            2 :                 }
     840            2 :             ]
     841            2 :         );
     842              : 
     843              :         // Unsharded into 4
     844            2 :         assert_eq!(
     845            2 :             parent.split(ShardCount(4)),
     846            2 :             vec![
     847            2 :                 TenantShardId {
     848            2 :                     tenant_id,
     849            2 :                     shard_count: ShardCount(4),
     850            2 :                     shard_number: ShardNumber(0)
     851            2 :                 },
     852            2 :                 TenantShardId {
     853            2 :                     tenant_id,
     854            2 :                     shard_count: ShardCount(4),
     855            2 :                     shard_number: ShardNumber(1)
     856            2 :                 },
     857            2 :                 TenantShardId {
     858            2 :                     tenant_id,
     859            2 :                     shard_count: ShardCount(4),
     860            2 :                     shard_number: ShardNumber(2)
     861            2 :                 },
     862            2 :                 TenantShardId {
     863            2 :                     tenant_id,
     864            2 :                     shard_count: ShardCount(4),
     865            2 :                     shard_number: ShardNumber(3)
     866            2 :                 }
     867            2 :             ]
     868            2 :         );
     869              : 
     870              :         // count=1 into 2 (check this works the same as unsharded.)
     871            2 :         let parent = TenantShardId {
     872            2 :             tenant_id,
     873            2 :             shard_count: ShardCount(1),
     874            2 :             shard_number: ShardNumber(0),
     875            2 :         };
     876            2 :         assert_eq!(
     877            2 :             parent.split(ShardCount(2)),
     878            2 :             vec![
     879            2 :                 TenantShardId {
     880            2 :                     tenant_id,
     881            2 :                     shard_count: ShardCount(2),
     882            2 :                     shard_number: ShardNumber(0)
     883            2 :                 },
     884            2 :                 TenantShardId {
     885            2 :                     tenant_id,
     886            2 :                     shard_count: ShardCount(2),
     887            2 :                     shard_number: ShardNumber(1)
     888            2 :                 }
     889            2 :             ]
     890            2 :         );
     891              : 
     892              :         // count=2 into count=8
     893            2 :         let parent = TenantShardId {
     894            2 :             tenant_id,
     895            2 :             shard_count: ShardCount(2),
     896            2 :             shard_number: ShardNumber(1),
     897            2 :         };
     898            2 :         assert_eq!(
     899            2 :             parent.split(ShardCount(8)),
     900            2 :             vec![
     901            2 :                 TenantShardId {
     902            2 :                     tenant_id,
     903            2 :                     shard_count: ShardCount(8),
     904            2 :                     shard_number: ShardNumber(1)
     905            2 :                 },
     906            2 :                 TenantShardId {
     907            2 :                     tenant_id,
     908            2 :                     shard_count: ShardCount(8),
     909            2 :                     shard_number: ShardNumber(3)
     910            2 :                 },
     911            2 :                 TenantShardId {
     912            2 :                     tenant_id,
     913            2 :                     shard_count: ShardCount(8),
     914            2 :                     shard_number: ShardNumber(5)
     915            2 :                 },
     916            2 :                 TenantShardId {
     917            2 :                     tenant_id,
     918            2 :                     shard_count: ShardCount(8),
     919            2 :                     shard_number: ShardNumber(7)
     920            2 :                 },
     921            2 :             ]
     922            2 :         );
     923            2 :     }
     924              : }
        

Generated by: LCOV version 2.1-beta