LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - shard.rs (source / functions) Coverage Total Hit
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info Lines: 87.8 % 623 547
Test Date: 2024-04-18 15:32:49 Functions: 40.4 % 136 55

            Line data    Source code
       1              : use std::{ops::RangeInclusive, str::FromStr};
       2              : 
       3              : use crate::{
       4              :     key::{is_rel_block_key, Key},
       5              :     models::ShardParameters,
       6              : };
       7              : use hex::FromHex;
       8              : use serde::{Deserialize, Serialize};
       9              : use utils::id::TenantId;
      10              : 
      11              : /// See docs/rfcs/031-sharding-static.md for an overview of sharding.
      12              : ///
      13              : /// This module contains a variety of types used to represent the concept of sharding
/// a Neon tenant across multiple physical shards.  Since there are quite a few of these,
/// we provide a summary here.
      16              : ///
      17              : /// Types used to describe shards:
      18              : /// - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
      19              : ///   which identifies a tenant which is not shard-aware.  This means its storage paths do not include
      20              : ///   a shard suffix.
      21              : /// - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
      22              : /// - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
      23              : ///   without the tenant ID.  This is useful for things that are implicitly scoped to a particular
      24              : ///   tenant, such as layer files.
/// - [`ShardIdentity`] is the full description of a particular shard's parameters, in sufficient
      26              : ///   detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
      27              : /// - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
      28              : ///   four hex digits.  An unsharded tenant is `0000`.
      29              : /// - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
      30              : ///
      31              : /// Types used to describe the parameters for data distribution in a sharded tenant:
      32              : /// - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
      33              : ///   multiple shards.  Its value is given in 8kiB pages.
