Line data Source code
1 : //! See `pageserver_api::shard` for description on sharding.
2 :
3 : use std::{ops::RangeInclusive, str::FromStr};
4 :
5 : use hex::FromHex;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use crate::id::TenantId;
9 :
10 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
11 : pub struct ShardNumber(pub u8);
12 :
13 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
14 : pub struct ShardCount(pub u8);
15 :
16 : /// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
17 : /// when we need to know which shard we're dealing with, but do not need to know the full
18 : /// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
19 : /// the fully qualified TenantShardId.
20 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
21 : pub struct ShardIndex {
22 : pub shard_number: ShardNumber,
23 : pub shard_count: ShardCount,
24 : }
25 :
26 : /// Formatting helper, for generating the `shard_id` label in traces.
27 : pub struct ShardSlug<'a>(&'a TenantShardId);
28 :
29 : /// TenantShardId globally identifies a particular shard in a particular tenant.
30 : ///
31 : /// These are written as `<TenantId>-<ShardSlug>`, for example:
32 : /// # The second shard in a two-shard tenant
33 : /// 072f1291a5310026820b2fe4b2968934-0102
34 : ///
35 : /// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
36 : /// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
37 : /// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
38 : ///
39 : /// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
40 : /// is both forward and backward compatible with TenantId: a legacy TenantId can be
41 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
42 : /// as a TenantId.
43 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
44 : pub struct TenantShardId {
45 : pub tenant_id: TenantId,
46 : pub shard_number: ShardNumber,
47 : pub shard_count: ShardCount,
48 : }
49 :
50 : impl ShardCount {
51 : pub const MAX: Self = Self(u8::MAX);
52 : pub const MIN: Self = Self(0);
53 :
54 : /// The internal value of a ShardCount may be zero, which means "1 shard, but use
55 : /// legacy format for TenantShardId that excludes the shard suffix", also known
56 : /// as [`TenantShardId::unsharded`].
57 : ///
58 : /// This method returns the actual number of shards, i.e. if our internal value is
59 : /// zero, we return 1 (unsharded tenants have 1 shard).
60 4804222 : pub fn count(&self) -> u8 {
61 4804222 : if self.0 > 0 {
62 12 : self.0
63 : } else {
64 4804210 : 1
65 : }
66 4804222 : }
67 :
68 : /// The literal internal value: this is **not** the number of shards in the
69 : /// tenant, as we have a special zero value for legacy unsharded tenants. Use
70 : /// [`Self::count`] if you want to know the cardinality of shards.
71 4 : pub fn literal(&self) -> u8 {
72 4 : self.0
73 4 : }
74 :
75 : /// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
76 : /// uses the legacy format for `TenantShardId`. See also the documentation for
77 : /// [`Self::count`].
78 0 : pub fn is_unsharded(&self) -> bool {
79 0 : self.0 == 0
80 0 : }
81 :
82 : /// `v` may be zero, or the number of shards in the tenant. `v` is what
83 : /// [`Self::literal`] would return.
84 7154 : pub const fn new(val: u8) -> Self {
85 7154 : Self(val)
86 7154 : }
87 : }
88 :
89 : impl ShardNumber {
90 : pub const MAX: Self = Self(u8::MAX);
91 : }
92 :
93 : impl TenantShardId {
94 40 : pub fn unsharded(tenant_id: TenantId) -> Self {
95 40 : Self {
96 40 : tenant_id,
97 40 : shard_number: ShardNumber(0),
98 40 : shard_count: ShardCount(0),
99 40 : }
100 40 : }
101 :
102 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
103 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
104 0 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
105 0 : RangeInclusive::new(
106 0 : Self {
107 0 : tenant_id,
108 0 : shard_number: ShardNumber(0),
109 0 : shard_count: ShardCount(0),
110 0 : },
111 0 : Self {
112 0 : tenant_id,
113 0 : shard_number: ShardNumber::MAX,
114 0 : shard_count: ShardCount::MAX,
115 0 : },
116 0 : )
117 0 : }
118 :
119 10906 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
120 10906 : ShardSlug(self)
121 10906 : }
122 :
123 : /// Convenience for code that has special behavior on the 0th shard.
124 6 : pub fn is_shard_zero(&self) -> bool {
125 6 : self.shard_number == ShardNumber(0)
126 6 : }
127 :
128 : /// The "unsharded" value is distinct from simply having a single shard: it represents
129 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
130 : /// a shard suffix.
131 0 : pub fn is_unsharded(&self) -> bool {
132 0 : self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
133 0 : }
134 :
135 : /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
136 : /// is useful when logging from code that is already in a span that includes tenant ID, to
137 : /// keep messages reasonably terse.
138 0 : pub fn to_index(&self) -> ShardIndex {
139 0 : ShardIndex {
140 0 : shard_number: self.shard_number,
141 0 : shard_count: self.shard_count,
142 0 : }
143 0 : }
144 :
145 : /// Calculate the children of this TenantShardId when splitting the overall tenant into
146 : /// the given number of shards.
147 8 : pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
148 8 : let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
149 8 : let mut child_shards = Vec::new();
150 32 : for shard_number in 0..ShardNumber(new_shard_count.0).0 {
151 : // Key mapping is based on a round robin mapping of key hash modulo shard count,
152 : // so our child shards are the ones which the same keys would map to.
153 32 : if shard_number % effective_old_shard_count == self.shard_number.0 {
154 24 : child_shards.push(TenantShardId {
155 24 : tenant_id: self.tenant_id,
156 24 : shard_number: ShardNumber(shard_number),
157 24 : shard_count: new_shard_count,
158 24 : })
159 8 : }
160 : }
161 :
162 8 : child_shards
163 8 : }
164 : }
165 :
166 : impl<'a> std::fmt::Display for ShardSlug<'a> {
167 10906 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 10906 : write!(
169 10906 : f,
170 10906 : "{:02x}{:02x}",
171 10906 : self.0.shard_number.0, self.0.shard_count.0
172 10906 : )
173 10906 : }
174 : }
175 :
176 : impl std::fmt::Display for TenantShardId {
177 11874 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 11874 : if self.shard_count != ShardCount(0) {
179 112 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
180 : } else {
181 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
182 : // is distinct from the normal single shard case (shard count == 1).
183 11762 : self.tenant_id.fmt(f)
184 : }
185 11874 : }
186 : }
187 :
188 : impl std::fmt::Debug for TenantShardId {
189 3128 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 3128 : // Debug is the same as Display: the compact hex representation
191 3128 : write!(f, "{}", self)
192 3128 : }
193 : }
194 :
195 : impl std::str::FromStr for TenantShardId {
196 : type Err = hex::FromHexError;
197 :
198 4081 : fn from_str(s: &str) -> Result<Self, Self::Err> {
199 4081 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
200 4081 : if s.len() == 32 {
201 : // Legacy case: no shard specified
202 : Ok(Self {
203 4053 : tenant_id: TenantId::from_str(s)?,
204 4053 : shard_number: ShardNumber(0),
205 4053 : shard_count: ShardCount(0),
206 : })
207 28 : } else if s.len() == 37 {
208 28 : let bytes = s.as_bytes();
209 28 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
210 28 : let mut shard_parts: [u8; 2] = [0u8; 2];
211 28 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
212 28 : Ok(Self {
213 28 : tenant_id,
214 28 : shard_number: ShardNumber(shard_parts[0]),
215 28 : shard_count: ShardCount(shard_parts[1]),
216 28 : })
217 : } else {
218 0 : Err(hex::FromHexError::InvalidStringLength)
219 : }
220 4081 : }
221 : }
222 :
223 : impl From<[u8; 18]> for TenantShardId {
224 4 : fn from(b: [u8; 18]) -> Self {
225 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
226 4 :
227 4 : Self {
228 4 : tenant_id: TenantId::from(tenant_id_bytes),
229 4 : shard_number: ShardNumber(b[16]),
230 4 : shard_count: ShardCount(b[17]),
231 4 : }
232 4 : }
233 : }
234 :
235 : impl ShardIndex {
236 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
237 0 : Self {
238 0 : shard_number: number,
239 0 : shard_count: count,
240 0 : }
241 0 : }
242 94 : pub fn unsharded() -> Self {
243 94 : Self {
244 94 : shard_number: ShardNumber(0),
245 94 : shard_count: ShardCount(0),
246 94 : }
247 94 : }
248 :
249 : /// The "unsharded" value is distinct from simply having a single shard: it represents
250 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
251 : /// a shard suffix.
252 72155 : pub fn is_unsharded(&self) -> bool {
253 72155 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
254 72155 : }
255 :
256 : /// For use in constructing remote storage paths: concatenate this with a TenantId
257 : /// to get a fully qualified TenantShardId.
258 : ///
259 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
260 : /// that the legacy pre-sharding remote key format is preserved.
261 1383 : pub fn get_suffix(&self) -> String {
262 1383 : if self.is_unsharded() {
263 1375 : "".to_string()
264 : } else {
265 8 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
266 : }
267 1383 : }
268 : }
269 :
270 : impl std::fmt::Display for ShardIndex {
271 1990 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272 1990 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
273 1990 : }
274 : }
275 :
276 : impl std::fmt::Debug for ShardIndex {
277 1516 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 1516 : // Debug is the same as Display: the compact hex representation
279 1516 : write!(f, "{}", self)
280 1516 : }
281 : }
282 :
283 : impl std::str::FromStr for ShardIndex {
284 : type Err = hex::FromHexError;
285 :
286 3130 : fn from_str(s: &str) -> Result<Self, Self::Err> {
287 3130 : // Expect format: 1 byte shard number, 1 byte shard count
288 3130 : if s.len() == 4 {
289 3130 : let bytes = s.as_bytes();
290 3130 : let mut shard_parts: [u8; 2] = [0u8; 2];
291 3130 : hex::decode_to_slice(bytes, &mut shard_parts)?;
292 3130 : Ok(Self {
293 3130 : shard_number: ShardNumber(shard_parts[0]),
294 3130 : shard_count: ShardCount(shard_parts[1]),
295 3130 : })
296 : } else {
297 0 : Err(hex::FromHexError::InvalidStringLength)
298 : }
299 3130 : }
300 : }
301 :
302 : impl From<[u8; 2]> for ShardIndex {
303 2 : fn from(b: [u8; 2]) -> Self {
304 2 : Self {
305 2 : shard_number: ShardNumber(b[0]),
306 2 : shard_count: ShardCount(b[1]),
307 2 : }
308 2 : }
309 : }
310 :
311 : impl Serialize for TenantShardId {
312 52 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
313 52 : where
314 52 : S: serde::Serializer,
315 52 : {
316 52 : if serializer.is_human_readable() {
317 44 : serializer.collect_str(self)
318 : } else {
319 : // Note: while human encoding of [`TenantShardId`] is backward and forward
320 : // compatible, this binary encoding is not.
321 8 : let mut packed: [u8; 18] = [0; 18];
322 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
323 8 : packed[16] = self.shard_number.0;
324 8 : packed[17] = self.shard_count.0;
325 8 :
326 8 : packed.serialize(serializer)
327 : }
328 52 : }
329 : }
330 :
331 : impl<'de> Deserialize<'de> for TenantShardId {
332 12 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
333 12 : where
334 12 : D: serde::Deserializer<'de>,
335 12 : {
336 12 : struct IdVisitor {
337 12 : is_human_readable_deserializer: bool,
338 12 : }
339 12 :
340 12 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
341 12 : type Value = TenantShardId;
342 12 :
343 12 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
344 0 : if self.is_human_readable_deserializer {
345 12 : formatter.write_str("value in form of hex string")
346 12 : } else {
347 12 : formatter.write_str("value in form of integer array([u8; 18])")
348 12 : }
349 12 : }
350 12 :
351 12 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
352 4 : where
353 4 : A: serde::de::SeqAccess<'de>,
354 4 : {
355 4 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
356 12 : let id: [u8; 18] = Deserialize::deserialize(s)?;
357 12 : Ok(TenantShardId::from(id))
358 12 : }
359 12 :
360 12 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
361 8 : where
362 8 : E: serde::de::Error,
363 8 : {
364 8 : TenantShardId::from_str(v).map_err(E::custom)
365 8 : }
366 12 : }
367 12 :
368 12 : if deserializer.is_human_readable() {
369 8 : deserializer.deserialize_str(IdVisitor {
370 8 : is_human_readable_deserializer: true,
371 8 : })
372 : } else {
373 4 : deserializer.deserialize_tuple(
374 4 : 18,
375 4 : IdVisitor {
376 4 : is_human_readable_deserializer: false,
377 4 : },
378 4 : )
379 : }
380 12 : }
381 : }
382 :
383 : impl Serialize for ShardIndex {
384 20 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
385 20 : where
386 20 : S: serde::Serializer,
387 20 : {
388 20 : if serializer.is_human_readable() {
389 16 : serializer.collect_str(self)
390 : } else {
391 : // Binary encoding is not used in index_part.json, but is included in anticipation of
392 : // switching various structures (e.g. inter-process communication, remote metadata) to more
393 : // compact binary encodings in future.
394 4 : let mut packed: [u8; 2] = [0; 2];
395 4 : packed[0] = self.shard_number.0;
396 4 : packed[1] = self.shard_count.0;
397 4 : packed.serialize(serializer)
398 : }
399 20 : }
400 : }
401 :
402 : impl<'de> Deserialize<'de> for ShardIndex {
403 3130 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
404 3130 : where
405 3130 : D: serde::Deserializer<'de>,
406 3130 : {
407 3130 : struct IdVisitor {
408 3130 : is_human_readable_deserializer: bool,
409 3130 : }
410 3130 :
411 3130 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
412 3130 : type Value = ShardIndex;
413 3130 :
414 3130 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
415 0 : if self.is_human_readable_deserializer {
416 3130 : formatter.write_str("value in form of hex string")
417 3130 : } else {
418 3130 : formatter.write_str("value in form of integer array([u8; 2])")
419 3130 : }
420 3130 : }
421 3130 :
422 3130 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
423 2 : where
424 2 : A: serde::de::SeqAccess<'de>,
425 2 : {
426 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
427 3130 : let id: [u8; 2] = Deserialize::deserialize(s)?;
428 3130 : Ok(ShardIndex::from(id))
429 3130 : }
430 3130 :
431 3130 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
432 3128 : where
433 3128 : E: serde::de::Error,
434 3128 : {
435 3128 : ShardIndex::from_str(v).map_err(E::custom)
436 3128 : }
437 3130 : }
438 3130 :
439 3130 : if deserializer.is_human_readable() {
440 3128 : deserializer.deserialize_str(IdVisitor {
441 3128 : is_human_readable_deserializer: true,
442 3128 : })
443 : } else {
444 2 : deserializer.deserialize_tuple(
445 2 : 2,
446 2 : IdVisitor {
447 2 : is_human_readable_deserializer: false,
448 2 : },
449 2 : )
450 : }
451 3130 : }
452 : }
|