Line data Source code
1 : //! See `pageserver_api::shard` for description on sharding.
2 :
3 : use std::{ops::RangeInclusive, str::FromStr};
4 :
5 : use hex::FromHex;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use crate::id::TenantId;
9 :
10 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
11 : pub struct ShardNumber(pub u8);
12 :
13 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
14 : pub struct ShardCount(pub u8);
15 :
16 : /// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
17 : /// when we need to know which shard we're dealing with, but do not need to know the full
18 : /// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
19 : /// the fully qualified TenantShardId.
20 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
21 : pub struct ShardIndex {
22 : pub shard_number: ShardNumber,
23 : pub shard_count: ShardCount,
24 : }
25 :
26 : /// Formatting helper, for generating the `shard_id` label in traces.
27 : pub struct ShardSlug<'a>(&'a TenantShardId);
28 :
29 : /// TenantShardId globally identifies a particular shard in a particular tenant.
30 : ///
31 : /// These are written as `<TenantId>-<ShardSlug>`, for example:
32 : /// # The second shard in a two-shard tenant
33 : /// 072f1291a5310026820b2fe4b2968934-0102
34 : ///
35 : /// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
36 : /// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
37 : /// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
38 : ///
39 : /// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
40 : /// is both forward and backward compatible with TenantId: a legacy TenantId can be
41 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
42 : /// as a TenantId.
43 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
44 : pub struct TenantShardId {
45 : pub tenant_id: TenantId,
46 : pub shard_number: ShardNumber,
47 : pub shard_count: ShardCount,
48 : }
49 :
50 : impl ShardCount {
51 : pub const MAX: Self = Self(u8::MAX);
52 :
53 : /// The internal value of a ShardCount may be zero, which means "1 shard, but use
54 : /// legacy format for TenantShardId that excludes the shard suffix", also known
55 : /// as [`TenantShardId::unsharded`].
56 : ///
57 : /// This method returns the actual number of shards, i.e. if our internal value is
58 : /// zero, we return 1 (unsharded tenants have 1 shard).
59 4804220 : pub fn count(&self) -> u8 {
60 4804220 : if self.0 > 0 {
61 12 : self.0
62 : } else {
63 4804208 : 1
64 : }
65 4804220 : }
66 :
67 : /// The literal internal value: this is **not** the number of shards in the
68 : /// tenant, as we have a special zero value for legacy unsharded tenants. Use
69 : /// [`Self::count`] if you want to know the cardinality of shards.
70 4 : pub fn literal(&self) -> u8 {
71 4 : self.0
72 4 : }
73 :
74 : /// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
75 : /// uses the legacy format for `TenantShardId`. See also the documentation for
76 : /// [`Self::count`].
77 0 : pub fn is_unsharded(&self) -> bool {
78 0 : self.0 == 0
79 0 : }
80 :
81 : /// `v` may be zero, or the number of shards in the tenant. `v` is what
82 : /// [`Self::literal`] would return.
83 7142 : pub const fn new(val: u8) -> Self {
84 7142 : Self(val)
85 7142 : }
86 : }
87 :
88 : impl ShardNumber {
89 : pub const MAX: Self = Self(u8::MAX);
90 : }
91 :
92 : impl TenantShardId {
93 36 : pub fn unsharded(tenant_id: TenantId) -> Self {
94 36 : Self {
95 36 : tenant_id,
96 36 : shard_number: ShardNumber(0),
97 36 : shard_count: ShardCount(0),
98 36 : }
99 36 : }
100 :
101 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
102 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
103 0 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
104 0 : RangeInclusive::new(
105 0 : Self {
106 0 : tenant_id,
107 0 : shard_number: ShardNumber(0),
108 0 : shard_count: ShardCount(0),
109 0 : },
110 0 : Self {
111 0 : tenant_id,
112 0 : shard_number: ShardNumber::MAX,
113 0 : shard_count: ShardCount::MAX,
114 0 : },
115 0 : )
116 0 : }
117 :
118 10671 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
119 10671 : ShardSlug(self)
120 10671 : }
121 :
122 : /// Convenience for code that has special behavior on the 0th shard.
123 6 : pub fn is_shard_zero(&self) -> bool {
124 6 : self.shard_number == ShardNumber(0)
125 6 : }
126 :
127 : /// The "unsharded" value is distinct from simply having a single shard: it represents
128 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
129 : /// a shard suffix.
130 0 : pub fn is_unsharded(&self) -> bool {
131 0 : self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
132 0 : }
133 :
134 : /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
135 : /// is useful when logging from code that is already in a span that includes tenant ID, to
136 : /// keep messages reasonably terse.
137 0 : pub fn to_index(&self) -> ShardIndex {
138 0 : ShardIndex {
139 0 : shard_number: self.shard_number,
140 0 : shard_count: self.shard_count,
141 0 : }
142 0 : }
143 :
144 : /// Calculate the children of this TenantShardId when splitting the overall tenant into
145 : /// the given number of shards.
146 8 : pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
147 8 : let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
148 8 : let mut child_shards = Vec::new();
149 32 : for shard_number in 0..ShardNumber(new_shard_count.0).0 {
150 : // Key mapping is based on a round robin mapping of key hash modulo shard count,
151 : // so our child shards are the ones which the same keys would map to.
152 32 : if shard_number % effective_old_shard_count == self.shard_number.0 {
153 24 : child_shards.push(TenantShardId {
154 24 : tenant_id: self.tenant_id,
155 24 : shard_number: ShardNumber(shard_number),
156 24 : shard_count: new_shard_count,
157 24 : })
158 8 : }
159 : }
160 :
161 8 : child_shards
162 8 : }
163 : }
164 :
165 : impl<'a> std::fmt::Display for ShardSlug<'a> {
166 10671 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 10671 : write!(
168 10671 : f,
169 10671 : "{:02x}{:02x}",
170 10671 : self.0.shard_number.0, self.0.shard_count.0
171 10671 : )
172 10671 : }
173 : }
174 :
175 : impl std::fmt::Display for TenantShardId {
176 8727 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 8727 : if self.shard_count != ShardCount(0) {
178 120 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
179 : } else {
180 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
181 : // is distinct from the normal single shard case (shard count == 1).
182 8607 : self.tenant_id.fmt(f)
183 : }
184 8727 : }
185 : }
186 :
187 : impl std::fmt::Debug for TenantShardId {
188 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 0 : // Debug is the same as Display: the compact hex representation
190 0 : write!(f, "{}", self)
191 0 : }
192 : }
193 :
194 : impl std::str::FromStr for TenantShardId {
195 : type Err = hex::FromHexError;
196 :
197 3981 : fn from_str(s: &str) -> Result<Self, Self::Err> {
198 3981 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
199 3981 : if s.len() == 32 {
200 : // Legacy case: no shard specified
201 : Ok(Self {
202 3953 : tenant_id: TenantId::from_str(s)?,
203 3953 : shard_number: ShardNumber(0),
204 3953 : shard_count: ShardCount(0),
205 : })
206 28 : } else if s.len() == 37 {
207 28 : let bytes = s.as_bytes();
208 28 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
209 28 : let mut shard_parts: [u8; 2] = [0u8; 2];
210 28 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
211 28 : Ok(Self {
212 28 : tenant_id,
213 28 : shard_number: ShardNumber(shard_parts[0]),
214 28 : shard_count: ShardCount(shard_parts[1]),
215 28 : })
216 : } else {
217 0 : Err(hex::FromHexError::InvalidStringLength)
218 : }
219 3981 : }
220 : }
221 :
222 : impl From<[u8; 18]> for TenantShardId {
223 4 : fn from(b: [u8; 18]) -> Self {
224 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
225 4 :
226 4 : Self {
227 4 : tenant_id: TenantId::from(tenant_id_bytes),
228 4 : shard_number: ShardNumber(b[16]),
229 4 : shard_count: ShardCount(b[17]),
230 4 : }
231 4 : }
232 : }
233 :
234 : impl ShardIndex {
235 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
236 0 : Self {
237 0 : shard_number: number,
238 0 : shard_count: count,
239 0 : }
240 0 : }
241 86 : pub fn unsharded() -> Self {
242 86 : Self {
243 86 : shard_number: ShardNumber(0),
244 86 : shard_count: ShardCount(0),
245 86 : }
246 86 : }
247 :
248 : /// The "unsharded" value is distinct from simply having a single shard: it represents
249 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
250 : /// a shard suffix.
251 71817 : pub fn is_unsharded(&self) -> bool {
252 71817 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
253 71817 : }
254 :
255 : /// For use in constructing remote storage paths: concatenate this with a TenantId
256 : /// to get a fully qualified TenantShardId.
257 : ///
258 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
259 : /// that the legacy pre-sharding remote key format is preserved.
260 1353 : pub fn get_suffix(&self) -> String {
261 1353 : if self.is_unsharded() {
262 1345 : "".to_string()
263 : } else {
264 8 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
265 : }
266 1353 : }
267 : }
268 :
269 : impl std::fmt::Display for ShardIndex {
270 1938 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 1938 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
272 1938 : }
273 : }
274 :
275 : impl std::fmt::Debug for ShardIndex {
276 1484 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 1484 : // Debug is the same as Display: the compact hex representation
278 1484 : write!(f, "{}", self)
279 1484 : }
280 : }
281 :
282 : impl std::str::FromStr for ShardIndex {
283 : type Err = hex::FromHexError;
284 :
285 2 : fn from_str(s: &str) -> Result<Self, Self::Err> {
286 2 : // Expect format: 1 byte shard number, 1 byte shard count
287 2 : if s.len() == 4 {
288 2 : let bytes = s.as_bytes();
289 2 : let mut shard_parts: [u8; 2] = [0u8; 2];
290 2 : hex::decode_to_slice(bytes, &mut shard_parts)?;
291 2 : Ok(Self {
292 2 : shard_number: ShardNumber(shard_parts[0]),
293 2 : shard_count: ShardCount(shard_parts[1]),
294 2 : })
295 : } else {
296 0 : Err(hex::FromHexError::InvalidStringLength)
297 : }
298 2 : }
299 : }
300 :
301 : impl From<[u8; 2]> for ShardIndex {
302 2 : fn from(b: [u8; 2]) -> Self {
303 2 : Self {
304 2 : shard_number: ShardNumber(b[0]),
305 2 : shard_count: ShardCount(b[1]),
306 2 : }
307 2 : }
308 : }
309 :
310 : impl Serialize for TenantShardId {
311 52 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
312 52 : where
313 52 : S: serde::Serializer,
314 52 : {
315 52 : if serializer.is_human_readable() {
316 44 : serializer.collect_str(self)
317 : } else {
318 : // Note: while human encoding of [`TenantShardId`] is backward and forward
319 : // compatible, this binary encoding is not.
320 8 : let mut packed: [u8; 18] = [0; 18];
321 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
322 8 : packed[16] = self.shard_number.0;
323 8 : packed[17] = self.shard_count.0;
324 8 :
325 8 : packed.serialize(serializer)
326 : }
327 52 : }
328 : }
329 :
330 : impl<'de> Deserialize<'de> for TenantShardId {
331 12 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
332 12 : where
333 12 : D: serde::Deserializer<'de>,
334 12 : {
335 12 : struct IdVisitor {
336 12 : is_human_readable_deserializer: bool,
337 12 : }
338 12 :
339 12 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
340 12 : type Value = TenantShardId;
341 12 :
342 12 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
343 0 : if self.is_human_readable_deserializer {
344 12 : formatter.write_str("value in form of hex string")
345 12 : } else {
346 12 : formatter.write_str("value in form of integer array([u8; 18])")
347 12 : }
348 12 : }
349 12 :
350 12 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
351 4 : where
352 4 : A: serde::de::SeqAccess<'de>,
353 4 : {
354 4 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
355 12 : let id: [u8; 18] = Deserialize::deserialize(s)?;
356 12 : Ok(TenantShardId::from(id))
357 12 : }
358 12 :
359 12 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
360 8 : where
361 8 : E: serde::de::Error,
362 8 : {
363 8 : TenantShardId::from_str(v).map_err(E::custom)
364 8 : }
365 12 : }
366 12 :
367 12 : if deserializer.is_human_readable() {
368 8 : deserializer.deserialize_str(IdVisitor {
369 8 : is_human_readable_deserializer: true,
370 8 : })
371 : } else {
372 4 : deserializer.deserialize_tuple(
373 4 : 18,
374 4 : IdVisitor {
375 4 : is_human_readable_deserializer: false,
376 4 : },
377 4 : )
378 : }
379 12 : }
380 : }
381 :
382 : impl Serialize for ShardIndex {
383 20 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
384 20 : where
385 20 : S: serde::Serializer,
386 20 : {
387 20 : if serializer.is_human_readable() {
388 16 : serializer.collect_str(self)
389 : } else {
390 : // Binary encoding is not used in index_part.json, but is included in anticipation of
391 : // switching various structures (e.g. inter-process communication, remote metadata) to more
392 : // compact binary encodings in future.
393 4 : let mut packed: [u8; 2] = [0; 2];
394 4 : packed[0] = self.shard_number.0;
395 4 : packed[1] = self.shard_count.0;
396 4 : packed.serialize(serializer)
397 : }
398 20 : }
399 : }
400 :
401 : impl<'de> Deserialize<'de> for ShardIndex {
402 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
403 2 : where
404 2 : D: serde::Deserializer<'de>,
405 2 : {
406 2 : struct IdVisitor {
407 2 : is_human_readable_deserializer: bool,
408 2 : }
409 2 :
410 2 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
411 2 : type Value = ShardIndex;
412 2 :
413 2 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
414 0 : if self.is_human_readable_deserializer {
415 2 : formatter.write_str("value in form of hex string")
416 2 : } else {
417 2 : formatter.write_str("value in form of integer array([u8; 2])")
418 2 : }
419 2 : }
420 2 :
421 2 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
422 2 : where
423 2 : A: serde::de::SeqAccess<'de>,
424 2 : {
425 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
426 2 : let id: [u8; 2] = Deserialize::deserialize(s)?;
427 2 : Ok(ShardIndex::from(id))
428 2 : }
429 2 :
430 2 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
431 0 : where
432 0 : E: serde::de::Error,
433 0 : {
434 0 : ShardIndex::from_str(v).map_err(E::custom)
435 0 : }
436 2 : }
437 2 :
438 2 : if deserializer.is_human_readable() {
439 0 : deserializer.deserialize_str(IdVisitor {
440 0 : is_human_readable_deserializer: true,
441 0 : })
442 : } else {
443 2 : deserializer.deserialize_tuple(
444 2 : 2,
445 2 : IdVisitor {
446 2 : is_human_readable_deserializer: false,
447 2 : },
448 2 : )
449 : }
450 2 : }
451 : }
|