Line data Source code
1 : use std::{ops::RangeInclusive, str::FromStr};
2 :
3 : use crate::{key::Key, models::ShardParameters};
4 : use hex::FromHex;
5 : use postgres_ffi::relfile_utils::INIT_FORKNUM;
6 : use serde::{Deserialize, Serialize};
7 : use utils::id::TenantId;
8 :
9 : /// See docs/rfcs/031-sharding-static.md for an overview of sharding.
10 : ///
11 : /// This module contains a variety of types used to represent the concept of sharding
12 : /// a Neon tenant across multiple physical shards. Since there are quite a few of these,
13 : /// we provide a summary here.
14 : ///
15 : /// Types used to describe shards:
16 : /// - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
17 : /// which identifies a tenant which is not shard-aware. This means its storage paths do not include
18 : /// a shard suffix.
19 : /// - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
20 : /// - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
21 : /// without the tenant ID. This is useful for things that are implicitly scoped to a particular
22 : /// tenant, such as layer files.
23 : /// - [`ShardIdentity`] is the full description of a particular shard's parameters, in sufficient
24 : /// detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
25 : /// - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
26 : /// four hex digits. An unsharded tenant is `0000`.
27 : /// - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
28 : ///
29 : /// Types used to describe the parameters for data distribution in a sharded tenant:
30 : /// - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
31 : /// multiple shards. Its value is given in 8kiB pages.
32 : /// - [`ShardLayout`] describes the data distribution scheme, and at time of writing only
33 : /// a single layout version (version 1) is in use: this is provided for future upgrades that
34 : /// might introduce different data distribution schemes.
35 : ///
36 : /// Examples:
37 : /// - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
38 : /// - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
39 : /// - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
40 : /// and their slugs are 0004, 0104, 0204, and 0304.
41 :
42 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
43 : pub struct ShardNumber(pub u8);
44 :
45 0 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
46 : pub struct ShardCount(u8);
47 :
48 : /// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
49 : /// when we need to know which shard we're dealing with, but do not need to know the full
50 : /// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
51 : /// the fully qualified TenantShardId.
52 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
53 : pub struct ShardIndex {
54 : pub shard_number: ShardNumber,
55 : pub shard_count: ShardCount,
56 : }
57 :
58 : /// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
59 : /// and to check whether that [`ShardNumber`] is the same as the current shard.
60 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
61 : pub struct ShardIdentity {
62 : pub number: ShardNumber,
63 : pub count: ShardCount,
64 : pub stripe_size: ShardStripeSize,
65 : layout: ShardLayout,
66 : }
67 :
68 : /// Formatting helper, for generating the `shard_id` label in traces.
69 : struct ShardSlug<'a>(&'a TenantShardId);
70 :
71 : /// TenantShardId globally identifies a particular shard in a particular tenant.
72 : ///
73 : /// These are written as `<TenantId>-<ShardSlug>`, for example:
74 : /// # The second shard in a two-shard tenant
75 : /// 072f1291a5310026820b2fe4b2968934-0102
76 : ///
77 : /// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
78 : /// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
79 : /// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
80 : ///
81 : /// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
82 : /// is both forward and backward compatible with TenantId: a legacy TenantId can be
83 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
84 : /// as a TenantId.
85 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
86 : pub struct TenantShardId {
87 : pub tenant_id: TenantId,
88 : pub shard_number: ShardNumber,
89 : pub shard_count: ShardCount,
90 : }
91 :
92 : impl ShardCount {
93 : pub const MAX: Self = Self(u8::MAX);
94 :
95 : /// The internal value of a ShardCount may be zero, which means "1 shard, but use
96 : /// legacy format for TenantShardId that excludes the shard suffix", also known
97 : /// as [`TenantShardId::unsharded`].
98 : ///
99 : /// This method returns the actual number of shards, i.e. if our internal value is
100 : /// zero, we return 1 (unsharded tenants have 1 shard).
101 4804211 : pub fn count(&self) -> u8 {
102 4804211 : if self.0 > 0 {
103 12 : self.0
104 : } else {
105 4804199 : 1
106 : }
107 4804211 : }
108 :
109 : /// The literal internal value: this is **not** the number of shards in the
110 : /// tenant, as we have a special zero value for legacy unsharded tenants. Use
111 : /// [`Self::count`] if you want to know the cardinality of shards.
112 4 : pub fn literal(&self) -> u8 {
113 4 : self.0
114 4 : }
115 :
116 : /// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
117 : /// uses the legacy format for `TenantShardId`. See also the documentation for
118 : /// [`Self::count`].
119 0 : pub fn is_unsharded(&self) -> bool {
120 0 : self.0 == 0
121 0 : }
122 :
123 : /// `v` may be zero, or the number of shards in the tenant. `v` is what
124 : /// [`Self::literal`] would return.
125 7119 : pub const fn new(val: u8) -> Self {
126 7119 : Self(val)
127 7119 : }
128 : }
129 :
130 : impl ShardNumber {
131 : pub const MAX: Self = Self(u8::MAX);
132 : }
133 :
134 : impl TenantShardId {
135 36 : pub fn unsharded(tenant_id: TenantId) -> Self {
136 36 : Self {
137 36 : tenant_id,
138 36 : shard_number: ShardNumber(0),
139 36 : shard_count: ShardCount(0),
140 36 : }
141 36 : }
142 :
143 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
144 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
145 0 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
146 0 : RangeInclusive::new(
147 0 : Self {
148 0 : tenant_id,
149 0 : shard_number: ShardNumber(0),
150 0 : shard_count: ShardCount(0),
151 0 : },
152 0 : Self {
153 0 : tenant_id,
154 0 : shard_number: ShardNumber::MAX,
155 0 : shard_count: ShardCount::MAX,
156 0 : },
157 0 : )
158 0 : }
159 :
160 10433 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
161 10433 : ShardSlug(self)
162 10433 : }
163 :
164 : /// Convenience for code that has special behavior on the 0th shard.
165 6 : pub fn is_shard_zero(&self) -> bool {
166 6 : self.shard_number == ShardNumber(0)
167 6 : }
168 :
169 : /// The "unsharded" value is distinct from simply having a single shard: it represents
170 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
171 : /// a shard suffix.
172 0 : pub fn is_unsharded(&self) -> bool {
173 0 : self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
174 0 : }
175 :
176 : /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
177 : /// is useful when logging from code that is already in a span that includes tenant ID, to
178 : /// keep messages reasonably terse.
179 0 : pub fn to_index(&self) -> ShardIndex {
180 0 : ShardIndex {
181 0 : shard_number: self.shard_number,
182 0 : shard_count: self.shard_count,
183 0 : }
184 0 : }
185 :
186 : /// Calculate the children of this TenantShardId when splitting the overall tenant into
187 : /// the given number of shards.
188 8 : pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
189 8 : let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
190 8 : let mut child_shards = Vec::new();
191 32 : for shard_number in 0..ShardNumber(new_shard_count.0).0 {
192 : // Key mapping is based on a round robin mapping of key hash modulo shard count,
193 : // so our child shards are the ones which the same keys would map to.
194 32 : if shard_number % effective_old_shard_count == self.shard_number.0 {
195 24 : child_shards.push(TenantShardId {
196 24 : tenant_id: self.tenant_id,
197 24 : shard_number: ShardNumber(shard_number),
198 24 : shard_count: new_shard_count,
199 24 : })
200 8 : }
201 : }
202 :
203 8 : child_shards
204 8 : }
205 : }
206 :
207 : impl<'a> std::fmt::Display for ShardSlug<'a> {
208 10433 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 10433 : write!(
210 10433 : f,
211 10433 : "{:02x}{:02x}",
212 10433 : self.0.shard_number.0, self.0.shard_count.0
213 10433 : )
214 10433 : }
215 : }
216 :
217 : impl std::fmt::Display for TenantShardId {
218 8350 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 8350 : if self.shard_count != ShardCount(0) {
220 112 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
221 : } else {
222 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
223 : // is distinct from the normal single shard case (shard count == 1).
224 8238 : self.tenant_id.fmt(f)
225 : }
226 8350 : }
227 : }
228 :
229 : impl std::fmt::Debug for TenantShardId {
230 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231 0 : // Debug is the same as Display: the compact hex representation
232 0 : write!(f, "{}", self)
233 0 : }
234 : }
235 :
236 : impl std::str::FromStr for TenantShardId {
237 : type Err = hex::FromHexError;
238 :
239 3883 : fn from_str(s: &str) -> Result<Self, Self::Err> {
240 3883 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
241 3883 : if s.len() == 32 {
242 : // Legacy case: no shard specified
243 : Ok(Self {
244 3855 : tenant_id: TenantId::from_str(s)?,
245 3855 : shard_number: ShardNumber(0),
246 3855 : shard_count: ShardCount(0),
247 : })
248 28 : } else if s.len() == 37 {
249 28 : let bytes = s.as_bytes();
250 28 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
251 28 : let mut shard_parts: [u8; 2] = [0u8; 2];
252 28 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
253 28 : Ok(Self {
254 28 : tenant_id,
255 28 : shard_number: ShardNumber(shard_parts[0]),
256 28 : shard_count: ShardCount(shard_parts[1]),
257 28 : })
258 : } else {
259 0 : Err(hex::FromHexError::InvalidStringLength)
260 : }
261 3883 : }
262 : }
263 :
264 : impl From<[u8; 18]> for TenantShardId {
265 4 : fn from(b: [u8; 18]) -> Self {
266 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
267 4 :
268 4 : Self {
269 4 : tenant_id: TenantId::from(tenant_id_bytes),
270 4 : shard_number: ShardNumber(b[16]),
271 4 : shard_count: ShardCount(b[17]),
272 4 : }
273 4 : }
274 : }
275 :
276 : impl ShardIndex {
277 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
278 0 : Self {
279 0 : shard_number: number,
280 0 : shard_count: count,
281 0 : }
282 0 : }
283 86 : pub fn unsharded() -> Self {
284 86 : Self {
285 86 : shard_number: ShardNumber(0),
286 86 : shard_count: ShardCount(0),
287 86 : }
288 86 : }
289 :
290 : /// The "unsharded" value is distinct from simply having a single shard: it represents
291 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
292 : /// a shard suffix.
293 71748 : pub fn is_unsharded(&self) -> bool {
294 71748 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
295 71748 : }
296 :
297 : /// For use in constructing remote storage paths: concatenate this with a TenantId
298 : /// to get a fully qualified TenantShardId.
299 : ///
300 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
301 : /// that the legacy pre-sharding remote key format is preserved.
302 1394 : pub fn get_suffix(&self) -> String {
303 1394 : if self.is_unsharded() {
304 1386 : "".to_string()
305 : } else {
306 8 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
307 : }
308 1394 : }
309 : }
310 :
311 : impl std::fmt::Display for ShardIndex {
312 1909 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 1909 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
314 1909 : }
315 : }
316 :
317 : impl std::fmt::Debug for ShardIndex {
318 1464 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 1464 : // Debug is the same as Display: the compact hex representation
320 1464 : write!(f, "{}", self)
321 1464 : }
322 : }
323 :
324 : impl std::str::FromStr for ShardIndex {
325 : type Err = hex::FromHexError;
326 :
327 2 : fn from_str(s: &str) -> Result<Self, Self::Err> {
328 2 : // Expect format: 1 byte shard number, 1 byte shard count
329 2 : if s.len() == 4 {
330 2 : let bytes = s.as_bytes();
331 2 : let mut shard_parts: [u8; 2] = [0u8; 2];
332 2 : hex::decode_to_slice(bytes, &mut shard_parts)?;
333 2 : Ok(Self {
334 2 : shard_number: ShardNumber(shard_parts[0]),
335 2 : shard_count: ShardCount(shard_parts[1]),
336 2 : })
337 : } else {
338 0 : Err(hex::FromHexError::InvalidStringLength)
339 : }
340 2 : }
341 : }
342 :
343 : impl From<[u8; 2]> for ShardIndex {
344 2 : fn from(b: [u8; 2]) -> Self {
345 2 : Self {
346 2 : shard_number: ShardNumber(b[0]),
347 2 : shard_count: ShardCount(b[1]),
348 2 : }
349 2 : }
350 : }
351 :
352 : impl Serialize for TenantShardId {
353 52 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
354 52 : where
355 52 : S: serde::Serializer,
356 52 : {
357 52 : if serializer.is_human_readable() {
358 44 : serializer.collect_str(self)
359 : } else {
360 : // Note: while human encoding of [`TenantShardId`] is backward and forward
361 : // compatible, this binary encoding is not.
362 8 : let mut packed: [u8; 18] = [0; 18];
363 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
364 8 : packed[16] = self.shard_number.0;
365 8 : packed[17] = self.shard_count.0;
366 8 :
367 8 : packed.serialize(serializer)
368 : }
369 52 : }
370 : }
371 :
372 : impl<'de> Deserialize<'de> for TenantShardId {
373 12 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
374 12 : where
375 12 : D: serde::Deserializer<'de>,
376 12 : {
377 12 : struct IdVisitor {
378 12 : is_human_readable_deserializer: bool,
379 12 : }
380 12 :
381 12 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
382 12 : type Value = TenantShardId;
383 12 :
384 12 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
385 0 : if self.is_human_readable_deserializer {
386 12 : formatter.write_str("value in form of hex string")
387 12 : } else {
388 12 : formatter.write_str("value in form of integer array([u8; 18])")
389 12 : }
390 12 : }
391 12 :
392 12 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
393 4 : where
394 4 : A: serde::de::SeqAccess<'de>,
395 4 : {
396 4 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
397 12 : let id: [u8; 18] = Deserialize::deserialize(s)?;
398 12 : Ok(TenantShardId::from(id))
399 12 : }
400 12 :
401 12 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
402 8 : where
403 8 : E: serde::de::Error,
404 8 : {
405 8 : TenantShardId::from_str(v).map_err(E::custom)
406 8 : }
407 12 : }
408 12 :
409 12 : if deserializer.is_human_readable() {
410 8 : deserializer.deserialize_str(IdVisitor {
411 8 : is_human_readable_deserializer: true,
412 8 : })
413 : } else {
414 4 : deserializer.deserialize_tuple(
415 4 : 18,
416 4 : IdVisitor {
417 4 : is_human_readable_deserializer: false,
418 4 : },
419 4 : )
420 : }
421 12 : }
422 : }
423 :
424 : /// Stripe size in number of pages
425 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
426 : pub struct ShardStripeSize(pub u32);
427 :
428 : impl Default for ShardStripeSize {
429 0 : fn default() -> Self {
430 0 : DEFAULT_STRIPE_SIZE
431 0 : }
432 : }
433 :
434 : /// Layout version: for future upgrades where we might change how the key->shard mapping works
435 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
436 : pub struct ShardLayout(u8);
437 :
438 : const LAYOUT_V1: ShardLayout = ShardLayout(1);
439 : /// ShardIdentity uses a magic layout value to indicate if it is unusable
440 : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
441 :
442 : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
443 : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
444 :
445 0 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
446 : pub enum ShardConfigError {
447 : #[error("Invalid shard count")]
448 : InvalidCount,
449 : #[error("Invalid shard number")]
450 : InvalidNumber,
451 : #[error("Invalid stripe size")]
452 : InvalidStripeSize,
453 : }
454 :
455 : impl ShardIdentity {
456 : /// An identity with number=0 count=0 is a "none" identity, which represents legacy
457 : /// tenants. Modern single-shard tenants should not use this: they should
458 : /// have number=0 count=1.
459 1350 : pub const fn unsharded() -> Self {
460 1350 : Self {
461 1350 : number: ShardNumber(0),
462 1350 : count: ShardCount(0),
463 1350 : layout: LAYOUT_V1,
464 1350 : stripe_size: DEFAULT_STRIPE_SIZE,
465 1350 : }
466 1350 : }
467 :
468 : /// A broken instance of this type is only used for `TenantState::Broken` tenants,
469 : /// which are constructed in code paths that don't have access to proper configuration.
470 : ///
471 : /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
472 : /// Enforcement is via assertions, to avoid making our interface fallible for this
473 : /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
474 : /// state, and by extension to avoid trying to do any page->shard resolution.
475 0 : pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
476 0 : Self {
477 0 : number,
478 0 : count,
479 0 : layout: LAYOUT_BROKEN,
480 0 : stripe_size: DEFAULT_STRIPE_SIZE,
481 0 : }
482 0 : }
483 :
484 : /// The "unsharded" value is distinct from simply having a single shard: it represents
485 : /// a tenant which is not shard-aware at all, and whose storage paths will not include
486 : /// a shard suffix.
487 0 : pub fn is_unsharded(&self) -> bool {
488 0 : self.number == ShardNumber(0) && self.count == ShardCount(0)
489 0 : }
490 :
491 : /// Count must be nonzero, and number must be < count. To construct
492 : /// the legacy case (count==0), use Self::unsharded instead.
493 1032 : pub fn new(
494 1032 : number: ShardNumber,
495 1032 : count: ShardCount,
496 1032 : stripe_size: ShardStripeSize,
497 1032 : ) -> Result<Self, ShardConfigError> {
498 1032 : if count.0 == 0 {
499 2 : Err(ShardConfigError::InvalidCount)
500 1030 : } else if number.0 > count.0 - 1 {
501 6 : Err(ShardConfigError::InvalidNumber)
502 1024 : } else if stripe_size.0 == 0 {
503 2 : Err(ShardConfigError::InvalidStripeSize)
504 : } else {
505 1022 : Ok(Self {
506 1022 : number,
507 1022 : count,
508 1022 : layout: LAYOUT_V1,
509 1022 : stripe_size,
510 1022 : })
511 : }
512 1032 : }
513 :
514 : /// For use when creating ShardIdentity instances for new shards, where a creation request
515 : /// specifies the ShardParameters that apply to all shards.
516 165 : pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
517 165 : Self {
518 165 : number,
519 165 : count: params.count,
520 165 : layout: LAYOUT_V1,
521 165 : stripe_size: params.stripe_size,
522 165 : }
523 165 : }
524 :
525 1226480 : fn is_broken(&self) -> bool {
526 1226480 : self.layout == LAYOUT_BROKEN
527 1226480 : }
528 :
529 3136 : pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
530 3136 : assert!(!self.is_broken());
531 3136 : key_to_shard_number(self.count, self.stripe_size, key)
532 3136 : }
533 :
534 : /// Return true if the key should be ingested by this shard
535 : ///
536 : /// Shards must ingest _at least_ keys which return true from this check.
537 1223344 : pub fn is_key_local(&self, key: &Key) -> bool {
538 1223344 : assert!(!self.is_broken());
539 1223344 : if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
540 171816 : true
541 : } else {
542 1051528 : key_to_shard_number(self.count, self.stripe_size, key) == self.number
543 : }
544 1223344 : }
545 :
546 : /// Return true if the key should be discarded if found in this shard's
547 : /// data store, e.g. during compaction after a split.
548 : ///
549 : /// Shards _may_ drop keys which return false here, but are not obliged to.
550 3746104 : pub fn is_key_disposable(&self, key: &Key) -> bool {
551 3746104 : if key_is_shard0(key) {
552 : // Q: Why can't we dispose of shard0 content if we're not shard 0?
553 : // A1: because the WAL ingestion logic currently ingests some shard 0
554 : // content on all shards, even though it's only read on shard 0. If we
555 : // dropped it, then subsequent WAL ingest to these keys would encounter
556 : // an error.
557 : // A2: because key_is_shard0 also covers relation size keys, which are written
558 : // on all shards even though they're only maintained accurately on shard 0.
559 2673842 : false
560 : } else {
561 1072262 : !self.is_key_local(key)
562 : }
563 3746104 : }
564 :
565 : /// Obtains the shard number and count combined into a `ShardIndex`.
566 157 : pub fn shard_index(&self) -> ShardIndex {
567 157 : ShardIndex {
568 157 : shard_count: self.count,
569 157 : shard_number: self.number,
570 157 : }
571 157 : }
572 :
573 8 : pub fn shard_slug(&self) -> String {
574 8 : if self.count > ShardCount(0) {
575 8 : format!("-{:02x}{:02x}", self.number.0, self.count.0)
576 : } else {
577 0 : String::new()
578 : }
579 8 : }
580 :
581 : /// Convenience for checking if this identity is the 0th shard in a tenant,
582 : /// for special cases on shard 0 such as ingesting relation sizes.
583 0 : pub fn is_shard_zero(&self) -> bool {
584 0 : self.number == ShardNumber(0)
585 0 : }
586 : }
587 :
588 : impl Serialize for ShardIndex {
589 20 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
590 20 : where
591 20 : S: serde::Serializer,
592 20 : {
593 20 : if serializer.is_human_readable() {
594 16 : serializer.collect_str(self)
595 : } else {
596 : // Binary encoding is not used in index_part.json, but is included in anticipation of
597 : // switching various structures (e.g. inter-process communication, remote metadata) to more
598 : // compact binary encodings in future.
599 4 : let mut packed: [u8; 2] = [0; 2];
600 4 : packed[0] = self.shard_number.0;
601 4 : packed[1] = self.shard_count.0;
602 4 : packed.serialize(serializer)
603 : }
604 20 : }
605 : }
606 :
607 : impl<'de> Deserialize<'de> for ShardIndex {
608 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
609 2 : where
610 2 : D: serde::Deserializer<'de>,
611 2 : {
612 2 : struct IdVisitor {
613 2 : is_human_readable_deserializer: bool,
614 2 : }
615 2 :
616 2 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
617 2 : type Value = ShardIndex;
618 2 :
619 2 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
620 0 : if self.is_human_readable_deserializer {
621 2 : formatter.write_str("value in form of hex string")
622 2 : } else {
623 2 : formatter.write_str("value in form of integer array([u8; 2])")
624 2 : }
625 2 : }
626 2 :
627 2 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
628 2 : where
629 2 : A: serde::de::SeqAccess<'de>,
630 2 : {
631 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
632 2 : let id: [u8; 2] = Deserialize::deserialize(s)?;
633 2 : Ok(ShardIndex::from(id))
634 2 : }
635 2 :
636 2 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
637 0 : where
638 0 : E: serde::de::Error,
639 0 : {
640 0 : ShardIndex::from_str(v).map_err(E::custom)
641 0 : }
642 2 : }
643 2 :
644 2 : if deserializer.is_human_readable() {
645 0 : deserializer.deserialize_str(IdVisitor {
646 0 : is_human_readable_deserializer: true,
647 0 : })
648 : } else {
649 2 : deserializer.deserialize_tuple(
650 2 : 2,
651 2 : IdVisitor {
652 2 : is_human_readable_deserializer: false,
653 2 : },
654 2 : )
655 : }
656 2 : }
657 : }
658 :
659 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
660 : /// in order to be able to serve basebackup requests without peer communication).
661 5849162 : fn key_is_shard0(key: &Key) -> bool {
662 5849162 : // To decide what to shard out to shards >0, we apply a simple rule that only
663 5849162 : // relation pages are distributed to shards other than shard zero. Everything else gets
664 5849162 : // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
665 5849162 : // requests, and any request other than those for particular blocks in relations.
666 5849162 : //
667 5849162 : // The only exception to this rule is "initfork" data -- this relates to postgres's UNLOGGED table
668 5849162 : // type. These are special relations, usually with only 0 or 1 blocks, and we store them on shard 0
669 5849162 : // because they must be included in basebackups.
670 5849162 : let is_initfork = key.field5 == INIT_FORKNUM;
671 5849162 :
672 5849162 : !key.is_rel_block_key() || is_initfork
673 5849162 : }
674 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Three xor-shift steps (by 16, 13 and 16 bits) with a wrapping
/// multiplication after each of the first two.
fn murmurhash32(h: u32) -> u32 {
    let h = (h ^ (h >> 16)).wrapping_mul(0x85eb_ca6b);
    let h = (h ^ (h >> 13)).wrapping_mul(0xc2b2_ae35);
    h ^ (h >> 16)
}
684 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Mixes `b` with a constant and two shifted copies of `a` (all additions
/// wrapping), then xors the result into `a`.
fn hash_combine(a: u32, b: u32) -> u32 {
    let mixed = b
        .wrapping_add(0x9e37_79b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);

    a ^ mixed
}
694 :
695 : /// Where a Key is to be distributed across shards, select the shard. This function
696 : /// does not account for keys that should be broadcast across shards.
697 : ///
698 : /// The hashing in this function must exactly match what we do in postgres smgr
699 : /// code. The resulting distribution of pages is intended to preserve locality within
700 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
701 : /// distributing data pseudo-randomly.
702 : ///
703 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
704 : /// and will be handled at higher levels when shards are split.
705 1054666 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
706 1054666 : // Fast path for un-sharded tenants or broadcast keys
707 1054666 : if count < ShardCount(2) || key_is_shard0(key) {
708 3136 : return ShardNumber(0);
709 1051530 : }
710 1051530 :
711 1051530 : // relNode
712 1051530 : let mut hash = murmurhash32(key.field4);
713 1051530 : // blockNum/stripe size
714 1051530 : hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
715 1051530 :
716 1051530 : ShardNumber((hash % count.0 as u32) as u8)
717 1054666 : }
718 :
719 : /// For debugging, while not exposing the internals.
720 : #[derive(Debug)]
721 : #[allow(unused)] // used by debug formatting by pagectl
722 : struct KeyShardingInfo {
723 : shard0: bool,
724 : shard_number: ShardNumber,
725 : }
726 :
727 0 : pub fn describe(
728 0 : key: &Key,
729 0 : shard_count: ShardCount,
730 0 : stripe_size: ShardStripeSize,
731 0 : ) -> impl std::fmt::Debug {
732 0 : KeyShardingInfo {
733 0 : shard0: key_is_shard0(key),
734 0 : shard_number: key_to_shard_number(shard_count, stripe_size, key),
735 0 : }
736 0 : }
737 :
738 : #[cfg(test)]
739 : mod tests {
740 : use utils::Hex;
741 :
742 : use super::*;
743 :
744 : const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
745 :
746 : #[test]
747 2 : fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
748 2 : let example = TenantShardId {
749 2 : tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
750 2 : shard_count: ShardCount(10),
751 2 : shard_number: ShardNumber(7),
752 2 : };
753 2 :
754 2 : let encoded = format!("{example}");
755 2 :
756 2 : let expected = format!("{EXAMPLE_TENANT_ID}-070a");
757 2 : assert_eq!(&encoded, &expected);
758 :
759 2 : let decoded = TenantShardId::from_str(&encoded)?;
760 :
761 2 : assert_eq!(example, decoded);
762 :
763 2 : Ok(())
764 2 : }
765 :
766 : #[test]
767 2 : fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
768 2 : let example = TenantShardId {
769 2 : tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
770 2 : shard_count: ShardCount(10),
771 2 : shard_number: ShardNumber(7),
772 2 : };
773 2 :
774 2 : let encoded = bincode::serialize(&example).unwrap();
775 2 : let expected: [u8; 18] = [
776 2 : 0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
777 2 : 0xf6, 0xfc, 0x07, 0x0a,
778 2 : ];
779 2 : assert_eq!(Hex(&encoded), Hex(&expected));
780 :
781 2 : let decoded = bincode::deserialize(&encoded).unwrap();
782 2 :
783 2 : assert_eq!(example, decoded);
784 :
785 2 : Ok(())
786 2 : }
787 :
788 : #[test]
789 2 : fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
790 2 : // Test that TenantShardId can decode a TenantId in human
791 2 : // readable form
792 2 : let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
793 2 : let encoded = format!("{example}");
794 2 :
795 2 : assert_eq!(&encoded, EXAMPLE_TENANT_ID);
796 :
797 2 : let decoded = TenantShardId::from_str(&encoded)?;
798 :
799 2 : assert_eq!(example, decoded.tenant_id);
800 2 : assert_eq!(decoded.shard_count, ShardCount(0));
801 2 : assert_eq!(decoded.shard_number, ShardNumber(0));
802 :
803 2 : Ok(())
804 2 : }
805 :
806 : #[test]
807 2 : fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
808 2 : // Test that a legacy TenantShardId encodes into a form that
809 2 : // can be decoded as TenantId
810 2 : let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
811 2 : let example = TenantShardId::unsharded(example_tenant_id);
812 2 : let encoded = format!("{example}");
813 2 :
814 2 : assert_eq!(&encoded, EXAMPLE_TENANT_ID);
815 :
816 2 : let decoded = TenantId::from_str(&encoded)?;
817 :
818 2 : assert_eq!(example_tenant_id, decoded);
819 :
820 2 : Ok(())
821 2 : }
822 :
#[test]
fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
    // The binary encoding has no special case for legacy unsharded ids
    // (unlike the human readable encoding). This mirrors the main binary
    // encoding test, but starts from a TenantShardId built via `unsharded()`
    // to check that the same behavior applies.
    let tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
    let example = TenantShardId::unsharded(tenant_id);

    // Expected wire form: 18 bytes, the last two of which are zero.
    let expected: [u8; 18] = [
        0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
        0xf6, 0xfc, 0x00, 0x00,
    ];

    let encoded = bincode::serialize(&example).unwrap();
    assert_eq!(Hex(&encoded), Hex(&expected));

    // Round trip: decoding the bytes must reproduce the original value.
    let decoded: TenantShardId = bincode::deserialize(&encoded).unwrap();
    assert_eq!(example, decoded);

    Ok(())
}
844 :
#[test]
fn shard_identity_validation() -> Result<(), ShardConfigError> {
    // Parameter combinations that the constructor must accept.
    let accepted = [
        (ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE),
        (ShardNumber(0), ShardCount(1), ShardStripeSize(1)),
        (ShardNumber(254), ShardCount(255), ShardStripeSize(1)),
    ];
    for (number, count, stripe_size) in accepted {
        ShardIdentity::new(number, count, stripe_size)?;
    }

    // Parameter combinations that must be rejected, paired with the exact
    // error each one is expected to produce.
    let rejected = [
        (
            ShardNumber(0),
            ShardCount(0),
            DEFAULT_STRIPE_SIZE,
            ShardConfigError::InvalidCount,
        ),
        (
            ShardNumber(10),
            ShardCount(10),
            DEFAULT_STRIPE_SIZE,
            ShardConfigError::InvalidNumber,
        ),
        (
            ShardNumber(11),
            ShardCount(10),
            DEFAULT_STRIPE_SIZE,
            ShardConfigError::InvalidNumber,
        ),
        (
            ShardNumber(255),
            ShardCount(255),
            DEFAULT_STRIPE_SIZE,
            ShardConfigError::InvalidNumber,
        ),
        (
            ShardNumber(0),
            ShardCount(1),
            ShardStripeSize(0),
            ShardConfigError::InvalidStripeSize,
        ),
    ];
    for (number, count, stripe_size, expected_error) in rejected {
        assert_eq!(
            ShardIdentity::new(number, count, stripe_size),
            Err(expected_error)
        );
    }

    Ok(())
}
875 :
#[test]
fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
    // Shard number 13 (0x0d) and shard count 17 (0x11) must render as the
    // four hex digits "0d11", and that text must parse back to the same index.
    let index = ShardIndex {
        shard_count: ShardCount(17),
        shard_number: ShardNumber(13),
    };

    let rendered = index.to_string();
    assert_eq!(rendered, "0d11");

    let reparsed: ShardIndex = rendered.parse()?;
    assert_eq!(index, reparsed);

    Ok(())
}
890 :
#[test]
fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
    // Shard number 13 and shard count 17 must serialize to exactly two bytes:
    // the number (0x0d) followed by the count (0x11).
    let expected: [u8; 2] = [0x0d, 0x11];
    let index = ShardIndex {
        shard_count: ShardCount(17),
        shard_number: ShardNumber(13),
    };

    let bytes = bincode::serialize(&index).unwrap();
    assert_eq!(Hex(&bytes), Hex(&expected));

    // Round trip: decoding the bytes must reproduce the original index.
    let roundtripped: ShardIndex = bincode::deserialize(&bytes).unwrap();
    assert_eq!(index, roundtripped);

    Ok(())
}
906 :
// Smoke tests only: spot check that the hash helpers still produce a couple
// of known example values. They do not attempt to validate the hashing
// algorithm as a whole.
#[test]
fn murmur_hash() {
    let hash_of_zero = murmurhash32(0);
    assert_eq!(hash_of_zero, 0);

    let combined = hash_combine(0xb1ff3b40, 0);
    assert_eq!(combined, 0xfb7923c9);
}
916 :
#[test]
fn shard_mapping() {
    // Spot check: with ten shards and the default stripe size, this fixed
    // example key must be assigned to shard 8.
    let shard_count = ShardCount(10);
    let example_key = Key {
        field1: 0x00,
        field2: 0x67f,
        field3: 0x5,
        field4: 0x400c,
        field5: 0x00,
        field6: 0x7d06,
    };

    assert_eq!(
        key_to_shard_number(shard_count, DEFAULT_STRIPE_SIZE, &example_key),
        ShardNumber(8)
    );
}
931 :
#[test]
fn shard_id_split() {
    let tenant_id = TenantId::generate();

    // Builds the id of shard `number` out of `count` for this test's tenant.
    let shard = |count, number| TenantShardId {
        tenant_id,
        shard_count: ShardCount(count),
        shard_number: ShardNumber(number),
    };

    let unsharded = TenantShardId::unsharded(tenant_id);

    // Unsharded into 2
    assert_eq!(
        unsharded.split(ShardCount(2)),
        vec![shard(2, 0), shard(2, 1)]
    );

    // Unsharded into 4
    assert_eq!(
        unsharded.split(ShardCount(4)),
        vec![shard(4, 0), shard(4, 1), shard(4, 2), shard(4, 3)]
    );

    // count=1 into 2: must give the same children as splitting an unsharded id.
    let single = shard(1, 0);
    assert_eq!(single.split(ShardCount(2)), vec![shard(2, 0), shard(2, 1)]);

    // count=2 into count=8: shard 1 of 2 is expected to yield the odd-numbered
    // children 1, 3, 5 and 7.
    let second_of_two = shard(2, 1);
    assert_eq!(
        second_of_two.split(ShardCount(8)),
        vec![shard(8, 1), shard(8, 3), shard(8, 5), shard(8, 7)]
    );
}
1035 : }
|