LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - shard.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 87.4 % 636 556
Test Date: 2024-02-29 11:57:12 Functions: 33.2 % 241 80

            Line data    Source code
       1              : use std::{ops::RangeInclusive, str::FromStr};
       2              : 
       3              : use crate::{
       4              :     key::{is_rel_block_key, Key},
       5              :     models::ShardParameters,
       6              : };
       7              : use hex::FromHex;
       8              : use serde::{Deserialize, Serialize};
       9              : use thiserror;
      10              : use utils::id::TenantId;
      11              : 
      12       100752 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      13              : pub struct ShardNumber(pub u8);
      14              : 
      15       167032 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      16              : pub struct ShardCount(u8);
      17              : 
      18              : impl ShardCount {
      19              :     pub const MAX: Self = Self(u8::MAX);
      20              : 
      21              :     /// The internal value of a ShardCount may be zero, which means "1 shard, but use
      22              :     /// legacy format for TenantShardId that excludes the shard suffix", also known
      23              :     /// as `TenantShardId::unsharded`.
      24              :     ///
      25              :     /// This method returns the actual number of shards, i.e. if our internal value is
      26              :     /// zero, we return 1 (unsharded tenants have 1 shard).
      27            0 :     pub fn count(&self) -> u8 {
      28            0 :         if self.0 > 0 {
      29            0 :             self.0
      30              :         } else {
      31            0 :             1
      32              :         }
      33            0 :     }
      34              : 
      35              :     /// The literal internal value: this is **not** the number of shards in the
      36              :     /// tenant, as we have a special zero value for legacy unsharded tenants.  Use
      37              :     /// [`Self::count`] if you want to know the cardinality of shards.
      38            0 :     pub fn literal(&self) -> u8 {
      39            0 :         self.0
      40            0 :     }
      41              : 
      42            0 :     pub fn is_unsharded(&self) -> bool {
      43            0 :         self.0 == 0
      44            0 :     }
      45              : 
      46              :     /// `v` may be zero, or the number of shards in the tenant.  `v` is what
      47              :     /// [`Self::literal`] would return.
      48           92 :     pub fn new(val: u8) -> Self {
      49           92 :         Self(val)
      50           92 :     }
      51              : }
      52              : 
      53              : impl ShardNumber {
      54              :     pub const MAX: Self = Self(u8::MAX);
      55              : }
      56              : 
      57              : /// TenantShardId identify the units of work for the Pageserver.
      58              : ///
      59              : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
      60              : ///
      61              : ///   # The second shard in a two-shard tenant
      62              : ///   072f1291a5310026820b2fe4b2968934-0102
      63              : ///
      64              : /// Historically, tenants could not have multiple shards, and were identified
      65              : /// by TenantId.  To support this, TenantShardId has a special legacy
      66              : /// mode where `shard_count` is equal to zero: this represents a single-sharded
      67              : /// tenant which should be written as a TenantId with no suffix.
      68              : ///
      69              : /// The human-readable encoding of TenantShardId, such as used in API URLs,
      70              : /// is both forward and backward compatible: a legacy TenantId can be
      71              : /// decoded as a TenantShardId, and when re-encoded it will be parseable
      72              : /// as a TenantId.
      73              : ///
      74              : /// Note that the binary encoding is _not_ backward compatible, because
      75              : /// at the time sharding is introduced, there are no existing binary structures
      76              : /// containing TenantId that we need to handle.
      77       503586 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      78              : pub struct TenantShardId {
      79              :     pub tenant_id: TenantId,
      80              :     pub shard_number: ShardNumber,
      81              :     pub shard_count: ShardCount,
      82              : }
      83              : 
      84              : impl TenantShardId {
      85          125 :     pub fn unsharded(tenant_id: TenantId) -> Self {
      86          125 :         Self {
      87          125 :             tenant_id,
      88          125 :             shard_number: ShardNumber(0),
      89          125 :             shard_count: ShardCount(0),
      90          125 :         }
      91          125 :     }
      92              : 
      93              :     /// The range of all TenantShardId that belong to a particular TenantId.  This is useful when
      94              :     /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
      95            0 :     pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
      96            0 :         RangeInclusive::new(
      97            0 :             Self {
      98            0 :                 tenant_id,
      99            0 :                 shard_number: ShardNumber(0),
     100            0 :                 shard_count: ShardCount(0),
     101            0 :             },
     102            0 :             Self {
     103            0 :                 tenant_id,
     104            0 :                 shard_number: ShardNumber::MAX,
     105            0 :                 shard_count: ShardCount::MAX,
     106            0 :             },
     107            0 :         )
     108            0 :     }
     109              : 
     110         5466 :     pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
     111         5466 :         ShardSlug(self)
     112         5466 :     }
     113              : 
     114              :     /// Convenience for code that has special behavior on the 0th shard.
     115           88 :     pub fn is_zero(&self) -> bool {
     116           88 :         self.shard_number == ShardNumber(0)
     117           88 :     }
     118              : 
     119            0 :     pub fn is_unsharded(&self) -> bool {
     120            0 :         self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
     121            0 :     }
     122              : 
     123              :     /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
     124              :     /// is useful when logging from code that is already in a span that includes tenant ID, to
     125              :     /// keep messages reasonably terse.
     126            0 :     pub fn to_index(&self) -> ShardIndex {
     127            0 :         ShardIndex {
     128            0 :             shard_number: self.shard_number,
     129            0 :             shard_count: self.shard_count,
     130            0 :         }
     131            0 :     }
     132              : 
     133              :     /// Calculate the children of this TenantShardId when splitting the overall tenant into
     134              :     /// the given number of shards.
     135            8 :     pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
     136            8 :         let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
     137            8 :         let mut child_shards = Vec::new();
     138           32 :         for shard_number in 0..ShardNumber(new_shard_count.0).0 {
     139              :             // Key mapping is based on a round robin mapping of key hash modulo shard count,
     140              :             // so our child shards are the ones which the same keys would map to.
     141           32 :             if shard_number % effective_old_shard_count == self.shard_number.0 {
     142           24 :                 child_shards.push(TenantShardId {
     143           24 :                     tenant_id: self.tenant_id,
     144           24 :                     shard_number: ShardNumber(shard_number),
     145           24 :                     shard_count: new_shard_count,
     146           24 :                 })
     147            8 :             }
     148              :         }
     149              : 
     150            8 :         child_shards
     151            8 :     }
     152              : }
     153              : 
     154              : /// Formatting helper
     155              : struct ShardSlug<'a>(&'a TenantShardId);
     156              : 
     157              : impl<'a> std::fmt::Display for ShardSlug<'a> {
     158         5466 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     159         5466 :         write!(
     160         5466 :             f,
     161         5466 :             "{:02x}{:02x}",
     162         5466 :             self.0.shard_number.0, self.0.shard_count.0
     163         5466 :         )
     164         5466 :     }
     165              : }
     166              : 
     167              : impl std::fmt::Display for TenantShardId {
     168         4473 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     169         4473 :         if self.shard_count != ShardCount(0) {
     170            2 :             write!(f, "{}-{}", self.tenant_id, self.shard_slug())
     171              :         } else {
     172              :             // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
     173              :             // is distinct from the normal single shard case (shard count == 1).
     174         4471 :             self.tenant_id.fmt(f)
     175              :         }
     176         4473 :     }
     177              : }
     178              : 
     179              : impl std::fmt::Debug for TenantShardId {
     180            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     181            0 :         // Debug is the same as Display: the compact hex representation
     182            0 :         write!(f, "{}", self)
     183            0 :     }
     184              : }
     185              : 
     186              : impl std::str::FromStr for TenantShardId {
     187              :     type Err = hex::FromHexError;
     188              : 
     189         1666 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     190         1666 :         // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
     191         1666 :         if s.len() == 32 {
     192              :             // Legacy case: no shard specified
     193              :             Ok(Self {
     194         1664 :                 tenant_id: TenantId::from_str(s)?,
     195         1664 :                 shard_number: ShardNumber(0),
     196         1664 :                 shard_count: ShardCount(0),
     197              :             })
     198            2 :         } else if s.len() == 37 {
     199            2 :             let bytes = s.as_bytes();
     200            2 :             let tenant_id = TenantId::from_hex(&bytes[0..32])?;
     201            2 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     202            2 :             hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
     203            2 :             Ok(Self {
     204            2 :                 tenant_id,
     205            2 :                 shard_number: ShardNumber(shard_parts[0]),
     206            2 :                 shard_count: ShardCount(shard_parts[1]),
     207            2 :             })
     208              :         } else {
     209            0 :             Err(hex::FromHexError::InvalidStringLength)
     210              :         }
     211         1666 :     }
     212              : }
     213              : 
     214              : impl From<[u8; 18]> for TenantShardId {
     215            4 :     fn from(b: [u8; 18]) -> Self {
     216            4 :         let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
     217            4 : 
     218            4 :         Self {
     219            4 :             tenant_id: TenantId::from(tenant_id_bytes),
     220            4 :             shard_number: ShardNumber(b[16]),
     221            4 :             shard_count: ShardCount(b[17]),
     222            4 :         }
     223            4 :     }
     224              : }
     225              : 
     226              : /// For use within the context of a particular tenant, when we need to know which
     227              : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
     228              : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
     229              : /// TenantShardId.
     230         3816 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
     231              : pub struct ShardIndex {
     232              :     pub shard_number: ShardNumber,
     233              :     pub shard_count: ShardCount,
     234              : }
     235              : 
     236              : impl ShardIndex {
     237            0 :     pub fn new(number: ShardNumber, count: ShardCount) -> Self {
     238            0 :         Self {
     239            0 :             shard_number: number,
     240            0 :             shard_count: count,
     241            0 :         }
     242            0 :     }
     243          152 :     pub fn unsharded() -> Self {
     244          152 :         Self {
     245          152 :             shard_number: ShardNumber(0),
     246          152 :             shard_count: ShardCount(0),
     247          152 :         }
     248          152 :     }
     249              : 
     250         5825 :     pub fn is_unsharded(&self) -> bool {
     251         5825 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     252         5825 :     }
     253              : 
     254              :     /// For use in constructing remote storage paths: concatenate this with a TenantId
     255              :     /// to get a fully qualified TenantShardId.
     256              :     ///
     257              :     /// Backward compat: this function returns an empty string if Self::is_unsharded, such
     258              :     /// that the legacy pre-sharding remote key format is preserved.
     259           13 :     pub fn get_suffix(&self) -> String {
     260           13 :         if self.is_unsharded() {
     261           13 :             "".to_string()
     262              :         } else {
     263            0 :             format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     264              :         }
     265           13 :     }
     266              : }
     267              : 
     268              : impl std::fmt::Display for ShardIndex {
     269          868 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     270          868 :         write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     271          868 :     }
     272              : }
     273              : 
     274              : impl std::fmt::Debug for ShardIndex {
     275          562 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     276          562 :         // Debug is the same as Display: the compact hex representation
     277          562 :         write!(f, "{}", self)
     278          562 :     }
     279              : }
     280              : 
     281              : impl std::str::FromStr for ShardIndex {
     282              :     type Err = hex::FromHexError;
     283              : 
     284            2 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     285            2 :         // Expect format: 1 byte shard number, 1 byte shard count
     286            2 :         if s.len() == 4 {
     287            2 :             let bytes = s.as_bytes();
     288            2 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     289            2 :             hex::decode_to_slice(bytes, &mut shard_parts)?;
     290            2 :             Ok(Self {
     291            2 :                 shard_number: ShardNumber(shard_parts[0]),
     292            2 :                 shard_count: ShardCount(shard_parts[1]),
     293            2 :             })
     294              :         } else {
     295            0 :             Err(hex::FromHexError::InvalidStringLength)
     296              :         }
     297            2 :     }
     298              : }
     299              : 
     300              : impl From<[u8; 2]> for ShardIndex {
     301            2 :     fn from(b: [u8; 2]) -> Self {
     302            2 :         Self {
     303            2 :             shard_number: ShardNumber(b[0]),
     304            2 :             shard_count: ShardCount(b[1]),
     305            2 :         }
     306            2 :     }
     307              : }
     308              : 
     309              : impl Serialize for TenantShardId {
     310           55 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     311           55 :     where
     312           55 :         S: serde::Serializer,
     313           55 :     {
     314           55 :         if serializer.is_human_readable() {
     315           47 :             serializer.collect_str(self)
     316              :         } else {
     317            8 :             let mut packed: [u8; 18] = [0; 18];
     318            8 :             packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
     319            8 :             packed[16] = self.shard_number.0;
     320            8 :             packed[17] = self.shard_count.0;
     321            8 : 
     322            8 :             packed.serialize(serializer)
     323              :         }
     324           55 :     }
     325              : }
     326              : 
     327              : impl<'de> Deserialize<'de> for TenantShardId {
     328           12 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     329           12 :     where
     330           12 :         D: serde::Deserializer<'de>,
     331           12 :     {
     332           12 :         struct IdVisitor {
     333           12 :             is_human_readable_deserializer: bool,
     334           12 :         }
     335           12 : 
     336           12 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     337           12 :             type Value = TenantShardId;
     338           12 : 
     339           12 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     340            0 :                 if self.is_human_readable_deserializer {
     341           12 :                     formatter.write_str("value in form of hex string")
     342           12 :                 } else {
     343           12 :                     formatter.write_str("value in form of integer array([u8; 18])")
     344           12 :                 }
     345           12 :             }
     346           12 : 
     347           12 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     348            4 :             where
     349            4 :                 A: serde::de::SeqAccess<'de>,
     350            4 :             {
     351            4 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     352           12 :                 let id: [u8; 18] = Deserialize::deserialize(s)?;
     353           12 :                 Ok(TenantShardId::from(id))
     354           12 :             }
     355           12 : 
     356           12 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     357            8 :             where
     358            8 :                 E: serde::de::Error,
     359            8 :             {
     360            8 :                 TenantShardId::from_str(v).map_err(E::custom)
     361            8 :             }
     362           12 :         }
     363           12 : 
     364           12 :         if deserializer.is_human_readable() {
     365            8 :             deserializer.deserialize_str(IdVisitor {
     366            8 :                 is_human_readable_deserializer: true,
     367            8 :             })
     368              :         } else {
     369            4 :             deserializer.deserialize_tuple(
     370            4 :                 18,
     371            4 :                 IdVisitor {
     372            4 :                     is_human_readable_deserializer: false,
     373            4 :                 },
     374            4 :             )
     375              :         }
     376           12 :     }
     377              : }
     378              : 
     379              : /// Stripe size in number of pages
     380            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     381              : pub struct ShardStripeSize(pub u32);
     382              : 
     383              : /// Layout version: for future upgrades where we might change how the key->shard mapping works
     384       167030 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     385              : pub struct ShardLayout(u8);
     386              : 
     387              : const LAYOUT_V1: ShardLayout = ShardLayout(1);
     388              : /// ShardIdentity uses a magic layout value to indicate if it is unusable
     389              : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
     390              : 
     391              : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
     392              : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
     393              : 
     394              : /// The ShardIdentity contains the information needed for one member of map
     395              : /// to resolve a key to a shard, and then check whether that shard is ==self.
     396            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     397              : pub struct ShardIdentity {
     398              :     pub number: ShardNumber,
     399              :     pub count: ShardCount,
     400              :     pub stripe_size: ShardStripeSize,
     401              :     layout: ShardLayout,
     402              : }
     403              : 
     404           10 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
     405              : pub enum ShardConfigError {
     406              :     #[error("Invalid shard count")]
     407              :     InvalidCount,
     408              :     #[error("Invalid shard number")]
     409              :     InvalidNumber,
     410              :     #[error("Invalid stripe size")]
     411              :     InvalidStripeSize,
     412              : }
     413              : 
     414              : impl ShardIdentity {
     415              :     /// An identity with number=0 count=0 is a "none" identity, which represents legacy
     416              :     /// tenants.  Modern single-shard tenants should not use this: they should
     417              :     /// have number=0 count=1.
     418           88 :     pub fn unsharded() -> Self {
     419           88 :         Self {
     420           88 :             number: ShardNumber(0),
     421           88 :             count: ShardCount(0),
     422           88 :             layout: LAYOUT_V1,
     423           88 :             stripe_size: DEFAULT_STRIPE_SIZE,
     424           88 :         }
     425           88 :     }
     426              : 
     427              :     /// A broken instance of this type is only used for `TenantState::Broken` tenants,
     428              :     /// which are constructed in code paths that don't have access to proper configuration.
     429              :     ///
     430              :     /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
     431              :     /// Enforcement is via assertions, to avoid making our interface fallible for this
     432              :     /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
     433              :     /// state, and by extension to avoid trying to do any page->shard resolution.
     434            0 :     pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
     435            0 :         Self {
     436            0 :             number,
     437            0 :             count,
     438            0 :             layout: LAYOUT_BROKEN,
     439            0 :             stripe_size: DEFAULT_STRIPE_SIZE,
     440            0 :         }
     441            0 :     }
     442              : 
     443            0 :     pub fn is_unsharded(&self) -> bool {
     444            0 :         self.number == ShardNumber(0) && self.count == ShardCount(0)
     445            0 :     }
     446              : 
     447              :     /// Count must be nonzero, and number must be < count. To construct
     448              :     /// the legacy case (count==0), use Self::unsharded instead.
     449           18 :     pub fn new(
     450           18 :         number: ShardNumber,
     451           18 :         count: ShardCount,
     452           18 :         stripe_size: ShardStripeSize,
     453           18 :     ) -> Result<Self, ShardConfigError> {
     454           18 :         if count.0 == 0 {
     455            2 :             Err(ShardConfigError::InvalidCount)
     456           16 :         } else if number.0 > count.0 - 1 {
     457            6 :             Err(ShardConfigError::InvalidNumber)
     458           10 :         } else if stripe_size.0 == 0 {
     459            2 :             Err(ShardConfigError::InvalidStripeSize)
     460              :         } else {
     461            8 :             Ok(Self {
     462            8 :                 number,
     463            8 :                 count,
     464            8 :                 layout: LAYOUT_V1,
     465            8 :                 stripe_size,
     466            8 :             })
     467              :         }
     468           18 :     }
     469              : 
     470              :     /// For use when creating ShardIdentity instances for new shards, where a creation request
     471              :     /// specifies the ShardParameters that apply to all shards.
     472           88 :     pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
     473           88 :         Self {
     474           88 :             number,
     475           88 :             count: params.count,
     476           88 :             layout: LAYOUT_V1,
     477           88 :             stripe_size: params.stripe_size,
     478           88 :         }
     479           88 :     }
     480              : 
     481       167030 :     fn is_broken(&self) -> bool {
     482       167030 :         self.layout == LAYOUT_BROKEN
     483       167030 :     }
     484              : 
     485         3004 :     pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
     486         3004 :         assert!(!self.is_broken());
     487         3004 :         key_to_shard_number(self.count, self.stripe_size, key)
     488         3004 :     }
     489              : 
     490              :     /// Return true if the key should be ingested by this shard
     491       164026 :     pub fn is_key_local(&self, key: &Key) -> bool {
     492       164026 :         assert!(!self.is_broken());
     493       164026 :         if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
     494       164026 :             true
     495              :         } else {
     496            0 :             key_to_shard_number(self.count, self.stripe_size, key) == self.number
     497              :         }
     498       164026 :     }
     499              : 
     500              :     /// Return true if the key should be discarded if found in this shard's
     501              :     /// data store, e.g. during compaction after a split
     502      2605708 :     pub fn is_key_disposable(&self, key: &Key) -> bool {
     503      2605708 :         if key_is_shard0(key) {
     504              :             // Q: Why can't we dispose of shard0 content if we're not shard 0?
     505              :             // A1: because the WAL ingestion logic currently ingests some shard 0
     506              :             //     content on all shards, even though it's only read on shard 0.  If we
     507              :             //     dropped it, then subsequent WAL ingest to these keys would encounter
     508              :             //     an error.
     509              :             // A2: because key_is_shard0 also covers relation size keys, which are written
     510              :             //     on all shards even though they're only maintained accurately on shard 0.
     511      2587324 :             false
     512              :         } else {
     513        18384 :             !self.is_key_local(key)
     514              :         }
     515      2605708 :     }
     516              : 
     517            0 :     pub fn shard_slug(&self) -> String {
     518            0 :         if self.count > ShardCount(0) {
     519            0 :             format!("-{:02x}{:02x}", self.number.0, self.count.0)
     520              :         } else {
     521            0 :             String::new()
     522              :         }
     523            0 :     }
     524              : 
     525              :     /// Convenience for checking if this identity is the 0th shard in a tenant,
     526              :     /// for special cases on shard 0 such as ingesting relation sizes.
     527            0 :     pub fn is_zero(&self) -> bool {
     528            0 :         self.number == ShardNumber(0)
     529            0 :     }
     530              : }
     531              : 
     532              : impl Serialize for ShardIndex {
     533            4 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     534            4 :     where
     535            4 :         S: serde::Serializer,
     536            4 :     {
     537            4 :         if serializer.is_human_readable() {
     538            0 :             serializer.collect_str(self)
     539              :         } else {
     540              :             // Binary encoding is not used in index_part.json, but is included in anticipation of
     541              :             // switching various structures (e.g. inter-process communication, remote metadata) to more
     542              :             // compact binary encodings in future.
     543            4 :             let mut packed: [u8; 2] = [0; 2];
     544            4 :             packed[0] = self.shard_number.0;
     545            4 :             packed[1] = self.shard_count.0;
     546            4 :             packed.serialize(serializer)
     547              :         }
     548            4 :     }
     549              : }
     550              : 
     551              : impl<'de> Deserialize<'de> for ShardIndex {
     552            2 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     553            2 :     where
     554            2 :         D: serde::Deserializer<'de>,
     555            2 :     {
     556            2 :         struct IdVisitor {
     557            2 :             is_human_readable_deserializer: bool,
     558            2 :         }
     559            2 : 
     560            2 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     561            2 :             type Value = ShardIndex;
     562            2 : 
     563            2 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     564            0 :                 if self.is_human_readable_deserializer {
     565            2 :                     formatter.write_str("value in form of hex string")
     566            2 :                 } else {
     567            2 :                     formatter.write_str("value in form of integer array([u8; 2])")
     568            2 :                 }
     569            2 :             }
     570            2 : 
     571            2 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     572            2 :             where
     573            2 :                 A: serde::de::SeqAccess<'de>,
     574            2 :             {
     575            2 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     576            2 :                 let id: [u8; 2] = Deserialize::deserialize(s)?;
     577            2 :                 Ok(ShardIndex::from(id))
     578            2 :             }
     579            2 : 
     580            2 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     581            0 :             where
     582            0 :                 E: serde::de::Error,
     583            0 :             {
     584            0 :                 ShardIndex::from_str(v).map_err(E::custom)
     585            0 :             }
     586            2 :         }
     587            2 : 
     588            2 :         if deserializer.is_human_readable() {
     589            0 :             deserializer.deserialize_str(IdVisitor {
     590            0 :                 is_human_readable_deserializer: true,
     591            0 :             })
     592              :         } else {
     593            2 :             deserializer.deserialize_tuple(
     594            2 :                 2,
     595            2 :                 IdVisitor {
     596            2 :                     is_human_readable_deserializer: false,
     597            2 :                 },
     598            2 :             )
     599              :         }
     600            2 :     }
     601              : }
     602              : 
     603              : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
     604              : /// in order to be able to serve basebackup requests without peer communication).
     605      2605710 : fn key_is_shard0(key: &Key) -> bool {
     606      2605710 :     // To decide what to shard out to shards >0, we apply a simple rule that only
     607      2605710 :     // relation pages are distributed to shards other than shard zero. Everything else gets
     608      2605710 :     // stored on shard 0.  This guarantees that shard 0 can independently serve basebackup
     609      2605710 :     // requests, and any request other than those for particular blocks in relations.
     610      2605710 :     !is_rel_block_key(key)
     611      2605710 : }
     612              : 
     613              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     614            6 : fn murmurhash32(mut h: u32) -> u32 {
     615            6 :     h ^= h >> 16;
     616            6 :     h = h.wrapping_mul(0x85ebca6b);
     617            6 :     h ^= h >> 13;
     618            6 :     h = h.wrapping_mul(0xc2b2ae35);
     619            6 :     h ^= h >> 16;
     620            6 :     h
     621            6 : }
     622              : 
     623              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     624            4 : fn hash_combine(mut a: u32, mut b: u32) -> u32 {
     625            4 :     b = b.wrapping_add(0x9e3779b9);
     626            4 :     b = b.wrapping_add(a << 6);
     627            4 :     b = b.wrapping_add(a >> 2);
     628            4 : 
     629            4 :     a ^= b;
     630            4 :     a
     631            4 : }
     632              : 
     633              : /// Where a Key is to be distributed across shards, select the shard.  This function
     634              : /// does not account for keys that should be broadcast across shards.
     635              : ///
     636              : /// The hashing in this function must exactly match what we do in postgres smgr
     637              : /// code.  The resulting distribution of pages is intended to preserve locality within
     638              : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
     639              : /// distributing data pseudo-randomly.
     640              : ///
     641              : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
     642              : /// and will be handled at higher levels when shards are split.
     643         3006 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
     644         3006 :     // Fast path for un-sharded tenants or broadcast keys
     645         3006 :     if count < ShardCount(2) || key_is_shard0(key) {
     646         3004 :         return ShardNumber(0);
     647            2 :     }
     648            2 : 
     649            2 :     // relNode
     650            2 :     let mut hash = murmurhash32(key.field4);
     651            2 :     // blockNum/stripe size
     652            2 :     hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
     653            2 : 
     654            2 :     ShardNumber((hash % count.0 as u32) as u8)
     655         3006 : }
     656              : 
     657              : #[cfg(test)]
     658              : mod tests {
     659              :     use std::str::FromStr;
     660              : 
     661              :     use bincode;
     662              :     use utils::{id::TenantId, Hex};
     663              : 
     664              :     use super::*;
     665              : 
     666              :     const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
     667              : 
     668            2 :     #[test]
     669            2 :     fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
     670            2 :         let example = TenantShardId {
     671            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     672            2 :             shard_count: ShardCount(10),
     673            2 :             shard_number: ShardNumber(7),
     674            2 :         };
     675            2 : 
     676            2 :         let encoded = format!("{example}");
     677            2 : 
     678            2 :         let expected = format!("{EXAMPLE_TENANT_ID}-070a");
     679            2 :         assert_eq!(&encoded, &expected);
     680              : 
     681            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     682              : 
     683            2 :         assert_eq!(example, decoded);
     684              : 
     685            2 :         Ok(())
     686            2 :     }
     687              : 
     688            2 :     #[test]
     689            2 :     fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
     690            2 :         let example = TenantShardId {
     691            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     692            2 :             shard_count: ShardCount(10),
     693            2 :             shard_number: ShardNumber(7),
     694            2 :         };
     695            2 : 
     696            2 :         let encoded = bincode::serialize(&example).unwrap();
     697            2 :         let expected: [u8; 18] = [
     698            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     699            2 :             0xf6, 0xfc, 0x07, 0x0a,
     700            2 :         ];
     701            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     702              : 
     703            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     704            2 : 
     705            2 :         assert_eq!(example, decoded);
     706              : 
     707            2 :         Ok(())
     708            2 :     }
     709              : 
     710            2 :     #[test]
     711            2 :     fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
     712            2 :         // Test that TenantShardId can decode a TenantId in human
     713            2 :         // readable form
     714            2 :         let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     715            2 :         let encoded = format!("{example}");
     716            2 : 
     717            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     718              : 
     719            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     720              : 
     721            2 :         assert_eq!(example, decoded.tenant_id);
     722            2 :         assert_eq!(decoded.shard_count, ShardCount(0));
     723            2 :         assert_eq!(decoded.shard_number, ShardNumber(0));
     724              : 
     725            2 :         Ok(())
     726            2 :     }
     727              : 
     728            2 :     #[test]
     729            2 :     fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
     730            2 :         // Test that a legacy TenantShardId encodes into a form that
     731            2 :         // can be decoded as TenantId
     732            2 :         let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     733            2 :         let example = TenantShardId::unsharded(example_tenant_id);
     734            2 :         let encoded = format!("{example}");
     735            2 : 
     736            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     737              : 
     738            2 :         let decoded = TenantId::from_str(&encoded)?;
     739              : 
     740            2 :         assert_eq!(example_tenant_id, decoded);
     741              : 
     742            2 :         Ok(())
     743            2 :     }
     744              : 
     745            2 :     #[test]
     746            2 :     fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
     747            2 :         // Unlike in human readable encoding, binary encoding does not
     748            2 :         // do any special handling of legacy unsharded TenantIds: this test
     749            2 :         // is equivalent to the main test for binary encoding, just verifying
     750            2 :         // that the same behavior applies when we have used `unsharded()` to
     751            2 :         // construct a TenantShardId.
     752            2 :         let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
     753            2 :         let encoded = bincode::serialize(&example).unwrap();
     754            2 : 
     755            2 :         let expected: [u8; 18] = [
     756            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     757            2 :             0xf6, 0xfc, 0x00, 0x00,
     758            2 :         ];
     759            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     760              : 
     761            2 :         let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
     762            2 :         assert_eq!(example, decoded);
     763              : 
     764            2 :         Ok(())
     765            2 :     }
     766              : 
     767            2 :     #[test]
     768            2 :     fn shard_identity_validation() -> Result<(), ShardConfigError> {
     769            2 :         // Happy cases
     770            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
     771            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
     772            2 :         ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
     773              : 
     774            2 :         assert_eq!(
     775            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
     776            2 :             Err(ShardConfigError::InvalidCount)
     777            2 :         );
     778            2 :         assert_eq!(
     779            2 :             ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
     780            2 :             Err(ShardConfigError::InvalidNumber)
     781            2 :         );
     782            2 :         assert_eq!(
     783            2 :             ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
     784            2 :             Err(ShardConfigError::InvalidNumber)
     785            2 :         );
     786            2 :         assert_eq!(
     787            2 :             ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
     788            2 :             Err(ShardConfigError::InvalidNumber)
     789            2 :         );
     790            2 :         assert_eq!(
     791            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
     792            2 :             Err(ShardConfigError::InvalidStripeSize)
     793            2 :         );
     794              : 
     795            2 :         Ok(())
     796            2 :     }
     797              : 
     798            2 :     #[test]
     799            2 :     fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
     800            2 :         let example = ShardIndex {
     801            2 :             shard_number: ShardNumber(13),
     802            2 :             shard_count: ShardCount(17),
     803            2 :         };
     804            2 :         let expected: String = "0d11".to_string();
     805            2 :         let encoded = format!("{example}");
     806            2 :         assert_eq!(&encoded, &expected);
     807              : 
     808            2 :         let decoded = ShardIndex::from_str(&encoded)?;
     809            2 :         assert_eq!(example, decoded);
     810            2 :         Ok(())
     811            2 :     }
     812              : 
     813            2 :     #[test]
     814            2 :     fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
     815            2 :         let example = ShardIndex {
     816            2 :             shard_number: ShardNumber(13),
     817            2 :             shard_count: ShardCount(17),
     818            2 :         };
     819            2 :         let expected: [u8; 2] = [0x0d, 0x11];
     820            2 : 
     821            2 :         let encoded = bincode::serialize(&example).unwrap();
     822            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     823            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     824            2 :         assert_eq!(example, decoded);
     825              : 
     826            2 :         Ok(())
     827            2 :     }
     828              : 
     829              :     // These are only smoke tests to spot check that our implementation doesn't
     830              :     // deviate from a few examples values: not aiming to validate the overall
     831              :     // hashing algorithm.
     832            2 :     #[test]
     833            2 :     fn murmur_hash() {
     834            2 :         assert_eq!(murmurhash32(0), 0);
     835              : 
     836            2 :         assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
     837            2 :     }
     838              : 
     839            2 :     #[test]
     840            2 :     fn shard_mapping() {
     841            2 :         let key = Key {
     842            2 :             field1: 0x00,
     843            2 :             field2: 0x67f,
     844            2 :             field3: 0x5,
     845            2 :             field4: 0x400c,
     846            2 :             field5: 0x00,
     847            2 :             field6: 0x7d06,
     848            2 :         };
     849            2 : 
     850            2 :         let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
     851            2 :         assert_eq!(shard, ShardNumber(8));
     852            2 :     }
     853              : 
     854            2 :     #[test]
     855            2 :     fn shard_id_split() {
     856            2 :         let tenant_id = TenantId::generate();
     857            2 :         let parent = TenantShardId::unsharded(tenant_id);
     858            2 : 
     859            2 :         // Unsharded into 2
     860            2 :         assert_eq!(
     861            2 :             parent.split(ShardCount(2)),
     862            2 :             vec![
     863            2 :                 TenantShardId {
     864            2 :                     tenant_id,
     865            2 :                     shard_count: ShardCount(2),
     866            2 :                     shard_number: ShardNumber(0)
     867            2 :                 },
     868            2 :                 TenantShardId {
     869            2 :                     tenant_id,
     870            2 :                     shard_count: ShardCount(2),
     871            2 :                     shard_number: ShardNumber(1)
     872            2 :                 }
     873            2 :             ]
     874            2 :         );
     875              : 
     876              :         // Unsharded into 4
     877            2 :         assert_eq!(
     878            2 :             parent.split(ShardCount(4)),
     879            2 :             vec![
     880            2 :                 TenantShardId {
     881            2 :                     tenant_id,
     882            2 :                     shard_count: ShardCount(4),
     883            2 :                     shard_number: ShardNumber(0)
     884            2 :                 },
     885            2 :                 TenantShardId {
     886            2 :                     tenant_id,
     887            2 :                     shard_count: ShardCount(4),
     888            2 :                     shard_number: ShardNumber(1)
     889            2 :                 },
     890            2 :                 TenantShardId {
     891            2 :                     tenant_id,
     892            2 :                     shard_count: ShardCount(4),
     893            2 :                     shard_number: ShardNumber(2)
     894            2 :                 },
     895            2 :                 TenantShardId {
     896            2 :                     tenant_id,
     897            2 :                     shard_count: ShardCount(4),
     898            2 :                     shard_number: ShardNumber(3)
     899            2 :                 }
     900            2 :             ]
     901            2 :         );
     902              : 
     903              :         // count=1 into 2 (check this works the same as unsharded.)
     904            2 :         let parent = TenantShardId {
     905            2 :             tenant_id,
     906            2 :             shard_count: ShardCount(1),
     907            2 :             shard_number: ShardNumber(0),
     908            2 :         };
     909            2 :         assert_eq!(
     910            2 :             parent.split(ShardCount(2)),
     911            2 :             vec![
     912            2 :                 TenantShardId {
     913            2 :                     tenant_id,
     914            2 :                     shard_count: ShardCount(2),
     915            2 :                     shard_number: ShardNumber(0)
     916            2 :                 },
     917            2 :                 TenantShardId {
     918            2 :                     tenant_id,
     919            2 :                     shard_count: ShardCount(2),
     920            2 :                     shard_number: ShardNumber(1)
     921            2 :                 }
     922            2 :             ]
     923            2 :         );
     924              : 
     925              :         // count=2 into count=8
     926            2 :         let parent = TenantShardId {
     927            2 :             tenant_id,
     928            2 :             shard_count: ShardCount(2),
     929            2 :             shard_number: ShardNumber(1),
     930            2 :         };
     931            2 :         assert_eq!(
     932            2 :             parent.split(ShardCount(8)),
     933            2 :             vec![
     934            2 :                 TenantShardId {
     935            2 :                     tenant_id,
     936            2 :                     shard_count: ShardCount(8),
     937            2 :                     shard_number: ShardNumber(1)
     938            2 :                 },
     939            2 :                 TenantShardId {
     940            2 :                     tenant_id,
     941            2 :                     shard_count: ShardCount(8),
     942            2 :                     shard_number: ShardNumber(3)
     943            2 :                 },
     944            2 :                 TenantShardId {
     945            2 :                     tenant_id,
     946            2 :                     shard_count: ShardCount(8),
     947            2 :                     shard_number: ShardNumber(5)
     948            2 :                 },
     949            2 :                 TenantShardId {
     950            2 :                     tenant_id,
     951            2 :                     shard_count: ShardCount(8),
     952            2 :                     shard_number: ShardNumber(7)
     953            2 :                 },
     954            2 :             ]
     955            2 :         );
     956            2 :     }
     957              : }
        

Generated by: LCOV version 2.1-beta