Line data Source code
1 : use std::{ops::RangeInclusive, str::FromStr};
2 :
3 : use crate::{
4 : key::{is_rel_block_key, Key},
5 : models::ShardParameters,
6 : };
7 : use hex::FromHex;
8 : use postgres_ffi::relfile_utils::INIT_FORKNUM;
9 : use serde::{Deserialize, Serialize};
10 : use utils::id::TenantId;
11 :
12 : /// See docs/rfcs/031-sharding-static.md for an overview of sharding.
13 : ///
14 : /// This module contains a variety of types used to represent the concept of sharding
15 : /// a Neon tenant across multiple physical shards. Since there are quite a few of these,
16 : /// we provide an summary here.
17 : ///
18 : /// Types used to describe shards:
19 : /// - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
20 : /// which identifies a tenant which is not shard-aware. This means its storage paths do not include
21 : /// a shard suffix.
22 : /// - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
23 : /// - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
24 : /// without the tenant ID. This is useful for things that are implicitly scoped to a particular
25 : /// tenant, such as layer files.
26 : /// - [`ShardIdentity`]` is the full description of a particular shard's parameters, in sufficient
27 : /// detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
28 : /// - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
29 : /// four hex digits. An unsharded tenant is `0000`.
30 : /// - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
31 : ///
32 : /// Types used to describe the parameters for data distribution in a sharded tenant:
33 : /// - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
34 : /// multiple shards. Its value is given in 8kiB pages.
35 : /// - [`ShardLayout`] describes the data distribution scheme, and at time of writing is
36 : /// always zero: this is provided for future upgrades that might introduce different
37 : /// data distribution schemes.
38 : ///
39 : /// Examples:
40 : /// - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
41 : /// - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
42 : /// - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
43 : /// and their slugs are 0004, 0104, 0204, and 0304.
44 :
// Zero-based index of a shard within a tenant.
// NOTE(review): the `///` overview block above is an *outer* doc comment, so rustdoc attaches
// it to `ShardNumber` rather than to the module.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);
47 :
/// Number of shards in a tenant, stored as its literal value.
///
/// The literal value 0 is reserved for legacy unsharded tenants, which still have one shard.
/// Use `ShardCount::count` for the cardinality and `ShardCount::literal` for the raw value.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(u8);
50 :
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
///
/// Field order matters: the derived `Ord` compares `shard_number` before `shard_count`.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
    /// Zero-based index of this shard within the tenant.
    pub shard_number: ShardNumber,
    /// The tenant's shard count; the literal value 0 denotes a legacy unsharded tenant.
    pub shard_count: ShardCount,
}
60 :
/// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
/// and to check whether that [`ShardNumber`] is the same as the current shard.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardIdentity {
    /// This shard's zero-based index within the tenant.
    pub number: ShardNumber,
    /// The tenant's shard count (literal 0 for legacy unsharded tenants).
    pub count: ShardCount,
    /// Number of pages per stripe: block numbers are divided by this before hashing.
    pub stripe_size: ShardStripeSize,
    /// Data distribution scheme; the magic `LAYOUT_BROKEN` value marks an unusable identity.
    layout: ShardLayout,
}
70 :
/// Formatting helper, for generating the `shard_id` label in traces.
///
/// Borrows a [`TenantShardId`]; its `Display` impl writes the shard number followed by the
/// shard count, each as two lowercase hex digits.
struct ShardSlug<'a>(&'a TenantShardId);
73 :
/// TenantShardId globally identifies a particular shard in a particular tenant.
///
/// These are written as `<TenantId>-<ShardSlug>`. For example, the second shard in a
/// two-shard tenant is:
///
/// ```text
/// 072f1291a5310026820b2fe4b2968934-0102
/// ```
///
/// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
/// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
/// an unsharded [`TenantShardId`] to be used interchangeably with a [`TenantId`].
///
/// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
/// is both forward and backward compatible with TenantId: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
///
/// Field order matters: the derived `Ord` sorts by `tenant_id` first, which is what lets
/// `TenantShardId::tenant_range` select all shards of one tenant from an ordered map.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
    pub tenant_id: TenantId,
    pub shard_number: ShardNumber,
    pub shard_count: ShardCount,
}
94 :
95 : impl ShardCount {
96 : pub const MAX: Self = Self(u8::MAX);
97 :
98 : /// The internal value of a ShardCount may be zero, which means "1 shard, but use
99 : /// legacy format for TenantShardId that excludes the shard suffix", also known
100 : /// as [`TenantShardId::unsharded`].
101 : ///
102 : /// This method returns the actual number of shards, i.e. if our internal value is
103 : /// zero, we return 1 (unsharded tenants have 1 shard).
104 1136 : pub fn count(&self) -> u8 {
105 1136 : if self.0 > 0 {
106 10 : self.0
107 : } else {
108 1126 : 1
109 : }
110 1136 : }
111 :
112 : /// The literal internal value: this is **not** the number of shards in the
113 : /// tenant, as we have a special zero value for legacy unsharded tenants. Use
114 : /// [`Self::count`] if you want to know the cardinality of shards.
115 4 : pub fn literal(&self) -> u8 {
116 4 : self.0
117 4 : }
118 :
119 : /// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
120 : /// uses the legacy format for `TenantShardId`. See also the documentation for
121 : /// [`Self::count`].
122 0 : pub fn is_unsharded(&self) -> bool {
123 0 : self.0 == 0
124 0 : }
125 :
126 : /// `v` may be zero, or the number of shards in the tenant. `v` is what
127 : /// [`Self::literal`] would return.
128 7066 : pub const fn new(val: u8) -> Self {
129 7066 : Self(val)
130 7066 : }
131 : }
132 :
133 : impl ShardNumber {
134 : pub const MAX: Self = Self(u8::MAX);
135 : }
136 :
137 : impl TenantShardId {
138 166 : pub fn unsharded(tenant_id: TenantId) -> Self {
139 166 : Self {
140 166 : tenant_id,
141 166 : shard_number: ShardNumber(0),
142 166 : shard_count: ShardCount(0),
143 166 : }
144 166 : }
145 :
146 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
147 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
148 0 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
149 0 : RangeInclusive::new(
150 0 : Self {
151 0 : tenant_id,
152 0 : shard_number: ShardNumber(0),
153 0 : shard_count: ShardCount(0),
154 0 : },
155 0 : Self {
156 0 : tenant_id,
157 0 : shard_number: ShardNumber::MAX,
158 0 : shard_count: ShardCount::MAX,
159 0 : },
160 0 : )
161 0 : }
162 :
163 9693 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
164 9693 : ShardSlug(self)
165 9693 : }
166 :
167 : /// Convenience for code that has special behavior on the 0th shard.
168 6 : pub fn is_shard_zero(&self) -> bool {
169 6 : self.shard_number == ShardNumber(0)
170 6 : }
171 :
172 : /// The "unsharded" value is distinct from simply having a single shard: it represents
173 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
174 : /// a shard suffix.
175 0 : pub fn is_unsharded(&self) -> bool {
176 0 : self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
177 0 : }
178 :
179 : /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
180 : /// is useful when logging from code that is already in a span that includes tenant ID, to
181 : /// keep messages reasonably terse.
182 0 : pub fn to_index(&self) -> ShardIndex {
183 0 : ShardIndex {
184 0 : shard_number: self.shard_number,
185 0 : shard_count: self.shard_count,
186 0 : }
187 0 : }
188 :
189 : /// Calculate the children of this TenantShardId when splitting the overall tenant into
190 : /// the given number of shards.
191 8 : pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
192 8 : let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
193 8 : let mut child_shards = Vec::new();
194 32 : for shard_number in 0..ShardNumber(new_shard_count.0).0 {
195 : // Key mapping is based on a round robin mapping of key hash modulo shard count,
196 : // so our child shards are the ones which the same keys would map to.
197 32 : if shard_number % effective_old_shard_count == self.shard_number.0 {
198 24 : child_shards.push(TenantShardId {
199 24 : tenant_id: self.tenant_id,
200 24 : shard_number: ShardNumber(shard_number),
201 24 : shard_count: new_shard_count,
202 24 : })
203 8 : }
204 : }
205 :
206 8 : child_shards
207 8 : }
208 : }
209 :
210 : impl<'a> std::fmt::Display for ShardSlug<'a> {
211 9693 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 9693 : write!(
213 9693 : f,
214 9693 : "{:02x}{:02x}",
215 9693 : self.0.shard_number.0, self.0.shard_count.0
216 9693 : )
217 9693 : }
218 : }
219 :
220 : impl std::fmt::Display for TenantShardId {
221 7717 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 7717 : if self.shard_count != ShardCount(0) {
223 2 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
224 : } else {
225 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
226 : // is distinct from the normal single shard case (shard count == 1).
227 7715 : self.tenant_id.fmt(f)
228 : }
229 7717 : }
230 : }
231 :
232 : impl std::fmt::Debug for TenantShardId {
233 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 0 : // Debug is the same as Display: the compact hex representation
235 0 : write!(f, "{}", self)
236 0 : }
237 : }
238 :
239 : impl std::str::FromStr for TenantShardId {
240 : type Err = hex::FromHexError;
241 :
242 3763 : fn from_str(s: &str) -> Result<Self, Self::Err> {
243 3763 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
244 3763 : if s.len() == 32 {
245 : // Legacy case: no shard specified
246 : Ok(Self {
247 3759 : tenant_id: TenantId::from_str(s)?,
248 3759 : shard_number: ShardNumber(0),
249 3759 : shard_count: ShardCount(0),
250 : })
251 4 : } else if s.len() == 37 {
252 4 : let bytes = s.as_bytes();
253 4 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
254 4 : let mut shard_parts: [u8; 2] = [0u8; 2];
255 4 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
256 4 : Ok(Self {
257 4 : tenant_id,
258 4 : shard_number: ShardNumber(shard_parts[0]),
259 4 : shard_count: ShardCount(shard_parts[1]),
260 4 : })
261 : } else {
262 0 : Err(hex::FromHexError::InvalidStringLength)
263 : }
264 3763 : }
265 : }
266 :
267 : impl From<[u8; 18]> for TenantShardId {
268 4 : fn from(b: [u8; 18]) -> Self {
269 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
270 4 :
271 4 : Self {
272 4 : tenant_id: TenantId::from(tenant_id_bytes),
273 4 : shard_number: ShardNumber(b[16]),
274 4 : shard_count: ShardCount(b[17]),
275 4 : }
276 4 : }
277 : }
278 :
279 : impl ShardIndex {
280 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
281 0 : Self {
282 0 : shard_number: number,
283 0 : shard_count: count,
284 0 : }
285 0 : }
286 208 : pub fn unsharded() -> Self {
287 208 : Self {
288 208 : shard_number: ShardNumber(0),
289 208 : shard_count: ShardCount(0),
290 208 : }
291 208 : }
292 :
293 : /// The "unsharded" value is distinct from simply having a single shard: it represents
294 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
295 : /// a shard suffix.
296 29194 : pub fn is_unsharded(&self) -> bool {
297 29194 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
298 29194 : }
299 :
300 : /// For use in constructing remote storage paths: concatenate this with a TenantId
301 : /// to get a fully qualified TenantShardId.
302 : ///
303 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
304 : /// that the legacy pre-sharding remote key format is preserved.
305 1202 : pub fn get_suffix(&self) -> String {
306 1202 : if self.is_unsharded() {
307 1202 : "".to_string()
308 : } else {
309 0 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
310 : }
311 1202 : }
312 : }
313 :
314 : impl std::fmt::Display for ShardIndex {
315 1629 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
316 1629 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
317 1629 : }
318 : }
319 :
320 : impl std::fmt::Debug for ShardIndex {
321 1290 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 1290 : // Debug is the same as Display: the compact hex representation
323 1290 : write!(f, "{}", self)
324 1290 : }
325 : }
326 :
327 : impl std::str::FromStr for ShardIndex {
328 : type Err = hex::FromHexError;
329 :
330 2 : fn from_str(s: &str) -> Result<Self, Self::Err> {
331 2 : // Expect format: 1 byte shard number, 1 byte shard count
332 2 : if s.len() == 4 {
333 2 : let bytes = s.as_bytes();
334 2 : let mut shard_parts: [u8; 2] = [0u8; 2];
335 2 : hex::decode_to_slice(bytes, &mut shard_parts)?;
336 2 : Ok(Self {
337 2 : shard_number: ShardNumber(shard_parts[0]),
338 2 : shard_count: ShardCount(shard_parts[1]),
339 2 : })
340 : } else {
341 0 : Err(hex::FromHexError::InvalidStringLength)
342 : }
343 2 : }
344 : }
345 :
346 : impl From<[u8; 2]> for ShardIndex {
347 2 : fn from(b: [u8; 2]) -> Self {
348 2 : Self {
349 2 : shard_number: ShardNumber(b[0]),
350 2 : shard_count: ShardCount(b[1]),
351 2 : }
352 2 : }
353 : }
354 :
355 : impl Serialize for TenantShardId {
356 52 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
357 52 : where
358 52 : S: serde::Serializer,
359 52 : {
360 52 : if serializer.is_human_readable() {
361 44 : serializer.collect_str(self)
362 : } else {
363 : // Note: while human encoding of [`TenantShardId`] is backward and forward
364 : // compatible, this binary encoding is not.
365 8 : let mut packed: [u8; 18] = [0; 18];
366 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
367 8 : packed[16] = self.shard_number.0;
368 8 : packed[17] = self.shard_count.0;
369 8 :
370 8 : packed.serialize(serializer)
371 : }
372 52 : }
373 : }
374 :
/// Inverse of `Serialize for TenantShardId`: human-readable formats supply the string form
/// (parsed via `FromStr`); other formats supply the packed 18-byte array.
impl<'de> Deserialize<'de> for TenantShardId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // The flag only selects which description `expecting` reports in error messages.
        struct IdVisitor {
            is_human_readable_deserializer: bool,
        }

        impl<'de> serde::de::Visitor<'de> for IdVisitor {
            type Value = TenantShardId;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                if self.is_human_readable_deserializer {
                    formatter.write_str("value in form of hex string")
                } else {
                    formatter.write_str("value in form of integer array([u8; 18])")
                }
            }

            // Binary path: re-deserialize the sequence as a fixed [u8; 18] and unpack it
            // with `From<[u8; 18]>`.
            fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::SeqAccess<'de>,
            {
                let s = serde::de::value::SeqAccessDeserializer::new(seq);
                let id: [u8; 18] = Deserialize::deserialize(s)?;
                Ok(TenantShardId::from(id))
            }

            // Human-readable path: accepts both the legacy bare TenantId and the
            // `<tenant>-<slug>` form, mapping parse errors to a custom serde error.
            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                TenantShardId::from_str(v).map_err(E::custom)
            }
        }

        if deserializer.is_human_readable() {
            deserializer.deserialize_str(IdVisitor {
                is_human_readable_deserializer: true,
            })
        } else {
            // 18 = 16 tenant-ID bytes + shard number + shard count, matching `Serialize`.
            deserializer.deserialize_tuple(
                18,
                IdVisitor {
                    is_human_readable_deserializer: false,
                },
            )
        }
    }
}
426 :
/// Stripe size in number of pages.
///
/// `key_to_shard_number` divides a key's block number by this value before hashing, so each
/// run of `stripe_size` consecutive blocks of a relation maps to the same shard.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
430 :
/// Layout version: for future upgrades where we might change how the key->shard mapping works
///
/// Valid identities in this module use `LAYOUT_V1`; `LAYOUT_BROKEN` marks an unusable identity.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardLayout(u8);
434 :
/// The only real layout version: set by every constructor except `ShardIdentity::broken`.
const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// ShardIdentity uses a magic layout value to indicate if it is unusable
const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);

