LCOV - code coverage report
Current view: top level - libs/utils/src - shard.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 86.0 % 271 233
Test Date: 2024-08-02 21:34:27 Functions: 43.7 % 87 38

            Line data    Source code
       1              : //! See `pageserver_api::shard` for description on sharding.
       2              : 
       3              : use std::{ops::RangeInclusive, str::FromStr};
       4              : 
       5              : use hex::FromHex;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use crate::id::TenantId;
       9              : 
      10            0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      11              : pub struct ShardNumber(pub u8);
      12              : 
      13            0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
      14              : pub struct ShardCount(pub u8);
      15              : 
      16              : /// Combination of ShardNumber and ShardCount.  For use within the context of a particular tenant,
      17              : /// when we need to know which shard we're dealing with, but do not need to know the full
      18              : /// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
      19              : /// the fully qualified TenantShardId.
      20              : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      21              : pub struct ShardIndex {
      22              :     pub shard_number: ShardNumber,
      23              :     pub shard_count: ShardCount,
      24              : }
      25              : 
      26              : /// Formatting helper, for generating the `shard_id` label in traces.
      27              : pub struct ShardSlug<'a>(&'a TenantShardId);
      28              : 
      29              : /// TenantShardId globally identifies a particular shard in a particular tenant.
      30              : ///
      31              : /// These are written as `<TenantId>-<ShardSlug>`, for example:
      32              : ///   # The second shard in a two-shard tenant
      33              : ///   072f1291a5310026820b2fe4b2968934-0102
      34              : ///
      35              : /// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
      36              : /// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
      37              : /// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
      38              : ///
      39              : /// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
      40              : /// is both forward and backward compatible with TenantId: a legacy TenantId can be
      41              : /// decoded as a TenantShardId, and when re-encoded it will be parseable
      42              : /// as a TenantId.
      43              : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
      44              : pub struct TenantShardId {
      45              :     pub tenant_id: TenantId,
      46              :     pub shard_number: ShardNumber,
      47              :     pub shard_count: ShardCount,
      48              : }
      49              : 
      50              : impl ShardCount {
      51              :     pub const MAX: Self = Self(u8::MAX);
      52              :     pub const MIN: Self = Self(0);
      53              : 
      54              :     /// The internal value of a ShardCount may be zero, which means "1 shard, but use
      55              :     /// legacy format for TenantShardId that excludes the shard suffix", also known
      56              :     /// as [`TenantShardId::unsharded`].
      57              :     ///
      58              :     /// This method returns the actual number of shards, i.e. if our internal value is
      59              :     /// zero, we return 1 (unsharded tenants have 1 shard).
      60      4804222 :     pub fn count(&self) -> u8 {
      61      4804222 :         if self.0 > 0 {
      62           12 :             self.0
      63              :         } else {
      64      4804210 :             1
      65              :         }
      66      4804222 :     }
      67              : 
      68              :     /// The literal internal value: this is **not** the number of shards in the
      69              :     /// tenant, as we have a special zero value for legacy unsharded tenants.  Use
      70              :     /// [`Self::count`] if you want to know the cardinality of shards.
      71            4 :     pub fn literal(&self) -> u8 {
      72            4 :         self.0
      73            4 :     }
      74              : 
      75              :     /// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
      76              :     /// uses the legacy format for `TenantShardId`. See also the documentation for
      77              :     /// [`Self::count`].
      78            0 :     pub fn is_unsharded(&self) -> bool {
      79            0 :         self.0 == 0
      80            0 :     }
      81              : 
      82              :     /// `v` may be zero, or the number of shards in the tenant.  `v` is what
      83              :     /// [`Self::literal`] would return.
      84         7154 :     pub const fn new(val: u8) -> Self {
      85         7154 :         Self(val)
      86         7154 :     }
      87              : }
      88              : 
      89              : impl ShardNumber {
      90              :     pub const MAX: Self = Self(u8::MAX);
      91              : }
      92              : 
      93              : impl TenantShardId {
      94           40 :     pub fn unsharded(tenant_id: TenantId) -> Self {
      95           40 :         Self {
      96           40 :             tenant_id,
      97           40 :             shard_number: ShardNumber(0),
      98           40 :             shard_count: ShardCount(0),
      99           40 :         }
     100           40 :     }
     101              : 
     102              :     /// The range of all TenantShardId that belong to a particular TenantId.  This is useful when
     103              :     /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
     104            0 :     pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
     105            0 :         RangeInclusive::new(
     106            0 :             Self {
     107            0 :                 tenant_id,
     108            0 :                 shard_number: ShardNumber(0),
     109            0 :                 shard_count: ShardCount(0),
     110            0 :             },
     111            0 :             Self {
     112            0 :                 tenant_id,
     113            0 :                 shard_number: ShardNumber::MAX,
     114            0 :                 shard_count: ShardCount::MAX,
     115            0 :             },
     116            0 :         )
     117            0 :     }
     118              : 
     119        10906 :     pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
     120        10906 :         ShardSlug(self)
     121        10906 :     }
     122              : 
     123              :     /// Convenience for code that has special behavior on the 0th shard.
     124            6 :     pub fn is_shard_zero(&self) -> bool {
     125            6 :         self.shard_number == ShardNumber(0)
     126            6 :     }
     127              : 
     128              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     129              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     130              :     /// a shard suffix.
     131            0 :     pub fn is_unsharded(&self) -> bool {
     132            0 :         self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
     133            0 :     }
     134              : 
     135              :     /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
     136              :     /// is useful when logging from code that is already in a span that includes tenant ID, to
     137              :     /// keep messages reasonably terse.
     138            0 :     pub fn to_index(&self) -> ShardIndex {
     139            0 :         ShardIndex {
     140            0 :             shard_number: self.shard_number,
     141            0 :             shard_count: self.shard_count,
     142            0 :         }
     143            0 :     }
     144              : 
     145              :     /// Calculate the children of this TenantShardId when splitting the overall tenant into
     146              :     /// the given number of shards.
     147            8 :     pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
     148            8 :         let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
     149            8 :         let mut child_shards = Vec::new();
     150           32 :         for shard_number in 0..ShardNumber(new_shard_count.0).0 {
     151              :             // Key mapping is based on a round robin mapping of key hash modulo shard count,
     152              :             // so our child shards are the ones which the same keys would map to.
     153           32 :             if shard_number % effective_old_shard_count == self.shard_number.0 {
     154           24 :                 child_shards.push(TenantShardId {
     155           24 :                     tenant_id: self.tenant_id,
     156           24 :                     shard_number: ShardNumber(shard_number),
     157           24 :                     shard_count: new_shard_count,
     158           24 :                 })
     159            8 :             }
     160              :         }
     161              : 
     162            8 :         child_shards
     163            8 :     }
     164              : }
     165              : 
     166              : impl<'a> std::fmt::Display for ShardSlug<'a> {
     167        10906 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     168        10906 :         write!(
     169        10906 :             f,
     170        10906 :             "{:02x}{:02x}",
     171        10906 :             self.0.shard_number.0, self.0.shard_count.0
     172        10906 :         )
     173        10906 :     }
     174              : }
     175              : 
     176              : impl std::fmt::Display for TenantShardId {
     177        11874 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     178        11874 :         if self.shard_count != ShardCount(0) {
     179          112 :             write!(f, "{}-{}", self.tenant_id, self.shard_slug())
     180              :         } else {
     181              :             // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
     182              :             // is distinct from the normal single shard case (shard count == 1).
     183        11762 :             self.tenant_id.fmt(f)
     184              :         }
     185        11874 :     }
     186              : }
     187              : 
     188              : impl std::fmt::Debug for TenantShardId {
     189         3128 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     190         3128 :         // Debug is the same as Display: the compact hex representation
     191         3128 :         write!(f, "{}", self)
     192         3128 :     }
     193              : }
     194              : 
     195              : impl std::str::FromStr for TenantShardId {
     196              :     type Err = hex::FromHexError;
     197              : 
     198         4081 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     199         4081 :         // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
     200         4081 :         if s.len() == 32 {
     201              :             // Legacy case: no shard specified
     202              :             Ok(Self {
     203         4053 :                 tenant_id: TenantId::from_str(s)?,
     204         4053 :                 shard_number: ShardNumber(0),
     205         4053 :                 shard_count: ShardCount(0),
     206              :             })
     207           28 :         } else if s.len() == 37 {
     208           28 :             let bytes = s.as_bytes();
     209           28 :             let tenant_id = TenantId::from_hex(&bytes[0..32])?;
     210           28 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     211           28 :             hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
     212           28 :             Ok(Self {
     213           28 :                 tenant_id,
     214           28 :                 shard_number: ShardNumber(shard_parts[0]),
     215           28 :                 shard_count: ShardCount(shard_parts[1]),
     216           28 :             })
     217              :         } else {
     218            0 :             Err(hex::FromHexError::InvalidStringLength)
     219              :         }
     220         4081 :     }
     221              : }
     222              : 
     223              : impl From<[u8; 18]> for TenantShardId {
     224            4 :     fn from(b: [u8; 18]) -> Self {
     225            4 :         let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
     226            4 : 
     227            4 :         Self {
     228            4 :             tenant_id: TenantId::from(tenant_id_bytes),
     229            4 :             shard_number: ShardNumber(b[16]),
     230            4 :             shard_count: ShardCount(b[17]),
     231            4 :         }
     232            4 :     }
     233              : }
     234              : 
     235              : impl ShardIndex {
     236            0 :     pub fn new(number: ShardNumber, count: ShardCount) -> Self {
     237            0 :         Self {
     238            0 :             shard_number: number,
     239            0 :             shard_count: count,
     240            0 :         }
     241            0 :     }
     242           94 :     pub fn unsharded() -> Self {
     243           94 :         Self {
     244           94 :             shard_number: ShardNumber(0),
     245           94 :             shard_count: ShardCount(0),
     246           94 :         }
     247           94 :     }
     248              : 
     249              :     /// The "unsharded" value is distinct from simply having a single shard: it represents
     250              :     /// a tenant which is not shard-aware at all, and whose storage paths will not include
     251              :     /// a shard suffix.
     252        72155 :     pub fn is_unsharded(&self) -> bool {
     253        72155 :         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     254        72155 :     }
     255              : 
     256              :     /// For use in constructing remote storage paths: concatenate this with a TenantId
     257              :     /// to get a fully qualified TenantShardId.
     258              :     ///
     259              :     /// Backward compat: this function returns an empty string if Self::is_unsharded, such
     260              :     /// that the legacy pre-sharding remote key format is preserved.
     261         1383 :     pub fn get_suffix(&self) -> String {
     262         1383 :         if self.is_unsharded() {
     263         1375 :             "".to_string()
     264              :         } else {
     265            8 :             format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     266              :         }
     267         1383 :     }
     268              : }
     269              : 
     270              : impl std::fmt::Display for ShardIndex {
     271         1990 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     272         1990 :         write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
     273         1990 :     }
     274              : }
     275              : 
     276              : impl std::fmt::Debug for ShardIndex {
     277         1516 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     278         1516 :         // Debug is the same as Display: the compact hex representation
     279         1516 :         write!(f, "{}", self)
     280         1516 :     }
     281              : }
     282              : 
     283              : impl std::str::FromStr for ShardIndex {
     284              :     type Err = hex::FromHexError;
     285              : 
     286         3130 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     287         3130 :         // Expect format: 1 byte shard number, 1 byte shard count
     288         3130 :         if s.len() == 4 {
     289         3130 :             let bytes = s.as_bytes();
     290         3130 :             let mut shard_parts: [u8; 2] = [0u8; 2];
     291         3130 :             hex::decode_to_slice(bytes, &mut shard_parts)?;
     292         3130 :             Ok(Self {
     293         3130 :                 shard_number: ShardNumber(shard_parts[0]),
     294         3130 :                 shard_count: ShardCount(shard_parts[1]),
     295         3130 :             })
     296              :         } else {
     297            0 :             Err(hex::FromHexError::InvalidStringLength)
     298              :         }
     299         3130 :     }
     300              : }
     301              : 
     302              : impl From<[u8; 2]> for ShardIndex {
     303            2 :     fn from(b: [u8; 2]) -> Self {
     304            2 :         Self {
     305            2 :             shard_number: ShardNumber(b[0]),
     306            2 :             shard_count: ShardCount(b[1]),
     307            2 :         }
     308            2 :     }
     309              : }
     310              : 
     311              : impl Serialize for TenantShardId {
     312           52 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     313           52 :     where
     314           52 :         S: serde::Serializer,
     315           52 :     {
     316           52 :         if serializer.is_human_readable() {
     317           44 :             serializer.collect_str(self)
     318              :         } else {
     319              :             // Note: while human encoding of [`TenantShardId`] is backward and forward
     320              :             // compatible, this binary encoding is not.
     321            8 :             let mut packed: [u8; 18] = [0; 18];
     322            8 :             packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
     323            8 :             packed[16] = self.shard_number.0;
     324            8 :             packed[17] = self.shard_count.0;
     325            8 : 
     326            8 :             packed.serialize(serializer)
     327              :         }
     328           52 :     }
     329              : }
     330              : 
     331              : impl<'de> Deserialize<'de> for TenantShardId {
     332           12 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     333           12 :     where
     334           12 :         D: serde::Deserializer<'de>,
     335           12 :     {
     336           12 :         struct IdVisitor {
     337           12 :             is_human_readable_deserializer: bool,
     338           12 :         }
     339           12 : 
     340           12 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     341           12 :             type Value = TenantShardId;
     342           12 : 
     343           12 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     344            0 :                 if self.is_human_readable_deserializer {
     345           12 :                     formatter.write_str("value in form of hex string")
     346           12 :                 } else {
     347           12 :                     formatter.write_str("value in form of integer array([u8; 18])")
     348           12 :                 }
     349           12 :             }
     350           12 : 
     351           12 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     352            4 :             where
     353            4 :                 A: serde::de::SeqAccess<'de>,
     354            4 :             {
     355            4 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     356           12 :                 let id: [u8; 18] = Deserialize::deserialize(s)?;
     357           12 :                 Ok(TenantShardId::from(id))
     358           12 :             }
     359           12 : 
     360           12 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     361            8 :             where
     362            8 :                 E: serde::de::Error,
     363            8 :             {
     364            8 :                 TenantShardId::from_str(v).map_err(E::custom)
     365            8 :             }
     366           12 :         }
     367           12 : 
     368           12 :         if deserializer.is_human_readable() {
     369            8 :             deserializer.deserialize_str(IdVisitor {
     370            8 :                 is_human_readable_deserializer: true,
     371            8 :             })
     372              :         } else {
     373            4 :             deserializer.deserialize_tuple(
     374            4 :                 18,
     375            4 :                 IdVisitor {
     376            4 :                     is_human_readable_deserializer: false,
     377            4 :                 },
     378            4 :             )
     379              :         }
     380           12 :     }
     381              : }
     382              : 
     383              : impl Serialize for ShardIndex {
     384           20 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     385           20 :     where
     386           20 :         S: serde::Serializer,
     387           20 :     {
     388           20 :         if serializer.is_human_readable() {
     389           16 :             serializer.collect_str(self)
     390              :         } else {
     391              :             // Binary encoding is not used in index_part.json, but is included in anticipation of
     392              :             // switching various structures (e.g. inter-process communication, remote metadata) to more
     393              :             // compact binary encodings in future.
     394            4 :             let mut packed: [u8; 2] = [0; 2];
     395            4 :             packed[0] = self.shard_number.0;
     396            4 :             packed[1] = self.shard_count.0;
     397            4 :             packed.serialize(serializer)
     398              :         }
     399           20 :     }
     400              : }
     401              : 
     402              : impl<'de> Deserialize<'de> for ShardIndex {
     403         3130 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     404         3130 :     where
     405         3130 :         D: serde::Deserializer<'de>,
     406         3130 :     {
     407         3130 :         struct IdVisitor {
     408         3130 :             is_human_readable_deserializer: bool,
     409         3130 :         }
     410         3130 : 
     411         3130 :         impl<'de> serde::de::Visitor<'de> for IdVisitor {
     412         3130 :             type Value = ShardIndex;
     413         3130 : 
     414         3130 :             fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
     415            0 :                 if self.is_human_readable_deserializer {
     416         3130 :                     formatter.write_str("value in form of hex string")
     417         3130 :                 } else {
     418         3130 :                     formatter.write_str("value in form of integer array([u8; 2])")
     419         3130 :                 }
     420         3130 :             }
     421         3130 : 
     422         3130 :             fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
     423            2 :             where
     424            2 :                 A: serde::de::SeqAccess<'de>,
     425            2 :             {
     426            2 :                 let s = serde::de::value::SeqAccessDeserializer::new(seq);
     427         3130 :                 let id: [u8; 2] = Deserialize::deserialize(s)?;
     428         3130 :                 Ok(ShardIndex::from(id))
     429         3130 :             }
     430         3130 : 
     431         3130 :             fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
     432         3128 :             where
     433         3128 :                 E: serde::de::Error,
     434         3128 :             {
     435         3128 :                 ShardIndex::from_str(v).map_err(E::custom)
     436         3128 :             }
     437         3130 :         }
     438         3130 : 
     439         3130 :         if deserializer.is_human_readable() {
     440         3128 :             deserializer.deserialize_str(IdVisitor {
     441         3128 :                 is_human_readable_deserializer: true,
     442         3128 :             })
     443              :         } else {
     444            2 :             deserializer.deserialize_tuple(
     445            2 :                 2,
     446            2 :                 IdVisitor {
     447            2 :                     is_human_readable_deserializer: false,
     448            2 :                 },
     449            2 :             )
     450              :         }
     451         3130 :     }
     452              : }
        

Generated by: LCOV version 2.1-beta