LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - shard.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 88.1 % 629 554
Test Date: 2024-05-10 13:18:37 Functions: 40.4 % 136 55

            Line data    Source code
       1              : use std::{ops::RangeInclusive, str::FromStr};
       2              : 
       3              : use crate::{
       4              :     key::{is_rel_block_key, Key},
       5              :     models::ShardParameters,
       6              : };
       7              : use hex::FromHex;
       8              : use postgres_ffi::relfile_utils::INIT_FORKNUM;
       9              : use serde::{Deserialize, Serialize};
      10              : use utils::id::TenantId;
      11              : 
      12              : /// See docs/rfcs/031-sharding-static.md for an overview of sharding.
      13              : ///
      14              : /// This module contains a variety of types used to represent the concept of sharding
      15              : /// a Neon tenant across multiple physical shards.  Since there are quite a few of these,
      16              : /// we provide a summary here.
      17              : ///
      18              : /// Types used to describe shards:
      19              : /// - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
      20              : ///   which identifies a tenant which is not shard-aware.  This means its storage paths do not include
      21              : ///   a shard suffix.
      22              : /// - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
      23              : /// - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
      24              : ///   without the tenant ID.  This is useful for things that are implicitly scoped to a particular
      25              : ///   tenant, such as layer files.
      26              : /// - [`ShardIdentity`] is the full description of a particular shard's parameters, in sufficient
      27              : ///   detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
      28              : /// - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
      29              : ///   four hex digits.  An unsharded tenant is `0000`.
      30              : /// - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
      31              : ///
      32              : /// Types used to describe the parameters for data distribution in a sharded tenant:
      33              : /// - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
      34              : ///   multiple shards.  Its value is given in 8kiB pages.
      35              : /// - [`ShardLayout`] describes the data distribution scheme; at time of writing the only
      36              : ///   scheme in use is layout version 1: this is provided for future upgrades that might
      37              : ///   introduce different data distribution schemes.
      38              : ///
      39              : /// Examples:
      40              : /// - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
      41              : /// - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
      42              : /// - In a tenant with N=4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
      43              : ///   and their slugs are 0004, 0104, 0204, and 0304.
      44              : 
      45            0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      46              : pub struct ShardNumber(pub u8);
      47              : 
      48            0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      49              : pub struct ShardCount(u8);
      50              : 
      51              : /// Combination of ShardNumber and ShardCount.  For use within the context of a particular tenant,
      52              : /// when we need to know which shard we're dealing with, but do not need to know the full
      53              : /// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
      54              : /// the fully qualified TenantShardId.
      55              : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      56              : pub struct ShardIndex {
      57              :     pub shard_number: ShardNumber,
      58              :     pub shard_count: ShardCount,
      59              : }
      60              : 
      61              : /// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
      62              : /// and to check whether that [`ShardNumber`] is the same as the current shard.
      63            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
      64              : pub struct ShardIdentity {
      65              :     pub number: ShardNumber,
      66              :     pub count: ShardCount,
      67              :     pub stripe_size: ShardStripeSize,
      68              :     layout: ShardLayout,
      69              : }
      70              : 
      71              : /// Formatting helper, for generating the `shard_id` label in traces.
      72              : struct ShardSlug<'a>(&'a TenantShardId);
      73              : 
      74              : /// TenantShardId globally identifies a particular shard in a particular tenant.
      75              : ///
      76              : /// These are written as `<TenantId>-<ShardSlug>`, for example:
      77              : ///   # The second shard in a two-shard tenant
      78              : ///   072f1291a5310026820b2fe4b2968934-0102
      79              : ///
      80              : /// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
      81              : /// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
      82              : /// an unsharded [`TenantShardId`] to be used interchangeably with a [`TenantId`].
      83              : ///
      84              : /// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
      85              : /// is both forward and backward compatible with TenantId: a legacy TenantId can be
      86              : /// decoded as a TenantShardId, and when re-encoded it will be parseable
      87              : /// as a TenantId.
      88              : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      89              : pub struct TenantShardId {
      90              :     pub tenant_id: TenantId,
      91              :     pub shard_number: ShardNumber,
      92              :     pub shard_count: ShardCount,
      93              : }
      94              : 
      95              : impl ShardCount {
      96              :     pub const MAX: Self = Self(u8::MAX);
      97              : 
      98              :     /// The internal value of a ShardCount may be zero, which means "1 shard, but use
      99              :     /// legacy format for TenantShardId that excludes the shard suffix", also known
     100              :     /// as [`TenantShardId::unsharded`].
     101              :     ///
     102              :     /// This method returns the actual number of shards, i.e. if our internal value is
     103              :     /// zero, we return 1 (unsharded tenants have 1 shard).
     104         1010 :     pub fn count(&self) -> u8 {
     105         1010 :         if self.0 > 0 {
     106           10 :             self.0
     107              :         } else {
     108         1000 :             1
     109              :         }
     110         1010 :     }
     111              : 
     112              :     /// The literal internal value: this is **not** the number of shards in the
     113              :     /// tenant, as we have a special zero value for legacy unsharded tenants.  Use
     114              :     /// [`Self::count`] if you want to know the cardinality of shards.
     115            4 :     pub fn literal(&self) -> u8 {
     116            4 :         self.0
     117            4 :     }
     118              : 
     119              :     /// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
     120              :     /// uses the legacy format for `TenantShardId`. See also the documentation for
     121              :     /// [`Self::count`].
     122            0 :     pub fn is_unsharded(&self) -> bool {
     123            0 :         self.0 == 0
     124            0 :     }
     125              : 
     126              :     /// `val` may be zero, or the number of shards in the tenant.  `val` is what
     127              :     /// [`Self::literal`] would return.
     128         6984 :     pub fn new(val: u8) -> Self {
     129         6984 :         Self(val)
     130         6984 :     }
     131              : }
     132              : 
     133              : impl ShardNumber {
     134              :     pub const MAX: Self = Self(u8::MAX);
     135              : }
     136              : 
     137              : impl TenantShardId {
     138          154 :     pub fn unsharded(tenant_id: TenantId) -> Self {
     139          154 :         Self {
     140          154 :             tenant_id,
     141          154 :             shard_number: ShardNumber(0),
     142          154 :             shard_count: ShardCount(0),
     143          154 :         }
     144          154 :     }
     145              : 
     146              :     /// The range of all TenantShardId that belong to a particular TenantId.  This is useful when
     147              :     /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
     148            0 :     pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
     149            0 :         RangeInclusive::new(
     150            0 :             Self {
     151            0 :                 tenant_id,
     152            0 :                 shard_number: ShardNumber(0),
     153            0 :                 shard_count: ShardCount(0),
     154            0 :             },
     155            0 :             Self {
     156            0 :                 tenant_id,
     157            0 :                 shard_number: ShardNumber::MAX,
     158            0 :                 shard_count: ShardCount::MAX,
     159            0 :             },
     160            0 :         )
     161            0 :     }
     162              : 
     163         8452 :     pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
     164         8452 :         ShardSlug(self)
     165         8452 :     }
     166              : 
     167              :     /// Convenience for code that has special behavior on the 0th shard.
     168            6 :     pub fn is_shard_zero(&self) -> bool {
     169            6 :         self.shard_number == ShardNumber(0)
     170            6 :     }
     171              : 
     172              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     173              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     174              :     /// a shard suffix.
     175            0 :     pub fn is_unsharded(&self) -> bool {
     176            0 :         self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
     177            0 :     }
     178              : 
     179              :     /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
     180              :     /// is useful when logging from code that is already in a span that includes tenant ID, to
     181              :     /// keep messages reasonably terse.
     182            0 :     pub fn to_index(&self) -> ShardIndex {
     183            0 :         ShardIndex {
     184            0 :             shard_number: self.shard_number,
     185            0 :             shard_count: self.shard_count,
     186            0 :         }
     187            0 :     }
     188              : 
     189              :     /// Calculate the children of this TenantShardId when splitting the overall tenant into
     190              :     /// the given number of shards.
     191            8 :     pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
     192            8 :         let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
     193            8 :         let mut child_shards = Vec::new();
     194           32 :         for shard_number in 0..ShardNumber(new_shard_count.0).0 {
     195              :             // Key mapping is based on a round robin mapping of key hash modulo shard count,
     196              :             // so our child shards are the ones which the same keys would map to.
     197           32 :             if shard_number % effective_old_shard_count == self.shard_number.0 {
     198           24 :                 child_shards.push(TenantShardId {
     199           24 :                     tenant_id: self.tenant_id,
     200           24 :                     shard_number: ShardNumber(shard_number),
     201           24 :                     shard_count: new_shard_count,
     202           24 :                 })
     203            8 :             }
     204              :         }
     205              : 
     206            8 :         child_shards
     207            8 :     }
     208              : }
     209              : 
     210              : impl<'a> std::fmt::Display for ShardSlug<'a> {
     211         8452 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     212         8452 :         write!(
     213         8452 :             f,
     214         8452 :             "{:02x}{:02x}",
     215         8452 :             self.0.shard_number.0, self.0.shard_count.0
     216         8452 :         )
     217         8452 :     }
     218              : }
     219              : 
     220              : impl std::fmt::Display for TenantShardId {
     221         6744 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     222         6744 :         if self.shard_count != ShardCount(0) {
     223            2 :             write!(f, "{}-{}", self.tenant_id, self.shard_slug())
     224              :         } else {
     225              :             // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
     226              :             // is distinct from the normal single shard case (shard count == 1).
     227         6742 :             self.tenant_id.fmt(f)
     228              :         }
     229         6744 :     }
     230              : }
     231              : 
     232              : impl std::fmt::Debug for TenantShardId {
     233            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     234            0 :         // Debug is the same as Display: the compact hex representation
     235            0 :         write!(f, "{}", self)
     236            0 :     }
     237              : }
     238              : 
     239              : impl std::str::FromStr for TenantShardId {
     240              :     type Err = hex::FromHexError;
     241              : 
     242         3099 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     243         3099 :         // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
     244         3099 :         if s.len() == 32 {
     245              :             // Legacy case: no shard specified
     246              :             Ok(Self {
     247         3095 :                 tenant_id: TenantId::from_str(s)?,
     248         3095 :                 shard_number: ShardNumber(0),
     249         3095 :                 shard_count: ShardCount(0),
     250              :             })
     251            4 :         } else if s.len() == 37 {
     252            4 :             let bytes = s.as_bytes();
     253            4 :             let tenant_id = TenantId::from_hex(&bytes[0..32])?;
     254            4 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     255            4 :             hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
     256            4 :             Ok(Self {
     257            4 :                 tenant_id,
     258            4 :                 shard_number: ShardNumber(shard_parts[0]),
     259            4 :                 shard_count: ShardCount(shard_parts[1]),
     260            4 :             })
     261              :         } else {
     262            0 :             Err(hex::FromHexError::InvalidStringLength)
     263              :         }
     264         3099 :     }
     265              : }
     266              : 
     267              : impl From<[u8; 18]> for TenantShardId {
     268            4 :     fn from(b: [u8; 18]) -> Self {
     269            4 :         let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
     270            4 : 
     271            4 :         Self {
     272            4 :             tenant_id: TenantId::from(tenant_id_bytes),
     273            4 :             shard_number: ShardNumber(b[16]),
     274            4 :             shard_count: ShardCount(b[17]),
     275            4 :         }
     276            4 :     }
     277              : }
     278              : 
     279              : impl ShardIndex {
     280            0 :     pub fn new(number: ShardNumber, count: ShardCount) -> Self {
     281            0 :         Self {
     282            0 :             shard_number: number,
     283            0 :             shard_count: count,
     284            0 :         }
     285            0 :     }
     286          180 :     pub fn unsharded() -> Self {
     287          180 :         Self {
     288          180 :             shard_number: ShardNumber(0),
     289          180 :             shard_count: ShardCount(0),
     290          180 :         }
     291          180 :     }
     292              : 
     293              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     294              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     295              :     /// a shard suffix.
     296        27500 :     pub fn is_unsharded(&self) -> bool {
     297        27500 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     298        27500 :     }
     299              : 
     300              :     /// For use in constructing remote storage paths: concatenate this with a TenantId
     301              :     /// to get a fully qualified TenantShardId.
     302              :     ///
     303              :     /// Backward compat: this function returns an empty string if Self::is_unsharded, such
     304              :     /// that the legacy pre-sharding remote key format is preserved.
     305         1090 :     pub fn get_suffix(&self) -> String {
     306         1090 :         if self.is_unsharded() {
     307         1090 :             "".to_string()
     308              :         } else {
     309            0 :             format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     310              :         }
     311         1090 :     }
     312              : }
     313              : 
     314              : impl std::fmt::Display for ShardIndex {
     315         1448 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     316         1448 :         write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     317         1448 :     }
     318              : }
     319              : 
     320              : impl std::fmt::Debug for ShardIndex {
     321         1174 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     322         1174 :         // Debug is the same as Display: the compact hex representation
     323         1174 :         write!(f, "{}", self)
     324         1174 :     }
     325              : }
     326              : 
     327              : impl std::str::FromStr for ShardIndex {
     328              :     type Err = hex::FromHexError;
     329              : 
     330            2 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     331            2 :         // Expect format: 1 byte shard number, 1 byte shard count
     332            2 :         if s.len() == 4 {
     333            2 :             let bytes = s.as_bytes();
     334            2 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     335            2 :             hex::decode_to_slice(bytes, &mut shard_parts)?;
     336            2 :             Ok(Self {
     337            2 :                 shard_number: ShardNumber(shard_parts[0]),
     338            2 :                 shard_count: ShardCount(shard_parts[1]),
     339            2 :             })
     340              :         } else {
     341            0 :             Err(hex::FromHexError::InvalidStringLength)
     342              :         }
     343            2 :     }
     344              : }
     345              : 
     346              : impl From<[u8; 2]> for ShardIndex {
     347            2 :     fn from(b: [u8; 2]) -> Self {
     348            2 :         Self {
     349            2 :             shard_number: ShardNumber(b[0]),
     350            2 :             shard_count: ShardCount(b[1]),
     351            2 :         }
     352            2 :     }
     353              : }
     354              : 
     355              : impl Serialize for TenantShardId {
     356           48 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     357           48 :     where
     358           48 :         S: serde::Serializer,
     359           48 :     {
     360           48 :         if serializer.is_human_readable() {
     361           40 :             serializer.collect_str(self)
     362              :         } else {
     363              :             // Note: while human encoding of [`TenantShardId`] is backward and forward
     364              :             // compatible, this binary encoding is not.
     365            8 :             let mut packed: [u8; 18] = [0; 18];
     366            8 :             packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
     367            8 :             packed[16] = self.shard_number.0;
     368            8 :             packed[17] = self.shard_count.0;
     369            8 : 
     370            8 :             packed.serialize(serializer)
     371              :         }
     372           48 :     }
     373              : }
     374              : 
     375              : impl<'de> Deserialize<'de> for TenantShardId {
     376           12 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     377           12 :     where
     378           12 :         D: serde::Deserializer<'de>,
     379           12 :     {
     380           12 :         struct IdVisitor {
     381           12 :             is_human_readable_deserializer: bool,
     382           12 :         }
     383           12 : 
     384           12 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     385           12 :             type Value = TenantShardId;
     386           12 : 
     387           12 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     388            0 :                 if self.is_human_readable_deserializer {
     389           12 :                     formatter.write_str("value in form of hex string")
     390           12 :                 } else {
     391           12 :                     formatter.write_str("value in form of integer array([u8; 18])")
     392           12 :                 }
     393           12 :             }
     394           12 : 
     395           12 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     396            4 :             where
     397            4 :                 A: serde::de::SeqAccess<'de>,
     398            4 :             {
     399            4 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     400           12 :                 let id: [u8; 18] = Deserialize::deserialize(s)?;
     401           12 :                 Ok(TenantShardId::from(id))
     402           12 :             }
     403           12 : 
     404           12 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     405            8 :             where
     406            8 :                 E: serde::de::Error,
     407            8 :             {
     408            8 :                 TenantShardId::from_str(v).map_err(E::custom)
     409            8 :             }
     410           12 :         }
     411           12 : 
     412           12 :         if deserializer.is_human_readable() {
     413            8 :             deserializer.deserialize_str(IdVisitor {
     414            8 :                 is_human_readable_deserializer: true,
     415            8 :             })
     416              :         } else {
     417            4 :             deserializer.deserialize_tuple(
     418            4 :                 18,
     419            4 :                 IdVisitor {
     420            4 :                     is_human_readable_deserializer: false,
     421            4 :                 },
     422            4 :             )
     423              :         }
     424           12 :     }
     425              : }
     426              : 
     427              : /// Stripe size in number of pages
     428            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     429              : pub struct ShardStripeSize(pub u32);
     430              : 
     431              : /// Layout version: for future upgrades where we might change how the key->shard mapping works
     432            0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
     433              : pub struct ShardLayout(u8);
     434              : 
     435              : const LAYOUT_V1: ShardLayout = ShardLayout(1);
     436              : /// ShardIdentity uses a magic layout value to indicate if it is unusable
     437              : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
     438              : 
     439              : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
     440              : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
     441              : 
     442            0 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
     443              : pub enum ShardConfigError {
     444              :     #[error("Invalid shard count")]
     445              :     InvalidCount,
     446              :     #[error("Invalid shard number")]
     447              :     InvalidNumber,
     448              :     #[error("Invalid stripe size")]
     449              :     InvalidStripeSize,
     450              : }
     451              : 
     452              : impl ShardIdentity {
     453              :     /// An identity with number=0 count=0 is a "none" identity, which represents legacy
     454              :     /// tenants.  Modern single-shard tenants should not use this: they should
     455              :     /// have number=0 count=1.
     456         1162 :     pub const fn unsharded() -> Self {
     457         1162 :         Self {
     458         1162 :             number: ShardNumber(0),
     459         1162 :             count: ShardCount(0),
     460         1162 :             layout: LAYOUT_V1,
     461         1162 :             stripe_size: DEFAULT_STRIPE_SIZE,
     462         1162 :         }
     463         1162 :     }
     464              : 
     465              :     /// A broken instance of this type is only used for `TenantState::Broken` tenants,
     466              :     /// which are constructed in code paths that don't have access to proper configuration.
     467              :     ///
     468              :     /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
     469              :     /// Enforcement is via assertions, to avoid making our interface fallible for this
     470              :     /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
     471              :     /// state, and by extension to avoid trying to do any page->shard resolution.
     472            0 :     pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
     473            0 :         Self {
     474            0 :             number,
     475            0 :             count,
     476            0 :             layout: LAYOUT_BROKEN,
     477            0 :             stripe_size: DEFAULT_STRIPE_SIZE,
     478            0 :         }
     479            0 :     }
     480              : 
     481              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     482              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     483              :     /// a shard suffix.
     484            0 :     pub fn is_unsharded(&self) -> bool {
     485            0 :         self.number == ShardNumber(0) && self.count == ShardCount(0)
     486            0 :     }
     487              : 
     488              :     /// Count must be nonzero, and number must be < count. To construct
     489              :     /// the legacy case (count==0), use Self::unsharded instead.
     490         1024 :     pub fn new(
     491         1024 :         number: ShardNumber,
     492         1024 :         count: ShardCount,
     493         1024 :         stripe_size: ShardStripeSize,
     494         1024 :     ) -> Result<Self, ShardConfigError> {
     495         1024 :         if count.0 == 0 {
     496            2 :             Err(ShardConfigError::InvalidCount)
     497         1022 :         } else if number.0 > count.0 - 1 {
     498            6 :             Err(ShardConfigError::InvalidNumber)
     499         1016 :         } else if stripe_size.0 == 0 {
     500            2 :             Err(ShardConfigError::InvalidStripeSize)
     501              :         } else {
     502         1014 :             Ok(Self {
     503         1014 :                 number,
     504         1014 :                 count,
     505         1014 :                 layout: LAYOUT_V1,
     506         1014 :                 stripe_size,
     507         1014 :             })
     508              :         }
     509         1024 :     }
     510              : 
     511              :     /// For use when creating ShardIdentity instances for new shards, where a creation request
     512              :     /// specifies the ShardParameters that apply to all shards.
     513          118 :     pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
     514          118 :         Self {
     515          118 :             number,
     516          118 :             count: params.count,
     517          118 :             layout: LAYOUT_V1,
     518          118 :             stripe_size: params.stripe_size,
     519          118 :         }
     520          118 :     }
     521              : 
     522       177882 :     fn is_broken(&self) -> bool {
     523       177882 :         self.layout == LAYOUT_BROKEN
     524       177882 :     }
     525              : 
     526         3114 :     pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
     527         3114 :         assert!(!self.is_broken());
     528         3114 :         key_to_shard_number(self.count, self.stripe_size, key)
     529         3114 :     }
     530              : 
     531              :     /// Return true if the key should be ingested by this shard
     532              :     ///
     533              :     /// Shards must ingest _at least_ keys which return true from this check.
     534       174768 :     pub fn is_key_local(&self, key: &Key) -> bool {
     535       174768 :         assert!(!self.is_broken());
     536       174768 :         if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
     537       171816 :             true
     538              :         } else {
     539         2952 :             key_to_shard_number(self.count, self.stripe_size, key) == self.number
     540              :         }
     541       174768 :     }
     542              : 
     543              :     /// Return true if the key should be discarded if found in this shard's
     544              :     /// data store, e.g. during compaction after a split.
     545              :     ///
     546              :     /// Shards _may_ drop keys which return false here, but are not obliged to.
     547      2671937 :     pub fn is_key_disposable(&self, key: &Key) -> bool {
     548      2671937 :         if key_is_shard0(key) {
     549              :             // Q: Why can't we dispose of shard0 content if we're not shard 0?
     550              :             // A1: because the WAL ingestion logic currently ingests some shard 0
     551              :             //     content on all shards, even though it's only read on shard 0.  If we
     552              :             //     dropped it, then subsequent WAL ingest to these keys would encounter
     553              :             //     an error.
     554              :             // A2: because key_is_shard0 also covers relation size keys, which are written
     555              :             //     on all shards even though they're only maintained accurately on shard 0.
     556      2648251 :             false
     557              :         } else {
     558        23686 :             !self.is_key_local(key)
     559              :         }
     560      2671937 :     }
     561              : 
     562            0 :     pub fn shard_slug(&self) -> String {
     563            0 :         if self.count > ShardCount(0) {
     564            0 :             format!("-{:02x}{:02x}", self.number.0, self.count.0)
     565              :         } else {
     566            0 :             String::new()
     567              :         }
     568            0 :     }
     569              : 
     570              :     /// Convenience for checking if this identity is the 0th shard in a tenant,
     571              :     /// for special cases on shard 0 such as ingesting relation sizes.
     572            0 :     pub fn is_shard_zero(&self) -> bool {
     573            0 :         self.number == ShardNumber(0)
     574            0 :     }
     575              : }
     576              : 
     577              : impl Serialize for ShardIndex {
     578            4 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     579            4 :     where
     580            4 :         S: serde::Serializer,
     581            4 :     {
     582            4 :         if serializer.is_human_readable() {
     583            0 :             serializer.collect_str(self)
     584              :         } else {
     585              :             // Binary encoding is not used in index_part.json, but is included in anticipation of
     586              :             // switching various structures (e.g. inter-process communication, remote metadata) to more
     587              :             // compact binary encodings in future.
     588            4 :             let mut packed: [u8; 2] = [0; 2];
     589            4 :             packed[0] = self.shard_number.0;
     590            4 :             packed[1] = self.shard_count.0;
     591            4 :             packed.serialize(serializer)
     592              :         }
     593            4 :     }
     594              : }
     595              : 
     596              : impl<'de> Deserialize<'de> for ShardIndex {
     597            2 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     598            2 :     where
     599            2 :         D: serde::Deserializer<'de>,
     600            2 :     {
     601            2 :         struct IdVisitor {
     602            2 :             is_human_readable_deserializer: bool,
     603            2 :         }
     604            2 : 
     605            2 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     606            2 :             type Value = ShardIndex;
     607            2 : 
     608            2 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     609            0 :                 if self.is_human_readable_deserializer {
     610            2 :                     formatter.write_str("value in form of hex string")
     611            2 :                 } else {
     612            2 :                     formatter.write_str("value in form of integer array([u8; 2])")
     613            2 :                 }
     614            2 :             }
     615            2 : 
     616            2 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     617            2 :             where
     618            2 :                 A: serde::de::SeqAccess<'de>,
     619            2 :             {
     620            2 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     621            2 :                 let id: [u8; 2] = Deserialize::deserialize(s)?;
     622            2 :                 Ok(ShardIndex::from(id))
     623            2 :             }
     624            2 : 
     625            2 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     626            0 :             where
     627            0 :                 E: serde::de::Error,
     628            0 :             {
     629            0 :                 ShardIndex::from_str(v).map_err(E::custom)
     630            0 :             }
     631            2 :         }
     632            2 : 
     633            2 :         if deserializer.is_human_readable() {
     634            0 :             deserializer.deserialize_str(IdVisitor {
     635            0 :                 is_human_readable_deserializer: true,
     636            0 :             })
     637              :         } else {
     638            2 :             deserializer.deserialize_tuple(
     639            2 :                 2,
     640            2 :                 IdVisitor {
     641            2 :                     is_human_readable_deserializer: false,
     642            2 :                 },
     643            2 :             )
     644              :         }
     645            2 :     }
     646              : }
     647              : 
     648              : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
     649              : /// in order to be able to serve basebackup requests without peer communication).
     650      2677843 : fn key_is_shard0(key: &Key) -> bool {
     651      2677843 :     // To decide what to shard out to shards >0, we apply a simple rule that only
     652      2677843 :     // relation pages are distributed to shards other than shard zero. Everything else gets
     653      2677843 :     // stored on shard 0.  This guarantees that shard 0 can independently serve basebackup
     654      2677843 :     // requests, and any request other than those for particular blocks in relations.
     655      2677843 :     //
     656      2677843 :     // The only exception to this rule is "initfork" data -- this relates to postgres's UNLOGGED table
     657      2677843 :     // type. These are special relations, usually with only 0 or 1 blocks, and we store them on shard 0
     658      2677843 :     // because they must be included in basebackups.
     659      2677843 :     let is_initfork = key.field5 == INIT_FORKNUM;
     660      2677843 : 
     661      2677843 :     !is_rel_block_key(key) || is_initfork
     662      2677843 : }
     663              : 
     664              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     665         5910 : fn murmurhash32(mut h: u32) -> u32 {
     666         5910 :     h ^= h >> 16;
     667         5910 :     h = h.wrapping_mul(0x85ebca6b);
     668         5910 :     h ^= h >> 13;
     669         5910 :     h = h.wrapping_mul(0xc2b2ae35);
     670         5910 :     h ^= h >> 16;
     671         5910 :     h
     672         5910 : }
     673              : 
     674              : /// Provide the same result as the function in postgres `hashfn.h` with the same name
     675         2956 : fn hash_combine(mut a: u32, mut b: u32) -> u32 {
     676         2956 :     b = b.wrapping_add(0x9e3779b9);
     677         2956 :     b = b.wrapping_add(a << 6);
     678         2956 :     b = b.wrapping_add(a >> 2);
     679         2956 : 
     680         2956 :     a ^= b;
     681         2956 :     a
     682         2956 : }
     683              : 
     684              : /// Where a Key is to be distributed across shards, select the shard.  This function
     685              : /// does not account for keys that should be broadcast across shards.
     686              : ///
     687              : /// The hashing in this function must exactly match what we do in postgres smgr
     688              : /// code.  The resulting distribution of pages is intended to preserve locality within
     689              : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
     690              : /// distributing data pseudo-randomly.
     691              : ///
     692              : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
     693              : /// and will be handled at higher levels when shards are split.
     694         6068 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
     695         6068 :     // Fast path for un-sharded tenants or broadcast keys
     696         6068 :     if count < ShardCount(2) || key_is_shard0(key) {
     697         3114 :         return ShardNumber(0);
     698         2954 :     }
     699         2954 : 
     700         2954 :     // relNode
     701         2954 :     let mut hash = murmurhash32(key.field4);
     702         2954 :     // blockNum/stripe size
     703         2954 :     hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
     704         2954 : 
     705         2954 :     ShardNumber((hash % count.0 as u32) as u8)
     706         6068 : }
     707              : 
     708              : #[cfg(test)]
     709              : mod tests {
     710              :     use utils::Hex;
     711              : 
     712              :     use super::*;
     713              : 
     714              :     const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
     715              : 
     716              :     #[test]
     717            2 :     fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
     718            2 :         let example = TenantShardId {
     719            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     720            2 :             shard_count: ShardCount(10),
     721            2 :             shard_number: ShardNumber(7),
     722            2 :         };
     723            2 : 
     724            2 :         let encoded = format!("{example}");
     725            2 : 
     726            2 :         let expected = format!("{EXAMPLE_TENANT_ID}-070a");
     727            2 :         assert_eq!(&encoded, &expected);
     728              : 
     729            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     730              : 
     731            2 :         assert_eq!(example, decoded);
     732              : 
     733            2 :         Ok(())
     734            2 :     }
     735              : 
     736              :     #[test]
     737            2 :     fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
     738            2 :         let example = TenantShardId {
     739            2 :             tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
     740            2 :             shard_count: ShardCount(10),
     741            2 :             shard_number: ShardNumber(7),
     742            2 :         };
     743            2 : 
     744            2 :         let encoded = bincode::serialize(&example).unwrap();
     745            2 :         let expected: [u8; 18] = [
     746            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     747            2 :             0xf6, 0xfc, 0x07, 0x0a,
     748            2 :         ];
     749            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     750              : 
     751            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     752            2 : 
     753            2 :         assert_eq!(example, decoded);
     754              : 
     755            2 :         Ok(())
     756            2 :     }
     757              : 
     758              :     #[test]
     759            2 :     fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
     760            2 :         // Test that TenantShardId can decode a TenantId in human
     761            2 :         // readable form
     762            2 :         let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     763            2 :         let encoded = format!("{example}");
     764            2 : 
     765            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     766              : 
     767            2 :         let decoded = TenantShardId::from_str(&encoded)?;
     768              : 
     769            2 :         assert_eq!(example, decoded.tenant_id);
     770            2 :         assert_eq!(decoded.shard_count, ShardCount(0));
     771            2 :         assert_eq!(decoded.shard_number, ShardNumber(0));
     772              : 
     773            2 :         Ok(())
     774            2 :     }
     775              : 
     776              :     #[test]
     777            2 :     fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
     778            2 :         // Test that a legacy TenantShardId encodes into a form that
     779            2 :         // can be decoded as TenantId
     780            2 :         let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
     781            2 :         let example = TenantShardId::unsharded(example_tenant_id);
     782            2 :         let encoded = format!("{example}");
     783            2 : 
     784            2 :         assert_eq!(&encoded, EXAMPLE_TENANT_ID);
     785              : 
     786            2 :         let decoded = TenantId::from_str(&encoded)?;
     787              : 
     788            2 :         assert_eq!(example_tenant_id, decoded);
     789              : 
     790            2 :         Ok(())
     791            2 :     }
     792              : 
     793              :     #[test]
     794            2 :     fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
     795            2 :         // Unlike in human readable encoding, binary encoding does not
     796            2 :         // do any special handling of legacy unsharded TenantIds: this test
     797            2 :         // is equivalent to the main test for binary encoding, just verifying
     798            2 :         // that the same behavior applies when we have used `unsharded()` to
     799            2 :         // construct a TenantShardId.
     800            2 :         let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
     801            2 :         let encoded = bincode::serialize(&example).unwrap();
     802            2 : 
     803            2 :         let expected: [u8; 18] = [
     804            2 :             0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
     805            2 :             0xf6, 0xfc, 0x00, 0x00,
     806            2 :         ];
     807            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     808              : 
     809            2 :         let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
     810            2 :         assert_eq!(example, decoded);
     811              : 
     812            2 :         Ok(())
     813            2 :     }
     814              : 
     815              :     #[test]
     816            2 :     fn shard_identity_validation() -> Result<(), ShardConfigError> {
     817            2 :         // Happy cases
     818            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
     819            2 :         ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
     820            2 :         ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
     821              : 
     822            2 :         assert_eq!(
     823            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
     824            2 :             Err(ShardConfigError::InvalidCount)
     825            2 :         );
     826            2 :         assert_eq!(
     827            2 :             ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
     828            2 :             Err(ShardConfigError::InvalidNumber)
     829            2 :         );
     830            2 :         assert_eq!(
     831            2 :             ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
     832            2 :             Err(ShardConfigError::InvalidNumber)
     833            2 :         );
     834            2 :         assert_eq!(
     835            2 :             ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
     836            2 :             Err(ShardConfigError::InvalidNumber)
     837            2 :         );
     838            2 :         assert_eq!(
     839            2 :             ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
     840            2 :             Err(ShardConfigError::InvalidStripeSize)
     841            2 :         );
     842              : 
     843            2 :         Ok(())
     844            2 :     }
     845              : 
     846              :     #[test]
     847            2 :     fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
     848            2 :         let example = ShardIndex {
     849            2 :             shard_number: ShardNumber(13),
     850            2 :             shard_count: ShardCount(17),
     851            2 :         };
     852            2 :         let expected: String = "0d11".to_string();
     853            2 :         let encoded = format!("{example}");
     854            2 :         assert_eq!(&encoded, &expected);
     855              : 
     856            2 :         let decoded = ShardIndex::from_str(&encoded)?;
     857            2 :         assert_eq!(example, decoded);
     858            2 :         Ok(())
     859            2 :     }
     860              : 
     861              :     #[test]
     862            2 :     fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
     863            2 :         let example = ShardIndex {
     864            2 :             shard_number: ShardNumber(13),
     865            2 :             shard_count: ShardCount(17),
     866            2 :         };
     867            2 :         let expected: [u8; 2] = [0x0d, 0x11];
     868            2 : 
     869            2 :         let encoded = bincode::serialize(&example).unwrap();
     870            2 :         assert_eq!(Hex(&encoded), Hex(&expected));
     871            2 :         let decoded = bincode::deserialize(&encoded).unwrap();
     872            2 :         assert_eq!(example, decoded);
     873              : 
     874            2 :         Ok(())
     875            2 :     }
     876              : 
     877              :     // These are only smoke tests to spot check that our implementation doesn't
     878              :     // deviate from a few examples values: not aiming to validate the overall
     879              :     // hashing algorithm.
     880              :     #[test]
     881            2 :     fn murmur_hash() {
     882            2 :         assert_eq!(murmurhash32(0), 0);
     883              : 
     884            2 :         assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
     885            2 :     }
     886              : 
     887              :     #[test]
     888            2 :     fn shard_mapping() {
     889            2 :         let key = Key {
     890            2 :             field1: 0x00,
     891            2 :             field2: 0x67f,
     892            2 :             field3: 0x5,
     893            2 :             field4: 0x400c,
     894            2 :             field5: 0x00,
     895            2 :             field6: 0x7d06,
     896            2 :         };
     897            2 : 
     898            2 :         let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
     899            2 :         assert_eq!(shard, ShardNumber(8));
     900            2 :     }
     901              : 
     902              :     #[test]
     903            2 :     fn shard_id_split() {
     904            2 :         let tenant_id = TenantId::generate();
     905            2 :         let parent = TenantShardId::unsharded(tenant_id);
     906            2 : 
     907            2 :         // Unsharded into 2
     908            2 :         assert_eq!(
     909            2 :             parent.split(ShardCount(2)),
     910            2 :             vec![
     911            2 :                 TenantShardId {
     912            2 :                     tenant_id,
     913            2 :                     shard_count: ShardCount(2),
     914            2 :                     shard_number: ShardNumber(0)
     915            2 :                 },
     916            2 :                 TenantShardId {
     917            2 :                     tenant_id,
     918            2 :                     shard_count: ShardCount(2),
     919            2 :                     shard_number: ShardNumber(1)
     920            2 :                 }
     921            2 :             ]
     922            2 :         );
     923              : 
     924              :         // Unsharded into 4
     925            2 :         assert_eq!(
     926            2 :             parent.split(ShardCount(4)),
     927            2 :             vec![
     928            2 :                 TenantShardId {
     929            2 :                     tenant_id,
     930            2 :                     shard_count: ShardCount(4),
     931            2 :                     shard_number: ShardNumber(0)
     932            2 :                 },
     933            2 :                 TenantShardId {
     934            2 :                     tenant_id,
     935            2 :                     shard_count: ShardCount(4),
     936            2 :                     shard_number: ShardNumber(1)
     937            2 :                 },
     938            2 :                 TenantShardId {
     939            2 :                     tenant_id,
     940            2 :                     shard_count: ShardCount(4),
     941            2 :                     shard_number: ShardNumber(2)
     942            2 :                 },
     943            2 :                 TenantShardId {
     944            2 :                     tenant_id,
     945            2 :                     shard_count: ShardCount(4),
     946            2 :                     shard_number: ShardNumber(3)
     947            2 :                 }
     948            2 :             ]
     949            2 :         );
     950              : 
     951              :         // count=1 into 2 (check this works the same as unsharded.)
     952            2 :         let parent = TenantShardId {
     953            2 :             tenant_id,
     954            2 :             shard_count: ShardCount(1),
     955            2 :             shard_number: ShardNumber(0),
     956            2 :         };
     957            2 :         assert_eq!(
     958            2 :             parent.split(ShardCount(2)),
     959            2 :             vec![
     960            2 :                 TenantShardId {
     961            2 :                     tenant_id,
     962            2 :                     shard_count: ShardCount(2),
     963            2 :                     shard_number: ShardNumber(0)
     964            2 :                 },
     965            2 :                 TenantShardId {
     966            2 :                     tenant_id,
     967            2 :                     shard_count: ShardCount(2),
     968            2 :                     shard_number: ShardNumber(1)
     969            2 :                 }
     970            2 :             ]
     971            2 :         );
     972              : 
     973              :         // count=2 into count=8
     974            2 :         let parent = TenantShardId {
     975            2 :             tenant_id,
     976            2 :             shard_count: ShardCount(2),
     977            2 :             shard_number: ShardNumber(1),
     978            2 :         };
     979            2 :         assert_eq!(
     980            2 :             parent.split(ShardCount(8)),
     981            2 :             vec![
     982            2 :                 TenantShardId {
     983            2 :                     tenant_id,
     984            2 :                     shard_count: ShardCount(8),
     985            2 :                     shard_number: ShardNumber(1)
     986            2 :                 },
     987            2 :                 TenantShardId {
     988            2 :                     tenant_id,
     989            2 :                     shard_count: ShardCount(8),
     990            2 :                     shard_number: ShardNumber(3)
     991            2 :                 },
     992            2 :                 TenantShardId {
     993            2 :                     tenant_id,
     994            2 :                     shard_count: ShardCount(8),
     995            2 :                     shard_number: ShardNumber(5)
     996            2 :                 },
     997            2 :                 TenantShardId {
     998            2 :                     tenant_id,
     999            2 :                     shard_count: ShardCount(8),
    1000            2 :                     shard_number: ShardNumber(7)
    1001            2 :                 },
    1002            2 :             ]
    1003            2 :         );
    1004            2 :     }
    1005              : }
        

Generated by: LCOV version 2.1-beta