/// Default stripe size in pages: 256MiB divided by 8kiB page size (= 32768 pages).
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
441 :
/// Reasons `ShardIdentity::new` rejects its arguments.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ShardConfigError {
    /// The shard count was zero (legacy tenants must use `ShardIdentity::unsharded`).
    #[error("Invalid shard count")]
    InvalidCount,
    /// The shard number was not less than the shard count.
    #[error("Invalid shard number")]
    InvalidNumber,
    /// The stripe size was zero.
    #[error("Invalid stripe size")]
    InvalidStripeSize,
}
451 :
452 : impl ShardIdentity {
453 : /// An identity with number=0 count=0 is a "none" identity, which represents legacy
454 : /// tenants. Modern single-shard tenants should not use this: they should
455 : /// have number=0 count=1.
456 1174 : pub const fn unsharded() -> Self {
457 1174 : Self {
458 1174 : number: ShardNumber(0),
459 1174 : count: ShardCount(0),
460 1174 : layout: LAYOUT_V1,
461 1174 : stripe_size: DEFAULT_STRIPE_SIZE,
462 1174 : }
463 1174 : }
464 :
465 : /// A broken instance of this type is only used for `TenantState::Broken` tenants,
466 : /// which are constructed in code paths that don't have access to proper configuration.
467 : ///
468 : /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
469 : /// Enforcement is via assertions, to avoid making our interface fallible for this
470 : /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
471 : /// state, and by extension to avoid trying to do any page->shard resolution.
472 0 : pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
473 0 : Self {
474 0 : number,
475 0 : count,
476 0 : layout: LAYOUT_BROKEN,
477 0 : stripe_size: DEFAULT_STRIPE_SIZE,
478 0 : }
479 0 : }
480 :
481 : /// The "unsharded" value is distinct from simply having a single shard: it represents
482 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
483 : /// a shard suffix.
484 0 : pub fn is_unsharded(&self) -> bool {
485 0 : self.number == ShardNumber(0) && self.count == ShardCount(0)
486 0 : }
487 :
488 : /// Count must be nonzero, and number must be < count. To construct
489 : /// the legacy case (count==0), use Self::unsharded instead.
490 1024 : pub fn new(
491 1024 : number: ShardNumber,
492 1024 : count: ShardCount,
493 1024 : stripe_size: ShardStripeSize,
494 1024 : ) -> Result<Self, ShardConfigError> {
495 1024 : if count.0 == 0 {
496 2 : Err(ShardConfigError::InvalidCount)
497 1022 : } else if number.0 > count.0 - 1 {
498 6 : Err(ShardConfigError::InvalidNumber)
499 1016 : } else if stripe_size.0 == 0 {
500 2 : Err(ShardConfigError::InvalidStripeSize)
501 : } else {
502 1014 : Ok(Self {
503 1014 : number,
504 1014 : count,
505 1014 : layout: LAYOUT_V1,
506 1014 : stripe_size,
507 1014 : })
508 : }
509 1024 : }
510 :
511 : /// For use when creating ShardIdentity instances for new shards, where a creation request
512 : /// specifies the ShardParameters that apply to all shards.
513 138 : pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
514 138 : Self {
515 138 : number,
516 138 : count: params.count,
517 138 : layout: LAYOUT_V1,
518 138 : stripe_size: params.stripe_size,
519 138 : }
520 138 : }
521 :
522 177906 : fn is_broken(&self) -> bool {
523 177906 : self.layout == LAYOUT_BROKEN
524 177906 : }
525 :
526 3130 : pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
527 3130 : assert!(!self.is_broken());
528 3130 : key_to_shard_number(self.count, self.stripe_size, key)
529 3130 : }
530 :
531 : /// Return true if the key should be ingested by this shard
532 : ///
533 : /// Shards must ingest _at least_ keys which return true from this check.
534 174776 : pub fn is_key_local(&self, key: &Key) -> bool {
535 174776 : assert!(!self.is_broken());
536 174776 : if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
537 171824 : true
538 : } else {
539 2952 : key_to_shard_number(self.count, self.stripe_size, key) == self.number
540 : }
541 174776 : }
542 :
543 : /// Return true if the key should be discarded if found in this shard's
544 : /// data store, e.g. during compaction after a split.
545 : ///
546 : /// Shards _may_ drop keys which return false here, but are not obliged to.
547 2696809 : pub fn is_key_disposable(&self, key: &Key) -> bool {
548 2696809 : if key_is_shard0(key) {
549 : // Q: Why can't we dispose of shard0 content if we're not shard 0?
550 : // A1: because the WAL ingestion logic currently ingests some shard 0
551 : // content on all shards, even though it's only read on shard 0. If we
552 : // dropped it, then subsequent WAL ingest to these keys would encounter
553 : // an error.
554 : // A2: because key_is_shard0 also covers relation size keys, which are written
555 : // on all shards even though they're only maintained accurately on shard 0.
556 2673115 : false
557 : } else {
558 23694 : !self.is_key_local(key)
559 : }
560 2696809 : }
561 :
562 0 : pub fn shard_slug(&self) -> String {
563 0 : if self.count > ShardCount(0) {
564 0 : format!("-{:02x}{:02x}", self.number.0, self.count.0)
565 : } else {
566 0 : String::new()
567 : }
568 0 : }
569 :
570 : /// Convenience for checking if this identity is the 0th shard in a tenant,
571 : /// for special cases on shard 0 such as ingesting relation sizes.
572 0 : pub fn is_shard_zero(&self) -> bool {
573 0 : self.number == ShardNumber(0)
574 0 : }
575 : }
576 :
577 : impl Serialize for ShardIndex {
578 4 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
579 4 : where
580 4 : S: serde::Serializer,
581 4 : {
582 4 : if serializer.is_human_readable() {
583 0 : serializer.collect_str(self)
584 : } else {
585 : // Binary encoding is not used in index_part.json, but is included in anticipation of
586 : // switching various structures (e.g. inter-process communication, remote metadata) to more
587 : // compact binary encodings in future.
588 4 : let mut packed: [u8; 2] = [0; 2];
589 4 : packed[0] = self.shard_number.0;
590 4 : packed[1] = self.shard_count.0;
591 4 : packed.serialize(serializer)
592 : }
593 4 : }
594 : }
595 :
/// Inverse of `Serialize for ShardIndex`: human-readable formats supply the four-hex-digit
/// string (parsed via `FromStr`); other formats supply the `[u8; 2]` array.
impl<'de> Deserialize<'de> for ShardIndex {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // The flag only selects which description `expecting` reports in error messages.
        struct IdVisitor {
            is_human_readable_deserializer: bool,
        }

