LCOV - differential code coverage report
Current view: top level - libs/pageserver_api/src - shard.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 91.0 % 498 453 45 453
Current Date: 2024-01-09 02:06:09 Functions: 39.4 % 208 82 126 82
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::{ops::RangeInclusive, str::FromStr};
       2                 : 
       3                 : use crate::key::{is_rel_block_key, Key};
       4                 : use hex::FromHex;
       5                 : use serde::{Deserialize, Serialize};
       6                 : use thiserror;
       7                 : use utils::id::TenantId;
       8                 : 
       9 CBC    11069924 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      10                 : pub struct ShardNumber(pub u8);
      11                 : 
      12        69860880 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      13                 : pub struct ShardCount(pub u8);
      14                 : 
      15                 : impl ShardCount {
      16                 :     pub const MAX: Self = Self(u8::MAX);
      17                 : }
      18                 : 
      19                 : impl ShardNumber {
      20                 :     pub const MAX: Self = Self(u8::MAX);
      21                 : }
      22                 : 
      23                 : /// A TenantShardId identifies a unit of work for the Pageserver.
      24                 : ///
      25                 : /// These are written as `<tenant_id>-<shard_number><shard_count>`, for example:
      26                 : ///
      27                 : ///   # The second shard in a two-shard tenant
      28                 : ///   072f1291a5310026820b2fe4b2968934-0102
      29                 : ///
      30                 : /// Historically, tenants could not have multiple shards, and were identified
      31                 : /// by TenantId.  To support this, TenantShardId has a special legacy
      32                 : /// mode where `shard_count` is equal to zero: this represents a single-sharded
      33                 : /// tenant which should be written as a TenantId with no suffix.
      34                 : ///
      35                 : /// The human-readable encoding of TenantShardId, such as used in API URLs,
      36                 : /// is both forward and backward compatible: a legacy TenantId can be
      37                 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
      38                 : /// as a TenantId.
      39                 : ///
      40                 : /// Note that the binary encoding is _not_ backward compatible, because
      41                 : /// at the time sharding is introduced, there are no existing binary structures
      42                 : /// containing TenantId that we need to handle.
      43        12041212 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      44                 : pub struct TenantShardId {
      45                 :     pub tenant_id: TenantId,
      46                 :     pub shard_number: ShardNumber,
      47                 :     pub shard_count: ShardCount,
      48                 : }
      49                 : 
      50                 : impl TenantShardId {
      51             798 :     pub fn unsharded(tenant_id: TenantId) -> Self {
      52             798 :         Self {
      53             798 :             tenant_id,
      54             798 :             shard_number: ShardNumber(0),
      55             798 :             shard_count: ShardCount(0),
      56             798 :         }
      57             798 :     }
      58                 : 
      59                 :     /// The range of all TenantShardIds that belong to a particular TenantId.  This is useful when
      60                 :     /// you have a BTreeMap keyed by TenantShardId, and are querying by TenantId.
      61            8067 :     pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
      62            8067 :         RangeInclusive::new(
      63            8067 :             Self {
      64            8067 :                 tenant_id,
      65            8067 :                 shard_number: ShardNumber(0),
      66            8067 :                 shard_count: ShardCount(0),
      67            8067 :             },
      68            8067 :             Self {
      69            8067 :                 tenant_id,
      70            8067 :                 shard_number: ShardNumber::MAX,
      71            8067 :                 shard_count: ShardCount::MAX,
      72            8067 :             },
      73            8067 :         )
      74            8067 :     }
      75                 : 
      76         2121323 :     pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
      77         2121323 :         ShardSlug(self)
      78         2121323 :     }
      79                 : 
      80                 :     /// Convenience for code that has special behavior on the 0th shard.
      81             136 :     pub fn is_zero(&self) -> bool {
      82             136 :         self.shard_number == ShardNumber(0)
      83             136 :     }
      84                 : 
      85 UBC           0 :     pub fn is_unsharded(&self) -> bool {
      86               0 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
      87               0 :     }
      88                 : }
      89                 : 
      90                 : /// Formatting helper
      91                 : struct ShardSlug<'a>(&'a TenantShardId);
      92                 : 
      93                 : impl<'a> std::fmt::Display for ShardSlug<'a> {
      94 CBC     2121323 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      95         2121323 :         write!(
      96         2121323 :             f,
      97         2121323 :             "{:02x}{:02x}",
      98         2121323 :             self.0.shard_number.0, self.0.shard_count.0
      99         2121323 :         )
     100         2121323 :     }
     101                 : }
     102                 : 
     103                 : impl std::fmt::Display for TenantShardId {
     104          164121 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     105          164121 :         if self.shard_count != ShardCount(0) {
     106               1 :             write!(f, "{}-{}", self.tenant_id, self.shard_slug())
     107                 :         } else {
     108                 :             // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
     109                 :             // is distinct from the normal single shard case (shard count == 1).
     110          164120 :             self.tenant_id.fmt(f)
     111                 :         }
     112          164121 :     }
     113                 : }
     114                 : 
     115                 : impl std::fmt::Debug for TenantShardId {
     116 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     117               0 :         // Debug is the same as Display: the compact hex representation
     118               0 :         write!(f, "{}", self)
     119               0 :     }
     120                 : }
     121                 : 
     122                 : impl std::str::FromStr for TenantShardId {
     123                 :     type Err = hex::FromHexError;
     124                 : 
     125 CBC       10148 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     126           10148 :         // Expect hex format: 16 byte TenantId (32 chars), '-', 1 byte shard number, 1 byte shard count (2 chars each)
     127           10148 :         if s.len() == 32 {
     128                 :             // Legacy case: no shard specified
     129                 :             Ok(Self {
     130           10147 :                 tenant_id: TenantId::from_str(s)?,
     131           10147 :                 shard_number: ShardNumber(0),
     132           10147 :                 shard_count: ShardCount(0),
     133                 :             })
     134               1 :         } else if s.len() == 37 {
     135               1 :             let bytes = s.as_bytes();
     136               1 :             let tenant_id = TenantId::from_hex(&bytes[0..32])?;
     137               1 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     138               1 :             hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
     139               1 :             Ok(Self {
     140               1 :                 tenant_id,
     141               1 :                 shard_number: ShardNumber(shard_parts[0]),
     142               1 :                 shard_count: ShardCount(shard_parts[1]),
     143               1 :             })
     144                 :         } else {
     145 UBC           0 :             Err(hex::FromHexError::InvalidStringLength)
     146                 :         }
     147 CBC       10148 :     }
     148                 : }
     149                 : 
     150                 : impl From<[u8; 18]> for TenantShardId {
     151               2 :     fn from(b: [u8; 18]) -> Self {
     152               2 :         let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
     153               2 : 
     154               2 :         Self {
     155               2 :             tenant_id: TenantId::from(tenant_id_bytes),
     156               2 :             shard_number: ShardNumber(b[16]),
     157               2 :             shard_count: ShardCount(b[17]),
     158               2 :         }
     159               2 :     }
     160                 : }
     161                 : 
     162                 : /// For use within the context of a particular tenant, when we need to know which
     163                 : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
     164                 : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
     165                 : /// TenantShardId.
     166          569516 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
     167                 : pub struct ShardIndex {
     168                 :     pub shard_number: ShardNumber,
     169                 :     pub shard_count: ShardCount,
     170                 : }
     171                 : 
     172                 : impl ShardIndex {
     173 UBC           0 :     pub fn new(number: ShardNumber, count: ShardCount) -> Self {
     174               0 :         Self {
     175               0 :             shard_number: number,
     176               0 :             shard_count: count,
     177               0 :         }
     178               0 :     }
     179 CBC       61922 :     pub fn unsharded() -> Self {
     180           61922 :         Self {
     181           61922 :             shard_number: ShardNumber(0),
     182           61922 :             shard_count: ShardCount(0),
     183           61922 :         }
     184           61922 :     }
     185                 : 
     186          990479 :     pub fn is_unsharded(&self) -> bool {
     187          990479 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     188          990479 :     }
     189                 : 
     190                 :     /// For use in constructing remote storage paths: concatenate this with a TenantId
     191                 :     /// to get a fully qualified TenantShardId.
     192                 :     ///
     193                 :     /// Backward compat: this function returns an empty string if Self::is_unsharded, such
     194                 :     /// that the legacy pre-sharding remote key format is preserved.
     195           17953 :     pub fn get_suffix(&self) -> String {
     196           17953 :         if self.is_unsharded() {
     197           17953 :             "".to_string()
     198                 :         } else {
     199 UBC           0 :             format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     200                 :         }
     201 CBC       17953 :     }
     202                 : }
     203                 : 
     204                 : impl std::fmt::Display for ShardIndex {
     205            4934 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     206            4934 :         write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     207            4934 :     }
     208                 : }
     209                 : 
     210                 : impl std::fmt::Debug for ShardIndex {
     211             633 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     212             633 :         // Debug is the same as Display: the compact hex representation
     213             633 :         write!(f, "{}", self)
     214             633 :     }
     215                 : }
     216                 : 
     217                 : impl std::str::FromStr for ShardIndex {
     218                 :     type Err = hex::FromHexError;
     219                 : 
     220               1 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     221               1 :         // Expect hex format: 1 byte shard number, 1 byte shard count (2 chars each)
     222               1 :         if s.len() == 4 {
     223               1 :             let bytes = s.as_bytes();
     224               1 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     225               1 :             hex::decode_to_slice(bytes, &mut shard_parts)?;
     226               1 :             Ok(Self {
     227               1 :                 shard_number: ShardNumber(shard_parts[0]),
     228               1 :                 shard_count: ShardCount(shard_parts[1]),
     229               1 :             })
     230                 :         } else {
     231 UBC           0 :             Err(hex::FromHexError::InvalidStringLength)
     232                 :         }
     233 CBC           1 :     }
     234                 : }
     235                 : 
     236                 : impl From<[u8; 2]> for ShardIndex {
     237               1 :     fn from(b: [u8; 2]) -> Self {
     238               1 :         Self {
     239               1 :             shard_number: ShardNumber(b[0]),
     240               1 :             shard_count: ShardCount(b[1]),
     241               1 :         }
     242               1 :     }
     243                 : }
     244                 : 
     245                 : impl Serialize for TenantShardId {
     246            5616 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     247            5616 :     where
     248            5616 :         S: serde::Serializer,
     249            5616 :     {
     250            5616 :         if serializer.is_human_readable() {
     251            5612 :             serializer.collect_str(self)
     252                 :         } else {
     253               4 :             let mut packed: [u8; 18] = [0; 18];
     254               4 :             packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
     255               4 :             packed[16] = self.shard_number.0;
     256               4 :             packed[17] = self.shard_count.0;
     257               4 : 
     258               4 :             packed.serialize(serializer)
     259                 :         }
     260            5616 :     }
     261                 : }
     262                 : 
     263                 : impl<'de> Deserialize<'de> for TenantShardId {
     264            2494 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     265            2494 :     where
     266            2494 :         D: serde::Deserializer<'de>,
     267            2494 :     {
     268            2494 :         struct IdVisitor {
     269            2494 :             is_human_readable_deserializer: bool,
     270            2494 :         }
     271            2494 : 
     272            2494 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     273            2494 :             type Value = TenantShardId;
     274            2494 : 
     275            2494 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     276 UBC           0 :                 if self.is_human_readable_deserializer {
     277 CBC        2494 :                     formatter.write_str("value in form of hex string")
     278            2494 :                 } else {
     279            2494 :                     formatter.write_str("value in form of integer array([u8; 18])")
     280            2494 :                 }
     281            2494 :             }
     282            2494 : 
     283            2494 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     284               2 :             where
     285               2 :                 A: serde::de::SeqAccess<'de>,
     286               2 :             {
     287               2 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     288            2494 :                 let id: [u8; 18] = Deserialize::deserialize(s)?;
     289            2494 :                 Ok(TenantShardId::from(id))
     290            2494 :             }
     291            2494 : 
     292            2494 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     293            2492 :             where
     294            2492 :                 E: serde::de::Error,
     295            2492 :             {
     296            2492 :                 TenantShardId::from_str(v).map_err(E::custom)
     297            2492 :             }
     298            2494 :         }
     299            2494 : 
     300            2494 :         if deserializer.is_human_readable() {
     301            2492 :             deserializer.deserialize_str(IdVisitor {
     302            2492 :                 is_human_readable_deserializer: true,
     303            2492 :             })
     304                 :         } else {
     305               2 :             deserializer.deserialize_tuple(
     306               2 :                 18,
     307               2 :                 IdVisitor {
     308               2 :                     is_human_readable_deserializer: false,
     309               2 :                 },
     310               2 :             )
     311                 :         }
     312            2494 :     }
     313                 : }
     314                 : 
     315                 : /// Stripe size in number of pages
     316 UBC           0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     317                 : pub struct ShardStripeSize(pub u32);
     318                 : 
     319                 : /// Layout version: for future upgrades where we might change how the key->shard mapping works
     320 CBC    69860879 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     321                 : pub struct ShardLayout(u8);
     322                 : 
     323                 : const LAYOUT_V1: ShardLayout = ShardLayout(1);
     324                 : /// ShardIdentity uses a magic layout value to indicate if it is unusable
     325                 : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
     326                 : 
     327                 : /// Default stripe size in pages: 256MiB divided by the 8KiB page size.
     328                 : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
     329                 : 
     330                 : /// The ShardIdentity contains the information needed for one member of a map
     331                 : /// to resolve a key to a shard, and then check whether that shard is itself.
     332              20 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     333                 : pub struct ShardIdentity {
     334                 :     pub number: ShardNumber,
     335                 :     pub count: ShardCount,
     336                 :     stripe_size: ShardStripeSize,
     337                 :     layout: ShardLayout,
     338                 : }
     339                 : 
     340               5 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
     341                 : pub enum ShardConfigError {
     342                 :     #[error("Invalid shard count")]
     343                 :     InvalidCount,
     344                 :     #[error("Invalid shard number")]
     345                 :     InvalidNumber,
     346                 :     #[error("Invalid stripe size")]
     347                 :     InvalidStripeSize,
     348                 : }
     349                 : 
     350                 : impl ShardIdentity {
     351                 :     /// An identity with number=0 count=0 is a "none" identity, which represents legacy
     352                 :     /// tenants.  Modern single-shard tenants should not use this: they should
     353                 :     /// have number=0 count=1.
     354             917 :     pub fn unsharded() -> Self {
     355             917 :         Self {
     356             917 :             number: ShardNumber(0),
     357             917 :             count: ShardCount(0),
     358             917 :             layout: LAYOUT_V1,
     359             917 :             stripe_size: DEFAULT_STRIPE_SIZE,
     360             917 :         }
     361             917 :     }
     362                 : 
     363                 :     /// A broken instance of this type is only used for `TenantState::Broken` tenants,
     364                 :     /// which are constructed in code paths that don't have access to proper configuration.
     365                 :     ///
     366                 :     /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
     367                 :     /// Enforcement is via assertions, to avoid making our interface fallible for this
     368                 :     /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
     369                 :     /// state, and by extension to avoid trying to do any page->shard resolution.
     370 UBC           0 :     pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
     371               0 :         Self {
     372               0 :             number,
     373               0 :             count,
     374               0 :             layout: LAYOUT_BROKEN,
     375               0 :             stripe_size: DEFAULT_STRIPE_SIZE,
     376               0 :         }
     377               0 :     }
     378                 : 
     379 CBC        1624 :     pub fn is_unsharded(&self) -> bool {
     380            1624 :         self.number == ShardNumber(0) && self.count == ShardCount(0)
     381            1624 :     }
     382                 : 
     383                 :     /// Count must be nonzero, and number must be < count. To construct
     384                 :     /// the legacy case (count==0), use Self::unsharded instead.
     385               8 :     pub fn new(
     386               8 :         number: ShardNumber,
     387               8 :         count: ShardCount,
     388               8 :         stripe_size: ShardStripeSize,
     389               8 :     ) -> Result<Self, ShardConfigError> {
     390               8 :         if count.0 == 0 {
     391               1 :             Err(ShardConfigError::InvalidCount)
     392               7 :         } else if number.0 > count.0 - 1 {
     393               3 :             Err(ShardConfigError::InvalidNumber)
     394               4 :         } else if stripe_size.0 == 0 {
     395               1 :             Err(ShardConfigError::InvalidStripeSize)
     396                 :         } else {
     397               3 :             Ok(Self {
     398               3 :                 number,
     399               3 :                 count,
     400               3 :                 layout: LAYOUT_V1,
     401               3 :                 stripe_size,
     402               3 :             })
     403                 :         }
     404               8 :     }
     405                 : 
     406        69860879 :     fn is_broken(&self) -> bool {
     407        69860879 :         self.layout == LAYOUT_BROKEN
     408        69860879 :     }
     409                 : 
     410          228408 :     pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
     411          228408 :         assert!(!self.is_broken());
     412          228408 :         key_to_shard_number(self.count, self.stripe_size, key)
     413          228408 :     }
     414                 : 
     415                 :     /// Return true if the key should be ingested by this shard
     416        69632471 :     pub fn is_key_local(&self, key: &Key) -> bool {
     417        69632471 :         assert!(!self.is_broken());
     418        69632471 :         if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
     419        69632471 :             true
     420                 :         } else {
     421 UBC           0 :             key_to_shard_number(self.count, self.stripe_size, key) == self.number
     422                 :         }
     423 CBC    69632471 :     }
     424                 : 
     425                 :     /// Return true if the key should be discarded if found in this shard's
     426                 :     /// data store, e.g. during compaction after a split
     427        22558332 :     pub fn is_key_disposable(&self, key: &Key) -> bool {
     428        22558332 :         if key_is_shard0(key) {
     429                 :             // Q: Why can't we dispose of shard0 content if we're not shard 0?
     430                 :             // A: because the WAL ingestion logic currently ingests some shard 0
     431                 :             //    content on all shards, even though it's only read on shard 0.  If we
     432                 :             //    dropped it, then subsequent WAL ingest to these keys would encounter
     433                 :             //    an error.
     434         3510804 :             false
     435                 :         } else {
     436        19047528 :             !self.is_key_local(key)
     437                 :         }
     438        22558332 :     }
     439                 : 
     440 UBC           0 :     pub fn shard_slug(&self) -> String {
     441               0 :         if self.count > ShardCount(0) {
     442               0 :             format!("-{:02x}{:02x}", self.number.0, self.count.0)
     443                 :         } else {
     444               0 :             String::new()
     445                 :         }
     446               0 :     }
     447                 : 
     448                 :     /// Convenience for checking if this identity is the 0th shard in a tenant,
     449                 :     /// for special cases on shard 0 such as ingesting relation sizes.
     450               0 :     pub fn is_zero(&self) -> bool {
     451               0 :         self.number == ShardNumber(0)
     452               0 :     }
     453                 : }
     454                 : 
     455                 : impl Serialize for ShardIndex {
     456 CBC           2 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     457               2 :     where
     458               2 :         S: serde::Serializer,
     459               2 :     {
     460               2 :         if serializer.is_human_readable() {
     461 UBC           0 :             serializer.collect_str(self)
     462                 :         } else {
     463                 :             // Binary encoding is not used in index_part.json, but is included in anticipation of
     464                 :             // switching various structures (e.g. inter-process communication, remote metadata) to more
     465                 :             // compact binary encodings in the future.
     466 CBC           2 :             let mut packed: [u8; 2] = [0; 2];
     467               2 :             packed[0] = self.shard_number.0;
     468               2 :             packed[1] = self.shard_count.0;
     469               2 :             packed.serialize(serializer)
     470                 :         }
     471               2 :     }
     472                 : }
     473                 : 
     474                 : impl<'de> Deserialize<'de> for ShardIndex {
     475               1 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     476               1 :     where
     477               1 :         D: serde::Deserializer<'de>,
     478               1 :     {
     479               1 :         struct IdVisitor {
     480               1 :             is_human_readable_deserializer: bool,
     481               1 :         }
     482               1 : 
     483               1 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     484               1 :             type Value = ShardIndex;
     485               1 : 
     486               1 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     487 UBC           0 :                 if self.is_human_readable_deserializer {
     488 CBC           1 :                     formatter.write_str("value in form of hex string")
     489               1 :                 } else {
     490               1 :                     formatter.write_str("value in form of integer array([u8; 2])")
     491               1 :                 }
     492               1 :             }
     493               1 : 
     494               1 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     495               1 :             where
     496               1 :                 A: serde::de::SeqAccess<'de>,
     497               1 :             {
     498               1 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     499               1 :                 let id: [u8; 2] = Deserialize::deserialize(s)?;
     500               1 :                 Ok(ShardIndex::from(id))
     501               1 :             }
     502               1 : 
     503               1 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     504 UBC           0 :             where
     505               0 :                 E: serde::de::Error,
     506               0 :             {
     507               0 :                 ShardIndex::from_str(v).map_err(E::custom)
     508               0 :             }
     509 CBC           1 :         }
     510               1 : 
     511               1 :         if deserializer.is_human_readable() {
     512 UBC           0 :             deserializer.deserialize_str(IdVisitor {
     513               0 :                 is_human_readable_deserializer: true,
     514               0 :             })
     515                 :         } else {
     516 CBC           1 :             deserializer.deserialize_tuple(
     517               1 :                 2,
     518               1 :                 IdVisitor {
     519               1 :                     is_human_readable_deserializer: false,
     520               1 :                 },
     521               1 :             )
     522                 :         }
     523               1 :     }
     524                 : }
     525                 : 
     526                 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
     527                 : /// in order to be able to serve basebackup requests without peer communication).
     528        22558333 : fn key_is_shard0(key: &Key) -> bool {
     529        22558333 :     // To decide what to shard out to shards >0, we apply a simple rule that only
     530        22558333 :     // relation pages are distributed to shards other than shard zero. Everything else gets
     531        22558333 :     // stored on shard 0.  This guarantees that shard 0 can independently serve basebackup
     532        22558333 :     // requests, and any request other than those for particular blocks in relations.
     533        22558333 :     !is_rel_block_key(key)
     534        22558333 : }
     535                 : 
     536                 : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     537               3 : fn murmurhash32(mut h: u32) -> u32 {
     538               3 :     h ^= h >> 16;
     539               3 :     h = h.wrapping_mul(0x85ebca6b);
     540               3 :     h ^= h >> 13;
     541               3 :     h = h.wrapping_mul(0xc2b2ae35);
     542               3 :     h ^= h >> 16;
     543               3 :     h
     544               3 : }
     545                 : 
     546                 : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     547               2 : fn hash_combine(mut a: u32, mut b: u32) -> u32 {
     548               2 :     b = b.wrapping_add(0x9e3779b9);
     549               2 :     b = b.wrapping_add(a << 6);
     550               2 :     b = b.wrapping_add(a >> 2);
     551               2 : 
     552               2 :     a ^= b;
     553               2 :     a
     554               2 : }
     555                 : 
     556                 : /// Where a Key is to be distributed across shards, select the shard.  This function
     557                 : /// does not account for keys that should be broadcast across shards.
     558                 : ///
     559                 : /// The hashing in this function must exactly match what we do in postgres smgr
     560                 : /// code.  The resulting distribution of pages is intended to preserve locality within
     561                 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
     562                 : /// distributing data pseudo-randomly.
     563                 : ///
     564                 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
     565                 : /// and will be handled at higher levels when shards are split.
     566          228409 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
     567          228409 :     // Fast path for un-sharded tenants or broadcast keys
     568          228409 :     if count < ShardCount(2) || key_is_shard0(key) {
     569          228408 :         return ShardNumber(0);
     570               1 :     }
     571               1 : 
     572               1 :     // relNode
     573               1 :     let mut hash = murmurhash32(key.field4);
     574               1 :     // blockNum/stripe size
     575               1 :     hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
     576               1 : 
     577               1 :     ShardNumber((hash % count.0 as u32) as u8)
     578          228409 : }
     579                 : 
     580                 : #[cfg(test)]
     581                 : mod tests {
     582                 :     use std::str::FromStr;
     583                 : 
     584                 :     use bincode;
     585                 :     use utils::{id::TenantId, Hex};
     586                 : 
     587                 :     use super::*;
     588                 : 
     589                 :     const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
     590                 : 
     591               1 :     #[test]
     592               1 :     fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
     593               1 :         let example = TenantShardId {
     594               1 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     595               1 :             shard_count: ShardCount(10),
     596               1 :             shard_number: ShardNumber(7),
     597               1 :         };
     598               1 : 
     599               1 :         let encoded = format!("{example}");
     600               1 : 
     601               1 :         let expected = format!("{EXAMPLE_TENANT_ID}-070a");
     602               1 :         assert_eq!(&encoded, &expected);
     603                 : 
     604               1 :         let decoded = TenantShardId::from_str(&encoded)?;
     605                 : 
     606               1 :         assert_eq!(example, decoded);
     607                 : 
     608               1 :         Ok(())
     609               1 :     }
     610                 : 
     611               1 :     #[test]
     612               1 :     fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
     613               1 :         let example = TenantShardId {
     614               1 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     615               1 :             shard_count: ShardCount(10),
     616               1 :             shard_number: ShardNumber(7),
     617               1 :         };
     618               1 : 
     619               1 :         let encoded = bincode::serialize(&example).unwrap();
     620               1 :         let expected: [u8; 18] = [
     621               1 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     622               1 :             0xf6, 0xfc, 0x07, 0x0a,
     623               1 :         ];
     624               1 :         assert_eq!(Hex(&encoded), Hex(&expected));
     625                 : 
     626               1 :         let decoded = bincode::deserialize(&encoded).unwrap();
     627               1 : 
     628               1 :         assert_eq!(example, decoded);
     629                 : 
     630               1 :         Ok(())
     631               1 :     }
     632                 : 
     633               1 :     #[test]
     634               1 :     fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
     635               1 :         // Test that TenantShardId can decode a TenantId in human
     636               1 :         // readable form
     637               1 :         let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     638               1 :         let encoded = format!("{example}");
     639               1 : 
     640               1 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     641                 : 
     642               1 :         let decoded = TenantShardId::from_str(&encoded)?;
     643                 : 
     644               1 :         assert_eq!(example, decoded.tenant_id);
     645               1 :         assert_eq!(decoded.shard_count, ShardCount(0));
     646               1 :         assert_eq!(decoded.shard_number, ShardNumber(0));
     647                 : 
     648               1 :         Ok(())
     649               1 :     }
     650                 : 
     651               1 :     #[test]
     652               1 :     fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
     653               1 :         // Test that a legacy TenantShardId encodes into a form that
     654               1 :         // can be decoded as TenantId
     655               1 :         let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     656               1 :         let example = TenantShardId::unsharded(example_tenant_id);
     657               1 :         let encoded = format!("{example}");
     658               1 : 
     659               1 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     660                 : 
     661               1 :         let decoded = TenantId::from_str(&encoded)?;
     662                 : 
     663               1 :         assert_eq!(example_tenant_id, decoded);
     664                 : 
     665               1 :         Ok(())
     666               1 :     }
     667                 : 
     668               1 :     #[test]
     669               1 :     fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
     670               1 :         // Unlike in human readable encoding, binary encoding does not
     671               1 :         // do any special handling of legacy unsharded TenantIds: this test
     672               1 :         // is equivalent to the main test for binary encoding, just verifying
     673               1 :         // that the same behavior applies when we have used `unsharded()` to
     674               1 :         // construct a TenantShardId.
     675               1 :         let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
     676               1 :         let encoded = bincode::serialize(&example).unwrap();
     677               1 : 
     678               1 :         let expected: [u8; 18] = [
     679               1 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     680               1 :             0xf6, 0xfc, 0x00, 0x00,
     681               1 :         ];
     682               1 :         assert_eq!(Hex(&encoded), Hex(&expected));
     683                 : 
     684               1 :         let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
     685               1 :         assert_eq!(example, decoded);
     686                 : 
     687               1 :         Ok(())
     688               1 :     }
     689                 : 
     690               1 :     #[test]
     691               1 :     fn shard_identity_validation() -> Result<(), ShardConfigError> {
     692               1 :         // Happy cases
     693               1 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
     694               1 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
     695               1 :         ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
     696                 : 
     697               1 :         assert_eq!(
     698               1 :             ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
     699               1 :             Err(ShardConfigError::InvalidCount)
     700               1 :         );
     701               1 :         assert_eq!(
     702               1 :             ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
     703               1 :             Err(ShardConfigError::InvalidNumber)
     704               1 :         );
     705               1 :         assert_eq!(
     706               1 :             ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
     707               1 :             Err(ShardConfigError::InvalidNumber)
     708               1 :         );
     709               1 :         assert_eq!(
     710               1 :             ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
     711               1 :             Err(ShardConfigError::InvalidNumber)
     712               1 :         );
     713               1 :         assert_eq!(
     714               1 :             ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
     715               1 :             Err(ShardConfigError::InvalidStripeSize)
     716               1 :         );
     717                 : 
     718               1 :         Ok(())
     719               1 :     }
     720                 : 
     721               1 :     #[test]
     722               1 :     fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
     723               1 :         let example = ShardIndex {
     724               1 :             shard_number: ShardNumber(13),
     725               1 :             shard_count: ShardCount(17),
     726               1 :         };
     727               1 :         let expected: String = "0d11".to_string();
     728               1 :         let encoded = format!("{example}");
     729               1 :         assert_eq!(&encoded, &expected);
     730                 : 
     731               1 :         let decoded = ShardIndex::from_str(&encoded)?;
     732               1 :         assert_eq!(example, decoded);
     733               1 :         Ok(())
     734               1 :     }
     735                 : 
     736               1 :     #[test]
     737               1 :     fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
     738               1 :         let example = ShardIndex {
     739               1 :             shard_number: ShardNumber(13),
     740               1 :             shard_count: ShardCount(17),
     741               1 :         };
     742               1 :         let expected: [u8; 2] = [0x0d, 0x11];
     743               1 : 
     744               1 :         let encoded = bincode::serialize(&example).unwrap();
     745               1 :         assert_eq!(Hex(&encoded), Hex(&expected));
     746               1 :         let decoded = bincode::deserialize(&encoded).unwrap();
     747               1 :         assert_eq!(example, decoded);
     748                 : 
     749               1 :         Ok(())
     750               1 :     }
     751                 : 
     752                 :     // These are only smoke tests to spot check that our implementation doesn't
     753                 :     // deviate from a few examples values: not aiming to validate the overall
     754                 :     // hashing algorithm.
     755               1 :     #[test]
     756               1 :     fn murmur_hash() {
     757               1 :         assert_eq!(murmurhash32(0), 0);
     758                 : 
     759               1 :         assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
     760               1 :     }
     761                 : 
     762               1 :     #[test]
     763               1 :     fn shard_mapping() {
     764               1 :         let key = Key {
     765               1 :             field1: 0x00,
     766               1 :             field2: 0x67f,
     767               1 :             field3: 0x5,
     768               1 :             field4: 0x400c,
     769               1 :             field5: 0x00,
     770               1 :             field6: 0x7d06,
     771               1 :         };
     772               1 : 
     773               1 :         let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
     774               1 :         assert_eq!(shard, ShardNumber(8));
     775               1 :     }
     776                 : }
        

Generated by: LCOV version 2.1-beta