/// - [`ShardLayout`] describes the data distribution scheme, and at time of writing is
///   always version 1 (apart from an internal marker for broken identities): this is provided
///   for future upgrades that might introduce different data distribution schemes.
      37              : ///
      38              : /// Examples:
      39              : /// - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
      40              : /// - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
      41              : /// - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
      42              : ///   and their slugs are 0004, 0104, 0204, and 0304.
      43              : 
      44            0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      45              : pub struct ShardNumber(pub u8);
      46              : 
      47            0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      48              : pub struct ShardCount(u8);
      49              : 
      50              : /// Combination of ShardNumber and ShardCount.  For use within the context of a particular tenant,
      51              : /// when we need to know which shard we're dealing with, but do not need to know the full
      52              : /// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
      53              : /// the fully qualified TenantShardId.
      54              : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      55              : pub struct ShardIndex {
      56              :     pub shard_number: ShardNumber,
      57              :     pub shard_count: ShardCount,
      58              : }
      59              : 
      60              : /// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
      61              : /// and to check whether that [`ShardNumber`] is the same as the current shard.
      62            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
      63              : pub struct ShardIdentity {
      64              :     pub number: ShardNumber,
      65              :     pub count: ShardCount,
      66              :     pub stripe_size: ShardStripeSize,
      67              :     layout: ShardLayout,
      68              : }
      69              : 
      70              : /// Formatting helper, for generating the `shard_id` label in traces.
      71              : struct ShardSlug<'a>(&'a TenantShardId);
      72              : 
      73              : /// TenantShardId globally identifies a particular shard in a particular tenant.
      74              : ///
      75              : /// These are written as `<TenantId>-<ShardSlug>`, for example:
      76              : ///   # The second shard in a two-shard tenant
      77              : ///   072f1291a5310026820b2fe4b2968934-0102
      78              : ///
      79              : /// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
      80              : /// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
      81              : /// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
      82              : ///
      83              : /// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
      84              : /// is both forward and backward compatible with TenantId: a legacy TenantId can be
      85              : /// decoded as a TenantShardId, and when re-encoded it will be parseable
      86              : /// as a TenantId.
      87              : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      88              : pub struct TenantShardId {
      89              :     pub tenant_id: TenantId,
      90              :     pub shard_number: ShardNumber,
      91              :     pub shard_count: ShardCount,
      92              : }
      93              : 
      94              : impl ShardCount {
      95              :     pub const MAX: Self = Self(u8::MAX);
      96              : 
      97              :     /// The internal value of a ShardCount may be zero, which means "1 shard, but use
      98              :     /// legacy format for TenantShardId that excludes the shard suffix", also known
      99              :     /// as `TenantShardId::unsharded`.
     100              :     ///
     101              :     /// This method returns the actual number of shards, i.e. if our internal value is
     102              :     /// zero, we return 1 (unsharded tenants have 1 shard).
     103          772 :     pub fn count(&self) -> u8 {
     104          772 :         if self.0 > 0 {
     105           10 :             self.0
     106              :         } else {
     107          762 :             1
     108              :         }
     109          772 :     }
     110              : 
     111              :     /// The literal internal value: this is **not** the number of shards in the
     112              :     /// tenant, as we have a special zero value for legacy unsharded tenants.  Use
     113              :     /// [`Self::count`] if you want to know the cardinality of shards.
     114            4 :     pub fn literal(&self) -> u8 {
     115            4 :         self.0
     116            4 :     }
     117              : 
     118              :     ///
     119            0 :     pub fn is_unsharded(&self) -> bool {
     120            0 :         self.0 == 0
     121            0 :     }
     122              : 
     123              :     /// `v` may be zero, or the number of shards in the tenant.  `v` is what
     124              :     /// [`Self::literal`] would return.
     125          132 :     pub fn new(val: u8) -> Self {
     126          132 :         Self(val)
     127          132 :     }
     128              : }
     129              : 
     130              : impl ShardNumber {
     131              :     pub const MAX: Self = Self(u8::MAX);
     132              : }
     133              : 
     134              : impl TenantShardId {
     135          144 :     pub fn unsharded(tenant_id: TenantId) -> Self {
     136          144 :         Self {
     137          144 :             tenant_id,
     138          144 :             shard_number: ShardNumber(0),
     139          144 :             shard_count: ShardCount(0),
     140          144 :         }
     141          144 :     }
     142              : 
     143              :     /// The range of all TenantShardId that belong to a particular TenantId.  This is useful when
     144              :     /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
     145            0 :     pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
     146            0 :         RangeInclusive::new(
     147            0 :             Self {
     148            0 :                 tenant_id,
     149            0 :                 shard_number: ShardNumber(0),
     150            0 :                 shard_count: ShardCount(0),
     151            0 :             },
     152            0 :             Self {
     153            0 :                 tenant_id,
     154            0 :                 shard_number: ShardNumber::MAX,
     155            0 :                 shard_count: ShardCount::MAX,
     156            0 :             },
     157            0 :         )
     158            0 :     }
     159              : 
     160         7369 :     pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
     161         7369 :         ShardSlug(self)
     162         7369 :     }
     163              : 
     164              :     /// Convenience for code that has special behavior on the 0th shard.
     165            6 :     pub fn is_shard_zero(&self) -> bool {
     166            6 :         self.shard_number == ShardNumber(0)
     167            6 :     }
     168              : 
     169              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     170              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     171              :     /// a shard suffix.
     172            0 :     pub fn is_unsharded(&self) -> bool {
     173            0 :         self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
     174            0 :     }
     175              : 
     176              :     /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
     177              :     /// is useful when logging from code that is already in a span that includes tenant ID, to
     178              :     /// keep messages reasonably terse.
     179            0 :     pub fn to_index(&self) -> ShardIndex {
     180            0 :         ShardIndex {
     181            0 :             shard_number: self.shard_number,
     182            0 :             shard_count: self.shard_count,
     183            0 :         }
     184            0 :     }
     185              : 
     186              :     /// Calculate the children of this TenantShardId when splitting the overall tenant into
     187              :     /// the given number of shards.
     188            8 :     pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
     189            8 :         let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
     190            8 :         let mut child_shards = Vec::new();
     191           32 :         for shard_number in 0..ShardNumber(new_shard_count.0).0 {
     192              :             // Key mapping is based on a round robin mapping of key hash modulo shard count,
     193              :             // so our child shards are the ones which the same keys would map to.
     194           32 :             if shard_number % effective_old_shard_count == self.shard_number.0 {
     195           24 :                 child_shards.push(TenantShardId {
     196           24 :                     tenant_id: self.tenant_id,
     197           24 :                     shard_number: ShardNumber(shard_number),
     198           24 :                     shard_count: new_shard_count,
     199           24 :                 })
     200            8 :             }
     201              :         }
     202              : 
     203            8 :         child_shards
     204            8 :     }
     205              : }
     206              : 
     207              : impl<'a> std::fmt::Display for ShardSlug<'a> {
     208         7369 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     209         7369 :         write!(
     210         7369 :             f,
     211         7369 :             "{:02x}{:02x}",
     212         7369 :             self.0.shard_number.0, self.0.shard_count.0
     213         7369 :         )
     214         7369 :     }
     215              : }
     216              : 
     217              : impl std::fmt::Display for TenantShardId {
     218         5695 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     219         5695 :         if self.shard_count != ShardCount(0) {
     220            2 :             write!(f, "{}-{}", self.tenant_id, self.shard_slug())
     221              :         } else {
     222              :             // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
     223              :             // is distinct from the normal single shard case (shard count == 1).
     224         5693 :             self.tenant_id.fmt(f)
     225              :         }
     226         5695 :     }
     227              : }
     228              : 
     229              : impl std::fmt::Debug for TenantShardId {
     230            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     231            0 :         // Debug is the same as Display: the compact hex representation
     232            0 :         write!(f, "{}", self)
     233            0 :     }
     234              : }
     235              : 
     236              : impl std::str::FromStr for TenantShardId {
     237              :     type Err = hex::FromHexError;
     238              : 
     239         2429 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     240         2429 :         // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
     241         2429 :         if s.len() == 32 {
     242              :             // Legacy case: no shard specified
     243              :             Ok(Self {
     244         2425 :                 tenant_id: TenantId::from_str(s)?,
     245         2425 :                 shard_number: ShardNumber(0),
     246         2425 :                 shard_count: ShardCount(0),
     247              :             })
     248            4 :         } else if s.len() == 37 {
     249            4 :             let bytes = s.as_bytes();
     250            4 :             let tenant_id = TenantId::from_hex(&bytes[0..32])?;
     251            4 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     252            4 :             hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
     253            4 :             Ok(Self {
     254            4 :                 tenant_id,
     255            4 :                 shard_number: ShardNumber(shard_parts[0]),
     256            4 :                 shard_count: ShardCount(shard_parts[1]),
     257            4 :             })
     258              :         } else {
     259            0 :             Err(hex::FromHexError::InvalidStringLength)
     260              :         }
     261         2429 :     }
     262              : }
     263              : 
     264              : impl From<[u8; 18]> for TenantShardId {
     265            4 :     fn from(b: [u8; 18]) -> Self {
     266            4 :         let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
     267            4 : 
     268            4 :         Self {
     269            4 :             tenant_id: TenantId::from(tenant_id_bytes),
     270            4 :             shard_number: ShardNumber(b[16]),
     271            4 :             shard_count: ShardCount(b[17]),
     272            4 :         }
     273            4 :     }
     274              : }
     275              : 
     276              : impl ShardIndex {
     277            0 :     pub fn new(number: ShardNumber, count: ShardCount) -> Self {
     278            0 :         Self {
     279            0 :             shard_number: number,
     280            0 :             shard_count: count,
     281            0 :         }
     282            0 :     }
     283          170 :     pub fn unsharded() -> Self {
     284          170 :         Self {
     285          170 :             shard_number: ShardNumber(0),
     286          170 :             shard_count: ShardCount(0),
     287          170 :         }
     288          170 :     }
     289              : 
     290              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     291              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     292              :     /// a shard suffix.
     293         8644 :     pub fn is_unsharded(&self) -> bool {
     294         8644 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     295         8644 :     }
     296              : 
     297              :     /// For use in constructing remote storage paths: concatenate this with a TenantId
     298              :     /// to get a fully qualified TenantShardId.
     299              :     ///
     300              :     /// Backward compat: this function returns an empty string if Self::is_unsharded, such
     301              :     /// that the legacy pre-sharding remote key format is preserved.
     302           18 :     pub fn get_suffix(&self) -> String {
     303           18 :         if self.is_unsharded() {
     304           18 :             "".to_string()
     305              :         } else {
     306            0 :             format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     307              :         }
     308           18 :     }
     309              : }
     310              : 
     311              : impl std::fmt::Display for ShardIndex {
     312         1410 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     313         1410 :         write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     314         1410 :     }
     315              : }
     316              : 
     317              : impl std::fmt::Debug for ShardIndex {
     318          956 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     319          956 :         // Debug is the same as Display: the compact hex representation
     320          956 :         write!(f, "{}", self)
     321          956 :     }
     322              : }
     323              : 
     324              : impl std::str::FromStr for ShardIndex {
     325              :     type Err = hex::FromHexError;
     326              : 
     327            2 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     328            2 :         // Expect format: 1 byte shard number, 1 byte shard count
     329            2 :         if s.len() == 4 {
     330            2 :             let bytes = s.as_bytes();
     331            2 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     332            2 :             hex::decode_to_slice(bytes, &mut shard_parts)?;
     333            2 :             Ok(Self {
     334            2 :                 shard_number: ShardNumber(shard_parts[0]),
     335            2 :                 shard_count: ShardCount(shard_parts[1]),
     336            2 :             })
     337              :         } else {
     338            0 :             Err(hex::FromHexError::InvalidStringLength)
     339              :         }
     340            2 :     }
     341              : }
     342              : 
     343              : impl From<[u8; 2]> for ShardIndex {
     344            2 :     fn from(b: [u8; 2]) -> Self {
     345            2 :         Self {
     346            2 :             shard_number: ShardNumber(b[0]),
     347            2 :             shard_count: ShardCount(b[1]),
     348            2 :         }
     349            2 :     }
     350              : }
     351              : 
     352              : impl Serialize for TenantShardId {
     353           66 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     354           66 :     where
     355           66 :         S: serde::Serializer,
     356           66 :     {
     357           66 :         if serializer.is_human_readable() {
     358           58 :             serializer.collect_str(self)
     359              :         } else {
     360              :             // Note: while human encoding of [`TenantShardId`] is backward and forward
     361              :             // compatible, this binary encoding is not.
     362            8 :             let mut packed: [u8; 18] = [0; 18];
     363            8 :             packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
     364            8 :             packed[16] = self.shard_number.0;
     365            8 :             packed[17] = self.shard_count.0;
     366            8 : 
     367            8 :             packed.serialize(serializer)
     368              :         }
     369           66 :     }
     370              : }
     371              : 
     372              : impl<'de> Deserialize<'de> for TenantShardId {
     373           12 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     374           12 :     where
     375           12 :         D: serde::Deserializer<'de>,
     376           12 :     {
     377           12 :         struct IdVisitor {
     378           12 :             is_human_readable_deserializer: bool,
     379           12 :         }
     380           12 : 
     381           12 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     382           12 :             type Value = TenantShardId;
     383           12 : 
     384           12 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     385            0 :                 if self.is_human_readable_deserializer {
     386           12 :                     formatter.write_str("value in form of hex string")
     387           12 :                 } else {
     388           12 :                     formatter.write_str("value in form of integer array([u8; 18])")
     389           12 :                 }
     390           12 :             }
     391           12 : 
     392           12 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     393            4 :             where
     394            4 :                 A: serde::de::SeqAccess<'de>,
     395            4 :             {
     396            4 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     397           12 :                 let id: [u8; 18] = Deserialize::deserialize(s)?;
     398           12 :                 Ok(TenantShardId::from(id))
     399           12 :             }
     400           12 : 
     401           12 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     402            8 :             where
     403            8 :                 E: serde::de::Error,
     404            8 :             {
     405            8 :                 TenantShardId::from_str(v).map_err(E::custom)
     406            8 :             }
     407           12 :         }
     408           12 : 
     409           12 :         if deserializer.is_human_readable() {
     410            8 :             deserializer.deserialize_str(IdVisitor {
     411            8 :                 is_human_readable_deserializer: true,
     412            8 :             })
     413              :         } else {
     414            4 :             deserializer.deserialize_tuple(
     415            4 :                 18,
     416            4 :                 IdVisitor {
     417            4 :                     is_human_readable_deserializer: false,
     418            4 :                 },
     419            4 :             )
     420              :         }
     421           12 :     }
     422              : }
     423              : 
     424              : /// Stripe size in number of pages
     425            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     426              : pub struct ShardStripeSize(pub u32);
     427              : 
     428              : /// Layout version: for future upgrades where we might change how the key->shard mapping works
     429            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     430              : pub struct ShardLayout(u8);
     431              : 
     432              : const LAYOUT_V1: ShardLayout = ShardLayout(1);
     433              : /// ShardIdentity uses a magic layout value to indicate if it is unusable
     434              : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
     435              : 
     436              : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
     437              : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
     438              : 
     439            0 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
     440              : pub enum ShardConfigError {
     441              :     #[error("Invalid shard count")]
     442              :     InvalidCount,
     443              :     #[error("Invalid shard number")]
     444              :     InvalidNumber,
     445              :     #[error("Invalid stripe size")]
     446              :     InvalidStripeSize,
     447              : }
     448              : 
     449              : impl ShardIdentity {
     450              :     /// An identity with number=0 count=0 is a "none" identity, which represents legacy
     451              :     /// tenants.  Modern single-shard tenants should not use this: they should
     452              :     /// have number=0 count=1.
     453          108 :     pub fn unsharded() -> Self {
     454          108 :         Self {
     455          108 :             number: ShardNumber(0),
     456          108 :             count: ShardCount(0),
     457          108 :             layout: LAYOUT_V1,
     458          108 :             stripe_size: DEFAULT_STRIPE_SIZE,
     459          108 :         }
     460          108 :     }
     461              : 
     462              :     /// A broken instance of this type is only used for `TenantState::Broken` tenants,
     463              :     /// which are constructed in code paths that don't have access to proper configuration.
     464              :     ///
     465              :     /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
     466              :     /// Enforcement is via assertions, to avoid making our interface fallible for this
     467              :     /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
     468              :     /// state, and by extension to avoid trying to do any page->shard resolution.
     469            0 :     pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
     470            0 :         Self {
     471            0 :             number,
     472            0 :             count,
     473            0 :             layout: LAYOUT_BROKEN,
     474            0 :             stripe_size: DEFAULT_STRIPE_SIZE,
     475            0 :         }
     476            0 :     }
     477              : 
     478              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     479              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     480              :     /// a shard suffix.
     481            0 :     pub fn is_unsharded(&self) -> bool {
     482            0 :         self.number == ShardNumber(0) && self.count == ShardCount(0)
     483            0 :     }
     484              : 
     485              :     /// Count must be nonzero, and number must be < count. To construct
     486              :     /// the legacy case (count==0), use Self::unsharded instead.
     487           38 :     pub fn new(
     488           38 :         number: ShardNumber,
     489           38 :         count: ShardCount,
     490           38 :         stripe_size: ShardStripeSize,
     491           38 :     ) -> Result<Self, ShardConfigError> {
     492           38 :         if count.0 == 0 {
     493            2 :             Err(ShardConfigError::InvalidCount)
     494           36 :         } else if number.0 > count.0 - 1 {
     495            6 :             Err(ShardConfigError::InvalidNumber)
     496           30 :         } else if stripe_size.0 == 0 {
     497            2 :             Err(ShardConfigError::InvalidStripeSize)
     498              :         } else {
     499           28 :             Ok(Self {
     500           28 :                 number,
     501           28 :                 count,
     502           28 :                 layout: LAYOUT_V1,
     503           28 :                 stripe_size,
     504           28 :             })
     505              :         }
     506           38 :     }
     507              : 
     508              :     /// For use when creating ShardIdentity instances for new shards, where a creation request
     509              :     /// specifies the ShardParameters that apply to all shards.
     510          108 :     pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
     511          108 :         Self {
     512          108 :             number,
     513          108 :             count: params.count,
     514          108 :             layout: LAYOUT_V1,
     515          108 :             stripe_size: params.stripe_size,
     516          108 :         }
     517          108 :     }
     518              : 
     519       172470 :     fn is_broken(&self) -> bool {
     520       172470 :         self.layout == LAYOUT_BROKEN
     521       172470 :     }
     522              : 
     523         3004 :     pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
     524         3004 :         assert!(!self.is_broken());
     525         3004 :         key_to_shard_number(self.count, self.stripe_size, key)
     526         3004 :     }
     527              : 
     528              :     /// Return true if the key should be ingested by this shard
     529              :     ///
     530              :     /// Shards must ingest _at least_ keys which return true from this check.
     531       169466 :     pub fn is_key_local(&self, key: &Key) -> bool {
     532       169466 :         assert!(!self.is_broken());
     533       169466 :         if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
     534       169466 :             true
     535              :         } else {
     536            0 :             key_to_shard_number(self.count, self.stripe_size, key) == self.number
     537              :         }
     538       169466 :     }
     539              : 
     540              :     /// Return true if the key should be discarded if found in this shard's
     541              :     /// data store, e.g. during compaction after a split.
     542              :     ///
     543              :     /// Shards _may_ drop keys which return false here, but are not obliged to.
     544      3626335 :     pub fn is_key_disposable(&self, key: &Key) -> bool {
     545      3626335 :         if key_is_shard0(key) {
     546              :             // Q: Why can't we dispose of shard0 content if we're not shard 0?
     547              :             // A1: because the WAL ingestion logic currently ingests some shard 0
     548              :             //     content on all shards, even though it's only read on shard 0.  If we
     549              :             //     dropped it, then subsequent WAL ingest to these keys would encounter
     550              :             //     an error.
     551              :             // A2: because key_is_shard0 also covers relation size keys, which are written
     552              :             //     on all shards even though they're only maintained accurately on shard 0.
     553      3607951 :             false
     554              :         } else {
     555        18384 :             !self.is_key_local(key)
     556              :         }
     557      3626335 :     }
     558              : 
     559            0 :     pub fn shard_slug(&self) -> String {
     560            0 :         if self.count > ShardCount(0) {
     561            0 :             format!("-{:02x}{:02x}", self.number.0, self.count.0)
     562              :         } else {
     563            0 :             String::new()
     564              :         }
     565            0 :     }
     566              : 
     567              :     /// Convenience for checking if this identity is the 0th shard in a tenant,
     568              :     /// for special cases on shard 0 such as ingesting relation sizes.
     569            0 :     pub fn is_shard_zero(&self) -> bool {
     570            0 :         self.number == ShardNumber(0)
     571            0 :     }
     572              : }
     573              : 
     574              : impl Serialize for ShardIndex {
     575            4 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     576            4 :     where
     577            4 :         S: serde::Serializer,
     578            4 :     {
     579            4 :         if serializer.is_human_readable() {
     580            0 :             serializer.collect_str(self)
     581              :         } else {
     582              :             // Binary encoding is not used in index_part.json, but is included in anticipation of
     583              :             // switching various structures (e.g. inter-process communication, remote metadata) to more
     584              :             // compact binary encodings in future.
     585            4 :             let mut packed: [u8; 2] = [0; 2];
     586            4 :             packed[0] = self.shard_number.0;
     587            4 :             packed[1] = self.shard_count.0;
     588            4 :             packed.serialize(serializer)
     589              :         }
     590            4 :     }
     591              : }
     592              : 
     593              : impl<'de> Deserialize<'de> for ShardIndex {
     594            2 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     595            2 :     where
     596            2 :         D: serde::Deserializer<'de>,
     597            2 :     {
     598            2 :         struct IdVisitor {
     599            2 :             is_human_readable_deserializer: bool,
     600            2 :         }
     601            2 : 
     602            2 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     603            2 :             type Value = ShardIndex;
     604            2 : 
     605            2 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     606            0 :                 if self.is_human_readable_deserializer {
     607            2 :                     formatter.write_str("value in form of hex string")
     608            2 :                 } else {
     609            2 :                     formatter.write_str("value in form of integer array([u8; 2])")
     610            2 :                 }
     611            2 :             }
     612            2 : 
     613            2 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     614            2 :             where
     615            2 :                 A: serde::de::SeqAccess<'de>,
     616            2 :             {
     617            2 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     618            2 :                 let id: [u8; 2] = Deserialize::deserialize(s)?;
     619            2 :                 Ok(ShardIndex::from(id))
     620            2 :             }
     621            2 : 
     622            2 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     623            0 :             where
     624            0 :                 E: serde::de::Error,
     625            0 :             {
     626            0 :                 ShardIndex::from_str(v).map_err(E::custom)
     627            0 :             }
     628            2 :         }
     629            2 : 
     630            2 :         if deserializer.is_human_readable() {
     631            0 :             deserializer.deserialize_str(IdVisitor {
     632            0 :                 is_human_readable_deserializer: true,
     633            0 :             })
     634              :         } else {
     635            2 :             deserializer.deserialize_tuple(
     636            2 :                 2,
     637            2 :                 IdVisitor {
     638            2 :                     is_human_readable_deserializer: false,
     639            2 :                 },
     640            2 :             )
     641              :         }
     642            2 :     }
     643              : }
     644              : 
     645              : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
     646              : /// in order to be able to serve basebackup requests without peer communication).
     647      3626337 : fn key_is_shard0(key: &Key) -> bool {
     648      3626337 :     // To decide what to shard out to shards >0, we apply a simple rule that only
     649      3626337 :     // relation pages are distributed to shards other than shard zero. Everything else gets
     650      3626337 :     // stored on shard 0.  This guarantees that shard 0 can independently serve basebackup
     651      3626337 :     // requests, and any request other than those for particular blocks in relations.
     652      3626337 :     !is_rel_block_key(key)
     653      3626337 : }
     654              : 
     655              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     656            6 : fn murmurhash32(mut h: u32) -> u32 {
     657            6 :     h ^= h >> 16;
     658            6 :     h = h.wrapping_mul(0x85ebca6b);
     659            6 :     h ^= h >> 13;
     660            6 :     h = h.wrapping_mul(0xc2b2ae35);
     661            6 :     h ^= h >> 16;
     662            6 :     h
     663            6 : }
     664              : 
     665              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     666            4 : fn hash_combine(mut a: u32, mut b: u32) -> u32 {
     667            4 :     b = b.wrapping_add(0x9e3779b9);
     668            4 :     b = b.wrapping_add(a << 6);
     669            4 :     b = b.wrapping_add(a >> 2);
     670            4 : 
     671            4 :     a ^= b;
     672            4 :     a
     673            4 : }
     674              : 
     675              : /// Where a Key is to be distributed across shards, select the shard.  This function
     676              : /// does not account for keys that should be broadcast across shards.
     677              : ///
     678              : /// The hashing in this function must exactly match what we do in postgres smgr
     679              : /// code.  The resulting distribution of pages is intended to preserve locality within
     680              : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
     681              : /// distributing data pseudo-randomly.
     682              : ///
     683              : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
     684              : /// and will be handled at higher levels when shards are split.
     685         3006 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
     686         3006 :     // Fast path for un-sharded tenants or broadcast keys
     687         3006 :     if count < ShardCount(2) || key_is_shard0(key) {
     688         3004 :         return ShardNumber(0);
     689            2 :     }
     690            2 : 
     691            2 :     // relNode
     692            2 :     let mut hash = murmurhash32(key.field4);
     693            2 :     // blockNum/stripe size
     694            2 :     hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
     695            2 : 
     696            2 :     ShardNumber((hash % count.0 as u32) as u8)
     697         3006 : }
     698              : 
     699              : #[cfg(test)]
     700              : mod tests {
     701              :     use utils::Hex;
     702              : 
     703              :     use super::*;
     704              : 
     705              :     const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
     706              : 
     707              :     #[test]
     708            2 :     fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
     709            2 :         let example = TenantShardId {
     710            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     711            2 :             shard_count: ShardCount(10),
     712            2 :             shard_number: ShardNumber(7),
     713            2 :         };
     714            2 : 
     715            2 :         let encoded = format!("{example}");
     716            2 : 
     717            2 :         let expected = format!("{EXAMPLE_TENANT_ID}-070a");
     718            2 :         assert_eq!(&encoded, &expected);
     719              : 
     720            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     721              : 
     722            2 :         assert_eq!(example, decoded);
     723              : 
     724            2 :         Ok(())
     725            2 :     }
     726              : 
     727              :     #[test]
     728            2 :     fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
     729            2 :         let example = TenantShardId {
     730            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     731            2 :             shard_count: ShardCount(10),
     732            2 :             shard_number: ShardNumber(7),
     733            2 :         };
     734            2 : 
     735            2 :         let encoded = bincode::serialize(&example).unwrap();
     736            2 :         let expected: [u8; 18] = [
     737            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     738            2 :             0xf6, 0xfc, 0x07, 0x0a,
     739            2 :         ];
     740            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     741              : 
     742            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     743            2 : 
     744            2 :         assert_eq!(example, decoded);
     745              : 
     746            2 :         Ok(())
     747            2 :     }
     748              : 
     749              :     #[test]
     750            2 :     fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
     751            2 :         // Test that TenantShardId can decode a TenantId in human
     752            2 :         // readable form
     753            2 :         let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     754            2 :         let encoded = format!("{example}");
     755            2 : 
     756            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     757              : 
     758            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     759              : 
     760            2 :         assert_eq!(example, decoded.tenant_id);
     761            2 :         assert_eq!(decoded.shard_count, ShardCount(0));
     762            2 :         assert_eq!(decoded.shard_number, ShardNumber(0));
     763              : 
     764            2 :         Ok(())
     765            2 :     }
     766              : 
     767              :     #[test]
     768            2 :     fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
     769            2 :         // Test that a legacy TenantShardId encodes into a form that
     770            2 :         // can be decoded as TenantId
     771            2 :         let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     772            2 :         let example = TenantShardId::unsharded(example_tenant_id);
     773            2 :         let encoded = format!("{example}");
     774            2 : 
     775            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     776              : 
     777            2 :         let decoded = TenantId::from_str(&encoded)?;
     778              : 
     779            2 :         assert_eq!(example_tenant_id, decoded);
     780              : 
     781            2 :         Ok(())
     782            2 :     }
     783              : 
     784              :     #[test]
     785            2 :     fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
     786            2 :         // Unlike in human readable encoding, binary encoding does not
     787            2 :         // do any special handling of legacy unsharded TenantIds: this test
     788            2 :         // is equivalent to the main test for binary encoding, just verifying
     789            2 :         // that the same behavior applies when we have used `unsharded()` to
     790            2 :         // construct a TenantShardId.
     791            2 :         let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
     792            2 :         let encoded = bincode::serialize(&example).unwrap();
     793            2 : 
     794            2 :         let expected: [u8; 18] = [
     795            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     796            2 :             0xf6, 0xfc, 0x00, 0x00,
     797            2 :         ];
     798            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     799              : 
     800            2 :         let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
     801            2 :         assert_eq!(example, decoded);
     802              : 
     803            2 :         Ok(())
     804            2 :     }
     805              : 
     806              :     #[test]
     807            2 :     fn shard_identity_validation() -> Result<(), ShardConfigError> {
     808            2 :         // Happy cases
     809            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
     810            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
     811            2 :         ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
     812              : 
     813            2 :         assert_eq!(
     814            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
     815            2 :             Err(ShardConfigError::InvalidCount)
     816            2 :         );
     817            2 :         assert_eq!(
     818            2 :             ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
     819            2 :             Err(ShardConfigError::InvalidNumber)
     820            2 :         );
     821            2 :         assert_eq!(
     822            2 :             ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
     823            2 :             Err(ShardConfigError::InvalidNumber)
     824            2 :         );
     825            2 :         assert_eq!(
     826            2 :             ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
     827            2 :             Err(ShardConfigError::InvalidNumber)
     828            2 :         );
     829            2 :         assert_eq!(
     830            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
     831            2 :             Err(ShardConfigError::InvalidStripeSize)
     832            2 :         );
     833              : 
     834            2 :         Ok(())
     835            2 :     }
     836              : 
     837              :     #[test]
     838            2 :     fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
     839            2 :         let example = ShardIndex {
     840            2 :             shard_number: ShardNumber(13),
     841            2 :             shard_count: ShardCount(17),
     842            2 :         };
     843            2 :         let expected: String = "0d11".to_string();
     844            2 :         let encoded = format!("{example}");
     845            2 :         assert_eq!(&encoded, &expected);
     846              : 
     847            2 :         let decoded = ShardIndex::from_str(&encoded)?;
     848            2 :         assert_eq!(example, decoded);
     849            2 :         Ok(())
     850            2 :     }
     851              : 
     852              :     #[test]
     853            2 :     fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
     854            2 :         let example = ShardIndex {
     855            2 :             shard_number: ShardNumber(13),
     856            2 :             shard_count: ShardCount(17),
     857            2 :         };
     858            2 :         let expected: [u8; 2] = [0x0d, 0x11];
     859            2 : 
     860            2 :         let encoded = bincode::serialize(&example).unwrap();
     861            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     862            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     863            2 :         assert_eq!(example, decoded);
     864              : 
     865            2 :         Ok(())
     866            2 :     }
     867              : 
     868              :     // These are only smoke tests to spot check that our implementation doesn't
     869              :     // deviate from a few examples values: not aiming to validate the overall
     870              :     // hashing algorithm.
     871              :     #[test]
     872            2 :     fn murmur_hash() {
     873            2 :         assert_eq!(murmurhash32(0), 0);
     874              : 
     875            2 :         assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
     876            2 :     }
     877              : 
     878              :     #[test]
     879            2 :     fn shard_mapping() {
     880            2 :         let key = Key {
     881            2 :             field1: 0x00,
     882            2 :             field2: 0x67f,
     883            2 :             field3: 0x5,
     884            2 :             field4: 0x400c,
     885            2 :             field5: 0x00,
     886            2 :             field6: 0x7d06,
     887            2 :         };
     888            2 : 
     889            2 :         let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
     890            2 :         assert_eq!(shard, ShardNumber(8));
     891            2 :     }
     892              : 
     893              :     #[test]
     894            2 :     fn shard_id_split() {
     895            2 :         let tenant_id = TenantId::generate();
     896            2 :         let parent = TenantShardId::unsharded(tenant_id);
     897            2 : 
     898            2 :         // Unsharded into 2
     899            2 :         assert_eq!(
     900            2 :             parent.split(ShardCount(2)),
     901            2 :             vec![
     902            2 :                 TenantShardId {
     903            2 :                     tenant_id,
     904            2 :                     shard_count: ShardCount(2),
     905            2 :                     shard_number: ShardNumber(0)
     906            2 :                 },
     907            2 :                 TenantShardId {
     908            2 :                     tenant_id,
     909            2 :                     shard_count: ShardCount(2),
     910            2 :                     shard_number: ShardNumber(1)
     911            2 :                 }
     912            2 :             ]
     913            2 :         );
     914              : 
     915              :         // Unsharded into 4
     916            2 :         assert_eq!(
     917            2 :             parent.split(ShardCount(4)),
     918            2 :             vec![
     919            2 :                 TenantShardId {
     920            2 :                     tenant_id,
     921            2 :                     shard_count: ShardCount(4),
     922            2 :                     shard_number: ShardNumber(0)
     923            2 :                 },
     924            2 :                 TenantShardId {
     925            2 :                     tenant_id,
     926            2 :                     shard_count: ShardCount(4),
     927            2 :                     shard_number: ShardNumber(1)
     928            2 :                 },
     929            2 :                 TenantShardId {
     930            2 :                     tenant_id,
     931            2 :                     shard_count: ShardCount(4),
     932            2 :                     shard_number: ShardNumber(2)
     933            2 :                 },
     934            2 :                 TenantShardId {
     935            2 :                     tenant_id,
     936            2 :                     shard_count: ShardCount(4),
     937            2 :                     shard_number: ShardNumber(3)
     938            2 :                 }
     939            2 :             ]
     940            2 :         );
     941              : 
     942              :         // count=1 into 2 (check this works the same as unsharded.)
     943            2 :         let parent = TenantShardId {
     944            2 :             tenant_id,
     945            2 :             shard_count: ShardCount(1),
     946            2 :             shard_number: ShardNumber(0),
     947            2 :         };
     948            2 :         assert_eq!(
     949            2 :             parent.split(ShardCount(2)),
     950            2 :             vec![
     951            2 :                 TenantShardId {
     952            2 :                     tenant_id,
     953            2 :                     shard_count: ShardCount(2),
     954            2 :                     shard_number: ShardNumber(0)
     955            2 :                 },
     956            2 :                 TenantShardId {
     957            2 :                     tenant_id,
     958            2 :                     shard_count: ShardCount(2),
     959            2 :                     shard_number: ShardNumber(1)
     960            2 :                 }
     961            2 :             ]
     962            2 :         );
     963              : 
     964              :         // count=2 into count=8
     965            2 :         let parent = TenantShardId {
     966            2 :             tenant_id,
     967            2 :             shard_count: ShardCount(2),
     968            2 :             shard_number: ShardNumber(1),
     969            2 :         };
     970            2 :         assert_eq!(
     971            2 :             parent.split(ShardCount(8)),
     972            2 :             vec![
     973            2 :                 TenantShardId {
     974            2 :                     tenant_id,
     975            2 :                     shard_count: ShardCount(8),
     976            2 :                     shard_number: ShardNumber(1)
     977            2 :                 },
     978            2 :                 TenantShardId {
     979            2 :                     tenant_id,
     980            2 :                     shard_count: ShardCount(8),
     981            2 :                     shard_number: ShardNumber(3)
     982            2 :                 },
     983            2 :                 TenantShardId {
     984            2 :                     tenant_id,
     985            2 :                     shard_count: ShardCount(8),
     986            2 :                     shard_number: ShardNumber(5)
     987            2 :                 },
     988            2 :                 TenantShardId {
     989            2 :                     tenant_id,
     990            2 :                     shard_count: ShardCount(8),
     991            2 :                     shard_number: ShardNumber(7)
     992            2 :                 },
     993            2 :             ]
     994            2 :         );
     995            2 :     }
     996              : }
        

Generated by: LCOV version 2.1-beta