        impl<'de> serde::de::Visitor<'de> for IdVisitor {
            type Value = ShardIndex;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                if self.is_human_readable_deserializer {
                    formatter.write_str("value in form of hex string")
                } else {
                    formatter.write_str("value in form of integer array([u8; 2])")
                }
            }

            // Binary path: re-deserialize the sequence as [u8; 2] and unpack it with
            // `From<[u8; 2]>`.
            fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::SeqAccess<'de>,
            {
                let s = serde::de::value::SeqAccessDeserializer::new(seq);
                let id: [u8; 2] = Deserialize::deserialize(s)?;
                Ok(ShardIndex::from(id))
            }

            // Human-readable path: parse errors become a custom serde error.
            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                ShardIndex::from_str(v).map_err(E::custom)
            }
        }

        if deserializer.is_human_readable() {
            deserializer.deserialize_str(IdVisitor {
                is_human_readable_deserializer: true,
            })
        } else {
            // 2 = shard number byte + shard count byte, matching `Serialize`.
            deserializer.deserialize_tuple(
                2,
                IdVisitor {
                    is_human_readable_deserializer: false,
                },
            )
        }
    }
}
647 :
648 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
649 : /// in order to be able to serve basebackup requests without peer communication).
650 2702715 : fn key_is_shard0(key: &Key) -> bool {
651 2702715 : // To decide what to shard out to shards >0, we apply a simple rule that only
652 2702715 : // relation pages are distributed to shards other than shard zero. Everything else gets
653 2702715 : // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
654 2702715 : // requests, and any request other than those for particular blocks in relations.
655 2702715 : //
656 2702715 : // The only exception to this rule is "initfork" data -- this relates to postgres's UNLOGGED table
657 2702715 : // type. These are special relations, usually with only 0 or 1 blocks, and we store them on shard 0
658 2702715 : // because they must be included in basebackups.
659 2702715 : let is_initfork = key.field5 == INIT_FORKNUM;
660 2702715 :
661 2702715 : !is_rel_block_key(key) || is_initfork
662 2702715 : }
663 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Finalizer-style mix: xor-shift by 16, multiply, xor-shift by 13, multiply, xor-shift by 16.
/// All multiplications wrap modulo 2^32, as in the C original.
fn murmurhash32(h: u32) -> u32 {
    let h = (h ^ (h >> 16)).wrapping_mul(0x85eb_ca6b);
    let h = (h ^ (h >> 13)).wrapping_mul(0xc2b2_ae35);
    h ^ (h >> 16)
}
673 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Computes `a ^ (b + 0x9e3779b9 + (a << 6) + (a >> 2))` with wrapping 32-bit addition.
fn hash_combine(a: u32, b: u32) -> u32 {
    let mixed = b
        .wrapping_add(0x9e37_79b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);
    a ^ mixed
}
683 :
684 : /// Where a Key is to be distributed across shards, select the shard. This function
685 : /// does not account for keys that should be broadcast across shards.
686 : ///
687 : /// The hashing in this function must exactly match what we do in postgres smgr
688 : /// code. The resulting distribution of pages is intended to preserve locality within
689 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
690 : /// distributing data pseudo-randomly.
691 : ///
692 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
693 : /// and will be handled at higher levels when shards are split.
694 6084 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
695 6084 : // Fast path for un-sharded tenants or broadcast keys
696 6084 : if count < ShardCount(2) || key_is_shard0(key) {
697 3130 : return ShardNumber(0);
698 2954 : }
699 2954 :
700 2954 : // relNode
701 2954 : let mut hash = murmurhash32(key.field4);
702 2954 : // blockNum/stripe size
703 2954 : hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
704 2954 :
705 2954 : ShardNumber((hash % count.0 as u32) as u8)
706 6084 : }
707 :
708 : #[cfg(test)]
709 : mod tests {
710 : use utils::Hex;
711 :
712 : use super::*;
713 :
714 : const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
715 :
716 : #[test]
717 2 : fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
718 2 : let example = TenantShardId {
719 2 : tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
720 2 : shard_count: ShardCount(10),
721 2 : shard_number: ShardNumber(7),
722 2 : };
723 2 :
724 2 : let encoded = format!("{example}");
725 2 :
726 2 : let expected = format!("{EXAMPLE_TENANT_ID}-070a");
727 2 : assert_eq!(&encoded, &expected);
728 :
729 2 : let decoded = TenantShardId::from_str(&encoded)?;
730 :
731 2 : assert_eq!(example, decoded);
732 :
733 2 : Ok(())
734 2 : }
735 :
736 : #[test]
737 2 : fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
738 2 : let example = TenantShardId {
739 2 : tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
740 2 : shard_count: ShardCount(10),
741 2 : shard_number: ShardNumber(7),
742 2 : };
743 2 :
744 2 : let encoded = bincode::serialize(&example).unwrap();
745 2 : let expected: [u8; 18] = [
746 2 : 0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
747 2 : 0xf6, 0xfc, 0x07, 0x0a,
748 2 : ];
749 2 : assert_eq!(Hex(&encoded), Hex(&expected));
750 :
751 2 : let decoded = bincode::deserialize(&encoded).unwrap();
752 2 :
753 2 : assert_eq!(example, decoded);
754 :
755 2 : Ok(())
756 2 : }
757 :
758 : #[test]
759 2 : fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
760 2 : // Test that TenantShardId can decode a TenantId in human
761 2 : // readable form
762 2 : let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
763 2 : let encoded = format!("{example}");
764 2 :
765 2 : assert_eq!(&encoded, EXAMPLE_TENANT_ID);
766 :
767 2 : let decoded = TenantShardId::from_str(&encoded)?;
768 :
769 2 : assert_eq!(example, decoded.tenant_id);
770 2 : assert_eq!(decoded.shard_count, ShardCount(0));
771 2 : assert_eq!(decoded.shard_number, ShardNumber(0));
772 :
773 2 : Ok(())
774 2 : }
775 :
776 : #[test]
777 2 : fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
778 2 : // Test that a legacy TenantShardId encodes into a form that
779 2 : // can be decoded as TenantId
780 2 : let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
781 2 : let example = TenantShardId::unsharded(example_tenant_id);
782 2 : let encoded = format!("{example}");
783 2 :
784 2 : assert_eq!(&encoded, EXAMPLE_TENANT_ID);
785 :
786 2 : let decoded = TenantId::from_str(&encoded)?;
787 :
788 2 : assert_eq!(example_tenant_id, decoded);
789 :
790 2 : Ok(())
791 2 : }
792 :
793 : #[test]
794 2 : fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
795 2 : // Unlike in human readable encoding, binary encoding does not
796 2 : // do any special handling of legacy unsharded TenantIds: this test
797 2 : // is equivalent to the main test for binary encoding, just verifying
798 2 : // that the same behavior applies when we have used `unsharded()` to
799 2 : // construct a TenantShardId.
800 2 : let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
801 2 : let encoded = bincode::serialize(&example).unwrap();
802 2 :
803 2 : let expected: [u8; 18] = [
804 2 : 0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
805 2 : 0xf6, 0xfc, 0x00, 0x00,
806 2 : ];
807 2 : assert_eq!(Hex(&encoded), Hex(&expected));
808 :
809 2 : let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
810 2 : assert_eq!(example, decoded);
811 :
812 2 : Ok(())
813 2 : }
814 :
815 : #[test]
816 2 : fn shard_identity_validation() -> Result<(), ShardConfigError> {
817 2 : // Happy cases
818 2 : ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
819 2 : ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
820 2 : ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
821 :
822 2 : assert_eq!(
823 2 : ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
824 2 : Err(ShardConfigError::InvalidCount)
825 2 : );
826 2 : assert_eq!(
827 2 : ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
828 2 : Err(ShardConfigError::InvalidNumber)
829 2 : );
830 2 : assert_eq!(
831 2 : ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
832 2 : Err(ShardConfigError::InvalidNumber)
833 2 : );
834 2 : assert_eq!(
835 2 : ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
836 2 : Err(ShardConfigError::InvalidNumber)
837 2 : );
838 2 : assert_eq!(
839 2 : ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
840 2 : Err(ShardConfigError::InvalidStripeSize)
841 2 : );
842 :
843 2 : Ok(())
844 2 : }
845 :
846 : #[test]
847 2 : fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
848 2 : let example = ShardIndex {
849 2 : shard_number: ShardNumber(13),
850 2 : shard_count: ShardCount(17),
851 2 : };
852 2 : let expected: String = "0d11".to_string();
853 2 : let encoded = format!("{example}");
854 2 : assert_eq!(&encoded, &expected);
855 :
856 2 : let decoded = ShardIndex::from_str(&encoded)?;
857 2 : assert_eq!(example, decoded);
858 2 : Ok(())
859 2 : }
860 :
861 : #[test]
862 2 : fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
863 2 : let example = ShardIndex {
864 2 : shard_number: ShardNumber(13),
865 2 : shard_count: ShardCount(17),
866 2 : };
867 2 : let expected: [u8; 2] = [0x0d, 0x11];
868 2 :
869 2 : let encoded = bincode::serialize(&example).unwrap();
870 2 : assert_eq!(Hex(&encoded), Hex(&expected));
871 2 : let decoded = bincode::deserialize(&encoded).unwrap();
872 2 : assert_eq!(example, decoded);
873 :
874 2 : Ok(())
875 2 : }
876 :
877 : // These are only smoke tests to spot check that our implementation doesn't
878 : // deviate from a few examples values: not aiming to validate the overall
879 : // hashing algorithm.
880 : #[test]
881 2 : fn murmur_hash() {
882 2 : assert_eq!(murmurhash32(0), 0);
883 :
884 2 : assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
885 2 : }
886 :
887 : #[test]
888 2 : fn shard_mapping() {
889 2 : let key = Key {
890 2 : field1: 0x00,
891 2 : field2: 0x67f,
892 2 : field3: 0x5,
893 2 : field4: 0x400c,
894 2 : field5: 0x00,
895 2 : field6: 0x7d06,
896 2 : };
897 2 :
898 2 : let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
899 2 : assert_eq!(shard, ShardNumber(8));
900 2 : }
901 :
902 : #[test]
903 2 : fn shard_id_split() {
904 2 : let tenant_id = TenantId::generate();
905 2 : let parent = TenantShardId::unsharded(tenant_id);
906 2 :
907 2 : // Unsharded into 2
908 2 : assert_eq!(
909 2 : parent.split(ShardCount(2)),
910 2 : vec![
911 2 : TenantShardId {
912 2 : tenant_id,
913 2 : shard_count: ShardCount(2),
914 2 : shard_number: ShardNumber(0)
915 2 : },
916 2 : TenantShardId {
917 2 : tenant_id,
918 2 : shard_count: ShardCount(2),
919 2 : shard_number: ShardNumber(1)
920 2 : }
921 2 : ]
922 2 : );
923 :
924 : // Unsharded into 4
925 2 : assert_eq!(
926 2 : parent.split(ShardCount(4)),
927 2 : vec![
928 2 : TenantShardId {
929 2 : tenant_id,
930 2 : shard_count: ShardCount(4),
931 2 : shard_number: ShardNumber(0)
932 2 : },
933 2 : TenantShardId {
934 2 : tenant_id,
935 2 : shard_count: ShardCount(4),
936 2 : shard_number: ShardNumber(1)
937 2 : },
938 2 : TenantShardId {
939 2 : tenant_id,
940 2 : shard_count: ShardCount(4),
941 2 : shard_number: ShardNumber(2)
942 2 : },
943 2 : TenantShardId {
944 2 : tenant_id,
945 2 : shard_count: ShardCount(4),
946 2 : shard_number: ShardNumber(3)
947 2 : }
948 2 : ]
949 2 : );
950 :
951 : // count=1 into 2 (check this works the same as unsharded.)
952 2 : let parent = TenantShardId {
953 2 : tenant_id,
954 2 : shard_count: ShardCount(1),
955 2 : shard_number: ShardNumber(0),
956 2 : };
957 2 : assert_eq!(
958 2 : parent.split(ShardCount(2)),
959 2 : vec![
960 2 : TenantShardId {
961 2 : tenant_id,
962 2 : shard_count: ShardCount(2),
963 2 : shard_number: ShardNumber(0)
964 2 : },
965 2 : TenantShardId {
966 2 : tenant_id,
967 2 : shard_count: ShardCount(2),
968 2 : shard_number: ShardNumber(1)
969 2 : }
970 2 : ]
971 2 : );
972 :
973 : // count=2 into count=8
974 2 : let parent = TenantShardId {
975 2 : tenant_id,
976 2 : shard_count: ShardCount(2),
977 2 : shard_number: ShardNumber(1),
978 2 : };
979 2 : assert_eq!(
980 2 : parent.split(ShardCount(8)),
981 2 : vec![
982 2 : TenantShardId {
983 2 : tenant_id,
984 2 : shard_count: ShardCount(8),
985 2 : shard_number: ShardNumber(1)
986 2 : },
987 2 : TenantShardId {
988 2 : tenant_id,
989 2 : shard_count: ShardCount(8),
990 2 : shard_number: ShardNumber(3)
991 2 : },
992 2 : TenantShardId {
993 2 : tenant_id,
994 2 : shard_count: ShardCount(8),
995 2 : shard_number: ShardNumber(5)
996 2 : },
997 2 : TenantShardId {
998 2 : tenant_id,
999 2 : shard_count: ShardCount(8),
1000 2 : shard_number: ShardNumber(7)
1001 2 : },
1002 2 : ]
1003 2 : );
1004 2 : }
1005 : }
|