Line data Source code
1 : use std::{ops::RangeInclusive, str::FromStr};
2 :
3 : use crate::{
4 : key::{is_rel_block_key, Key},
5 : models::ShardParameters,
6 : };
7 : use hex::FromHex;
8 : use serde::{Deserialize, Serialize};
9 : use thiserror;
10 : use utils::id::TenantId;
11 :
12 100752 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
13 : pub struct ShardNumber(pub u8);
14 :
15 167032 : #[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
16 : pub struct ShardCount(u8);
17 :
18 : impl ShardCount {
19 : pub const MAX: Self = Self(u8::MAX);
20 :
21 : /// The internal value of a ShardCount may be zero, which means "1 shard, but use
22 : /// legacy format for TenantShardId that excludes the shard suffix", also known
23 : /// as `TenantShardId::unsharded`.
24 : ///
25 : /// This method returns the actual number of shards, i.e. if our internal value is
26 : /// zero, we return 1 (unsharded tenants have 1 shard).
27 0 : pub fn count(&self) -> u8 {
28 0 : if self.0 > 0 {
29 0 : self.0
30 : } else {
31 0 : 1
32 : }
33 0 : }
34 :
35 : /// The literal internal value: this is **not** the number of shards in the
36 : /// tenant, as we have a special zero value for legacy unsharded tenants. Use
37 : /// [`Self::count`] if you want to know the cardinality of shards.
38 0 : pub fn literal(&self) -> u8 {
39 0 : self.0
40 0 : }
41 :
42 0 : pub fn is_unsharded(&self) -> bool {
43 0 : self.0 == 0
44 0 : }
45 :
46 : /// `v` may be zero, or the number of shards in the tenant. `v` is what
47 : /// [`Self::literal`] would return.
48 92 : pub fn new(val: u8) -> Self {
49 92 : Self(val)
50 92 : }
51 : }
52 :
53 : impl ShardNumber {
54 : pub const MAX: Self = Self(u8::MAX);
55 : }
56 :
57 : /// TenantShardId identify the units of work for the Pageserver.
58 : ///
59 : /// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
60 : ///
61 : /// # The second shard in a two-shard tenant
62 : /// 072f1291a5310026820b2fe4b2968934-0102
63 : ///
64 : /// Historically, tenants could not have multiple shards, and were identified
65 : /// by TenantId. To support this, TenantShardId has a special legacy
66 : /// mode where `shard_count` is equal to zero: this represents a single-sharded
67 : /// tenant which should be written as a TenantId with no suffix.
68 : ///
69 : /// The human-readable encoding of TenantShardId, such as used in API URLs,
70 : /// is both forward and backward compatible: a legacy TenantId can be
71 : /// decoded as a TenantShardId, and when re-encoded it will be parseable
72 : /// as a TenantId.
73 : ///
74 : /// Note that the binary encoding is _not_ backward compatible, because
75 : /// at the time sharding is introduced, there are no existing binary structures
76 : /// containing TenantId that we need to handle.
77 503586 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
78 : pub struct TenantShardId {
79 : pub tenant_id: TenantId,
80 : pub shard_number: ShardNumber,
81 : pub shard_count: ShardCount,
82 : }
83 :
84 : impl TenantShardId {
85 125 : pub fn unsharded(tenant_id: TenantId) -> Self {
86 125 : Self {
87 125 : tenant_id,
88 125 : shard_number: ShardNumber(0),
89 125 : shard_count: ShardCount(0),
90 125 : }
91 125 : }
92 :
93 : /// The range of all TenantShardId that belong to a particular TenantId. This is useful when
94 : /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
95 0 : pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
96 0 : RangeInclusive::new(
97 0 : Self {
98 0 : tenant_id,
99 0 : shard_number: ShardNumber(0),
100 0 : shard_count: ShardCount(0),
101 0 : },
102 0 : Self {
103 0 : tenant_id,
104 0 : shard_number: ShardNumber::MAX,
105 0 : shard_count: ShardCount::MAX,
106 0 : },
107 0 : )
108 0 : }
109 :
110 5466 : pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
111 5466 : ShardSlug(self)
112 5466 : }
113 :
114 : /// Convenience for code that has special behavior on the 0th shard.
115 88 : pub fn is_zero(&self) -> bool {
116 88 : self.shard_number == ShardNumber(0)
117 88 : }
118 :
119 0 : pub fn is_unsharded(&self) -> bool {
120 0 : self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
121 0 : }
122 :
123 : /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
124 : /// is useful when logging from code that is already in a span that includes tenant ID, to
125 : /// keep messages reasonably terse.
126 0 : pub fn to_index(&self) -> ShardIndex {
127 0 : ShardIndex {
128 0 : shard_number: self.shard_number,
129 0 : shard_count: self.shard_count,
130 0 : }
131 0 : }
132 :
133 : /// Calculate the children of this TenantShardId when splitting the overall tenant into
134 : /// the given number of shards.
135 8 : pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
136 8 : let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
137 8 : let mut child_shards = Vec::new();
138 32 : for shard_number in 0..ShardNumber(new_shard_count.0).0 {
139 : // Key mapping is based on a round robin mapping of key hash modulo shard count,
140 : // so our child shards are the ones which the same keys would map to.
141 32 : if shard_number % effective_old_shard_count == self.shard_number.0 {
142 24 : child_shards.push(TenantShardId {
143 24 : tenant_id: self.tenant_id,
144 24 : shard_number: ShardNumber(shard_number),
145 24 : shard_count: new_shard_count,
146 24 : })
147 8 : }
148 : }
149 :
150 8 : child_shards
151 8 : }
152 : }
153 :
154 : /// Formatting helper
155 : struct ShardSlug<'a>(&'a TenantShardId);
156 :
157 : impl<'a> std::fmt::Display for ShardSlug<'a> {
158 5466 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 5466 : write!(
160 5466 : f,
161 5466 : "{:02x}{:02x}",
162 5466 : self.0.shard_number.0, self.0.shard_count.0
163 5466 : )
164 5466 : }
165 : }
166 :
167 : impl std::fmt::Display for TenantShardId {
168 4473 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 4473 : if self.shard_count != ShardCount(0) {
170 2 : write!(f, "{}-{}", self.tenant_id, self.shard_slug())
171 : } else {
172 : // Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
173 : // is distinct from the normal single shard case (shard count == 1).
174 4471 : self.tenant_id.fmt(f)
175 : }
176 4473 : }
177 : }
178 :
179 : impl std::fmt::Debug for TenantShardId {
180 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 0 : // Debug is the same as Display: the compact hex representation
182 0 : write!(f, "{}", self)
183 0 : }
184 : }
185 :
186 : impl std::str::FromStr for TenantShardId {
187 : type Err = hex::FromHexError;
188 :
189 1666 : fn from_str(s: &str) -> Result<Self, Self::Err> {
190 1666 : // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
191 1666 : if s.len() == 32 {
192 : // Legacy case: no shard specified
193 : Ok(Self {
194 1664 : tenant_id: TenantId::from_str(s)?,
195 1664 : shard_number: ShardNumber(0),
196 1664 : shard_count: ShardCount(0),
197 : })
198 2 : } else if s.len() == 37 {
199 2 : let bytes = s.as_bytes();
200 2 : let tenant_id = TenantId::from_hex(&bytes[0..32])?;
201 2 : let mut shard_parts: [u8; 2] = [0u8; 2];
202 2 : hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
203 2 : Ok(Self {
204 2 : tenant_id,
205 2 : shard_number: ShardNumber(shard_parts[0]),
206 2 : shard_count: ShardCount(shard_parts[1]),
207 2 : })
208 : } else {
209 0 : Err(hex::FromHexError::InvalidStringLength)
210 : }
211 1666 : }
212 : }
213 :
214 : impl From<[u8; 18]> for TenantShardId {
215 4 : fn from(b: [u8; 18]) -> Self {
216 4 : let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
217 4 :
218 4 : Self {
219 4 : tenant_id: TenantId::from(tenant_id_bytes),
220 4 : shard_number: ShardNumber(b[16]),
221 4 : shard_count: ShardCount(b[17]),
222 4 : }
223 4 : }
224 : }
225 :
226 : /// For use within the context of a particular tenant, when we need to know which
227 : /// shard we're dealing with, but do not need to know the full ShardIdentity (because
228 : /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
229 : /// TenantShardId.
230 3816 : #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
231 : pub struct ShardIndex {
232 : pub shard_number: ShardNumber,
233 : pub shard_count: ShardCount,
234 : }
235 :
236 : impl ShardIndex {
237 0 : pub fn new(number: ShardNumber, count: ShardCount) -> Self {
238 0 : Self {
239 0 : shard_number: number,
240 0 : shard_count: count,
241 0 : }
242 0 : }
243 152 : pub fn unsharded() -> Self {
244 152 : Self {
245 152 : shard_number: ShardNumber(0),
246 152 : shard_count: ShardCount(0),
247 152 : }
248 152 : }
249 :
250 5825 : pub fn is_unsharded(&self) -> bool {
251 5825 : self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
252 5825 : }
253 :
254 : /// For use in constructing remote storage paths: concatenate this with a TenantId
255 : /// to get a fully qualified TenantShardId.
256 : ///
257 : /// Backward compat: this function returns an empty string if Self::is_unsharded, such
258 : /// that the legacy pre-sharding remote key format is preserved.
259 13 : pub fn get_suffix(&self) -> String {
260 13 : if self.is_unsharded() {
261 13 : "".to_string()
262 : } else {
263 0 : format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
264 : }
265 13 : }
266 : }
267 :
268 : impl std::fmt::Display for ShardIndex {
269 868 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 868 : write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
271 868 : }
272 : }
273 :
274 : impl std::fmt::Debug for ShardIndex {
275 562 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 562 : // Debug is the same as Display: the compact hex representation
277 562 : write!(f, "{}", self)
278 562 : }
279 : }
280 :
281 : impl std::str::FromStr for ShardIndex {
282 : type Err = hex::FromHexError;
283 :
284 2 : fn from_str(s: &str) -> Result<Self, Self::Err> {
285 2 : // Expect format: 1 byte shard number, 1 byte shard count
286 2 : if s.len() == 4 {
287 2 : let bytes = s.as_bytes();
288 2 : let mut shard_parts: [u8; 2] = [0u8; 2];
289 2 : hex::decode_to_slice(bytes, &mut shard_parts)?;
290 2 : Ok(Self {
291 2 : shard_number: ShardNumber(shard_parts[0]),
292 2 : shard_count: ShardCount(shard_parts[1]),
293 2 : })
294 : } else {
295 0 : Err(hex::FromHexError::InvalidStringLength)
296 : }
297 2 : }
298 : }
299 :
300 : impl From<[u8; 2]> for ShardIndex {
301 2 : fn from(b: [u8; 2]) -> Self {
302 2 : Self {
303 2 : shard_number: ShardNumber(b[0]),
304 2 : shard_count: ShardCount(b[1]),
305 2 : }
306 2 : }
307 : }
308 :
309 : impl Serialize for TenantShardId {
310 55 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
311 55 : where
312 55 : S: serde::Serializer,
313 55 : {
314 55 : if serializer.is_human_readable() {
315 47 : serializer.collect_str(self)
316 : } else {
317 8 : let mut packed: [u8; 18] = [0; 18];
318 8 : packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
319 8 : packed[16] = self.shard_number.0;
320 8 : packed[17] = self.shard_count.0;
321 8 :
322 8 : packed.serialize(serializer)
323 : }
324 55 : }
325 : }
326 :
327 : impl<'de> Deserialize<'de> for TenantShardId {
328 12 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
329 12 : where
330 12 : D: serde::Deserializer<'de>,
331 12 : {
332 12 : struct IdVisitor {
333 12 : is_human_readable_deserializer: bool,
334 12 : }
335 12 :
336 12 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
337 12 : type Value = TenantShardId;
338 12 :
339 12 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
340 0 : if self.is_human_readable_deserializer {
341 12 : formatter.write_str("value in form of hex string")
342 12 : } else {
343 12 : formatter.write_str("value in form of integer array([u8; 18])")
344 12 : }
345 12 : }
346 12 :
347 12 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
348 4 : where
349 4 : A: serde::de::SeqAccess<'de>,
350 4 : {
351 4 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
352 12 : let id: [u8; 18] = Deserialize::deserialize(s)?;
353 12 : Ok(TenantShardId::from(id))
354 12 : }
355 12 :
356 12 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
357 8 : where
358 8 : E: serde::de::Error,
359 8 : {
360 8 : TenantShardId::from_str(v).map_err(E::custom)
361 8 : }
362 12 : }
363 12 :
364 12 : if deserializer.is_human_readable() {
365 8 : deserializer.deserialize_str(IdVisitor {
366 8 : is_human_readable_deserializer: true,
367 8 : })
368 : } else {
369 4 : deserializer.deserialize_tuple(
370 4 : 18,
371 4 : IdVisitor {
372 4 : is_human_readable_deserializer: false,
373 4 : },
374 4 : )
375 : }
376 12 : }
377 : }
378 :
379 : /// Stripe size in number of pages
380 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
381 : pub struct ShardStripeSize(pub u32);
382 :
383 : /// Layout version: for future upgrades where we might change how the key->shard mapping works
384 167030 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
385 : pub struct ShardLayout(u8);
386 :
387 : const LAYOUT_V1: ShardLayout = ShardLayout(1);
388 : /// ShardIdentity uses a magic layout value to indicate if it is unusable
389 : const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
390 :
391 : /// Default stripe size in pages: 256MiB divided by 8kiB page size.
392 : const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
393 :
394 : /// The ShardIdentity contains the information needed for one member of map
395 : /// to resolve a key to a shard, and then check whether that shard is ==self.
396 0 : #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
397 : pub struct ShardIdentity {
398 : pub number: ShardNumber,
399 : pub count: ShardCount,
400 : pub stripe_size: ShardStripeSize,
401 : layout: ShardLayout,
402 : }
403 :
404 10 : #[derive(thiserror::Error, Debug, PartialEq, Eq)]
405 : pub enum ShardConfigError {
406 : #[error("Invalid shard count")]
407 : InvalidCount,
408 : #[error("Invalid shard number")]
409 : InvalidNumber,
410 : #[error("Invalid stripe size")]
411 : InvalidStripeSize,
412 : }
413 :
414 : impl ShardIdentity {
415 : /// An identity with number=0 count=0 is a "none" identity, which represents legacy
416 : /// tenants. Modern single-shard tenants should not use this: they should
417 : /// have number=0 count=1.
418 88 : pub fn unsharded() -> Self {
419 88 : Self {
420 88 : number: ShardNumber(0),
421 88 : count: ShardCount(0),
422 88 : layout: LAYOUT_V1,
423 88 : stripe_size: DEFAULT_STRIPE_SIZE,
424 88 : }
425 88 : }
426 :
427 : /// A broken instance of this type is only used for `TenantState::Broken` tenants,
428 : /// which are constructed in code paths that don't have access to proper configuration.
429 : ///
430 : /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
431 : /// Enforcement is via assertions, to avoid making our interface fallible for this
432 : /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
433 : /// state, and by extension to avoid trying to do any page->shard resolution.
434 0 : pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
435 0 : Self {
436 0 : number,
437 0 : count,
438 0 : layout: LAYOUT_BROKEN,
439 0 : stripe_size: DEFAULT_STRIPE_SIZE,
440 0 : }
441 0 : }
442 :
443 0 : pub fn is_unsharded(&self) -> bool {
444 0 : self.number == ShardNumber(0) && self.count == ShardCount(0)
445 0 : }
446 :
447 : /// Count must be nonzero, and number must be < count. To construct
448 : /// the legacy case (count==0), use Self::unsharded instead.
449 18 : pub fn new(
450 18 : number: ShardNumber,
451 18 : count: ShardCount,
452 18 : stripe_size: ShardStripeSize,
453 18 : ) -> Result<Self, ShardConfigError> {
454 18 : if count.0 == 0 {
455 2 : Err(ShardConfigError::InvalidCount)
456 16 : } else if number.0 > count.0 - 1 {
457 6 : Err(ShardConfigError::InvalidNumber)
458 10 : } else if stripe_size.0 == 0 {
459 2 : Err(ShardConfigError::InvalidStripeSize)
460 : } else {
461 8 : Ok(Self {
462 8 : number,
463 8 : count,
464 8 : layout: LAYOUT_V1,
465 8 : stripe_size,
466 8 : })
467 : }
468 18 : }
469 :
470 : /// For use when creating ShardIdentity instances for new shards, where a creation request
471 : /// specifies the ShardParameters that apply to all shards.
472 88 : pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
473 88 : Self {
474 88 : number,
475 88 : count: params.count,
476 88 : layout: LAYOUT_V1,
477 88 : stripe_size: params.stripe_size,
478 88 : }
479 88 : }
480 :
481 167030 : fn is_broken(&self) -> bool {
482 167030 : self.layout == LAYOUT_BROKEN
483 167030 : }
484 :
485 3004 : pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
486 3004 : assert!(!self.is_broken());
487 3004 : key_to_shard_number(self.count, self.stripe_size, key)
488 3004 : }
489 :
490 : /// Return true if the key should be ingested by this shard
491 164026 : pub fn is_key_local(&self, key: &Key) -> bool {
492 164026 : assert!(!self.is_broken());
493 164026 : if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
494 164026 : true
495 : } else {
496 0 : key_to_shard_number(self.count, self.stripe_size, key) == self.number
497 : }
498 164026 : }
499 :
500 : /// Return true if the key should be discarded if found in this shard's
501 : /// data store, e.g. during compaction after a split
502 2605708 : pub fn is_key_disposable(&self, key: &Key) -> bool {
503 2605708 : if key_is_shard0(key) {
504 : // Q: Why can't we dispose of shard0 content if we're not shard 0?
505 : // A1: because the WAL ingestion logic currently ingests some shard 0
506 : // content on all shards, even though it's only read on shard 0. If we
507 : // dropped it, then subsequent WAL ingest to these keys would encounter
508 : // an error.
509 : // A2: because key_is_shard0 also covers relation size keys, which are written
510 : // on all shards even though they're only maintained accurately on shard 0.
511 2587324 : false
512 : } else {
513 18384 : !self.is_key_local(key)
514 : }
515 2605708 : }
516 :
517 0 : pub fn shard_slug(&self) -> String {
518 0 : if self.count > ShardCount(0) {
519 0 : format!("-{:02x}{:02x}", self.number.0, self.count.0)
520 : } else {
521 0 : String::new()
522 : }
523 0 : }
524 :
525 : /// Convenience for checking if this identity is the 0th shard in a tenant,
526 : /// for special cases on shard 0 such as ingesting relation sizes.
527 0 : pub fn is_zero(&self) -> bool {
528 0 : self.number == ShardNumber(0)
529 0 : }
530 : }
531 :
532 : impl Serialize for ShardIndex {
533 4 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
534 4 : where
535 4 : S: serde::Serializer,
536 4 : {
537 4 : if serializer.is_human_readable() {
538 0 : serializer.collect_str(self)
539 : } else {
540 : // Binary encoding is not used in index_part.json, but is included in anticipation of
541 : // switching various structures (e.g. inter-process communication, remote metadata) to more
542 : // compact binary encodings in future.
543 4 : let mut packed: [u8; 2] = [0; 2];
544 4 : packed[0] = self.shard_number.0;
545 4 : packed[1] = self.shard_count.0;
546 4 : packed.serialize(serializer)
547 : }
548 4 : }
549 : }
550 :
551 : impl<'de> Deserialize<'de> for ShardIndex {
552 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
553 2 : where
554 2 : D: serde::Deserializer<'de>,
555 2 : {
556 2 : struct IdVisitor {
557 2 : is_human_readable_deserializer: bool,
558 2 : }
559 2 :
560 2 : impl<'de> serde::de::Visitor<'de> for IdVisitor {
561 2 : type Value = ShardIndex;
562 2 :
563 2 : fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
564 0 : if self.is_human_readable_deserializer {
565 2 : formatter.write_str("value in form of hex string")
566 2 : } else {
567 2 : formatter.write_str("value in form of integer array([u8; 2])")
568 2 : }
569 2 : }
570 2 :
571 2 : fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
572 2 : where
573 2 : A: serde::de::SeqAccess<'de>,
574 2 : {
575 2 : let s = serde::de::value::SeqAccessDeserializer::new(seq);
576 2 : let id: [u8; 2] = Deserialize::deserialize(s)?;
577 2 : Ok(ShardIndex::from(id))
578 2 : }
579 2 :
580 2 : fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
581 0 : where
582 0 : E: serde::de::Error,
583 0 : {
584 0 : ShardIndex::from_str(v).map_err(E::custom)
585 0 : }
586 2 : }
587 2 :
588 2 : if deserializer.is_human_readable() {
589 0 : deserializer.deserialize_str(IdVisitor {
590 0 : is_human_readable_deserializer: true,
591 0 : })
592 : } else {
593 2 : deserializer.deserialize_tuple(
594 2 : 2,
595 2 : IdVisitor {
596 2 : is_human_readable_deserializer: false,
597 2 : },
598 2 : )
599 : }
600 2 : }
601 : }
602 :
603 : /// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
604 : /// in order to be able to serve basebackup requests without peer communication).
605 2605710 : fn key_is_shard0(key: &Key) -> bool {
606 2605710 : // To decide what to shard out to shards >0, we apply a simple rule that only
607 2605710 : // relation pages are distributed to shards other than shard zero. Everything else gets
608 2605710 : // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
609 2605710 : // requests, and any request other than those for particular blocks in relations.
610 2605710 : !is_rel_block_key(key)
611 2605710 : }
612 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name:
/// the 32-bit finalizer mix (xor-shift, multiply, xor-shift, multiply, xor-shift).
fn murmurhash32(h: u32) -> u32 {
    let h = (h ^ (h >> 16)).wrapping_mul(0x85eb_ca6b);
    let h = (h ^ (h >> 13)).wrapping_mul(0xc2b2_ae35);
    h ^ (h >> 16)
}
622 :
/// Provide the same result as the function in postgres `hashfn.h` with the same name:
/// `a ^ (b + 0x9e3779b9 + (a << 6) + (a >> 2))`, all additions wrapping at 32 bits.
fn hash_combine(a: u32, b: u32) -> u32 {
    let mixed = b
        .wrapping_add(0x9e37_79b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);
    a ^ mixed
}
632 :
633 : /// Where a Key is to be distributed across shards, select the shard. This function
634 : /// does not account for keys that should be broadcast across shards.
635 : ///
636 : /// The hashing in this function must exactly match what we do in postgres smgr
637 : /// code. The resulting distribution of pages is intended to preserve locality within
638 : /// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
639 : /// distributing data pseudo-randomly.
640 : ///
641 : /// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
642 : /// and will be handled at higher levels when shards are split.
643 3006 : fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
644 3006 : // Fast path for un-sharded tenants or broadcast keys
645 3006 : if count < ShardCount(2) || key_is_shard0(key) {
646 3004 : return ShardNumber(0);
647 2 : }
648 2 :
649 2 : // relNode
650 2 : let mut hash = murmurhash32(key.field4);
651 2 : // blockNum/stripe size
652 2 : hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
653 2 :
654 2 : ShardNumber((hash % count.0 as u32) as u8)
655 3006 : }
656 :
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use bincode;
    use utils::{id::TenantId, Hex};

    use super::*;

    const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";

    /// The raw bytes of EXAMPLE_TENANT_ID, as they lead the binary encoding.
    const EXAMPLE_TENANT_ID_BYTES: [u8; 16] = [
        0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
        0xf6, 0xfc,
    ];

    fn example_tenant_id() -> TenantId {
        TenantId::from_str(EXAMPLE_TENANT_ID).unwrap()
    }

    /// Shard `number` of a tenant whose literal shard count is `count`.
    fn shard(tenant_id: TenantId, number: u8, count: u8) -> TenantShardId {
        TenantShardId {
            tenant_id,
            shard_number: ShardNumber(number),
            shard_count: ShardCount(count),
        }
    }

    /// Expected 18-byte binary encoding of a shard of EXAMPLE_TENANT_ID.
    fn packed(number: u8, count: u8) -> [u8; 18] {
        let mut out = [0u8; 18];
        out[..16].copy_from_slice(&EXAMPLE_TENANT_ID_BYTES);
        out[16] = number;
        out[17] = count;
        out
    }

    #[test]
    fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
        let example = shard(example_tenant_id(), 7, 10);

        let encoded = example.to_string();
        assert_eq!(encoded, format!("{EXAMPLE_TENANT_ID}-070a"));

        assert_eq!(TenantShardId::from_str(&encoded)?, example);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
        let example = shard(example_tenant_id(), 7, 10);

        let encoded = bincode::serialize(&example).unwrap();
        let expected = packed(0x07, 0x0a);
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: TenantShardId = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
        // A TenantId in human readable form must decode as a TenantShardId
        let example = example_tenant_id();
        let encoded = example.to_string();
        assert_eq!(encoded, EXAMPLE_TENANT_ID);

        let decoded = TenantShardId::from_str(&encoded)?;
        assert_eq!(example, decoded.tenant_id);
        assert_eq!(decoded.shard_count, ShardCount(0));
        assert_eq!(decoded.shard_number, ShardNumber(0));
        Ok(())
    }

    #[test]
    fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
        // A legacy TenantShardId must encode into something decodable as a TenantId
        let example_tenant_id = example_tenant_id();
        let encoded = TenantShardId::unsharded(example_tenant_id).to_string();
        assert_eq!(encoded, EXAMPLE_TENANT_ID);

        assert_eq!(example_tenant_id, TenantId::from_str(&encoded)?);
        Ok(())
    }

    #[test]
    fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
        // The binary encoding has no special legacy handling: an `unsharded()` id
        // round-trips exactly like any other, with zero number and count bytes.
        let example = TenantShardId::unsharded(example_tenant_id());
        let encoded = bincode::serialize(&example).unwrap();

        let expected = packed(0x00, 0x00);
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
        assert_eq!(example, decoded);
        Ok(())
    }

    #[test]
    fn shard_identity_validation() -> Result<(), ShardConfigError> {
        // Happy cases
        for (number, count, stripe_size) in [
            (0, 1, DEFAULT_STRIPE_SIZE),
            (0, 1, ShardStripeSize(1)),
            (254, 255, ShardStripeSize(1)),
        ] {
            ShardIdentity::new(ShardNumber(number), ShardCount(count), stripe_size)?;
        }

        // Rejected cases, each paired with the error it must produce
        for (number, count, stripe_size, expected) in [
            (0, 0, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidCount),
            (10, 10, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (11, 10, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (255, 255, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (0, 1, ShardStripeSize(0), ShardConfigError::InvalidStripeSize),
        ] {
            assert_eq!(
                ShardIdentity::new(ShardNumber(number), ShardCount(count), stripe_size),
                Err(expected)
            );
        }

        Ok(())
    }

    #[test]
    fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
        let example = ShardIndex::new(ShardNumber(13), ShardCount(17));

        let encoded = example.to_string();
        assert_eq!(encoded, "0d11");

        assert_eq!(example, ShardIndex::from_str(&encoded)?);
        Ok(())
    }

    #[test]
    fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
        let example = ShardIndex::new(ShardNumber(13), ShardCount(17));
        let expected: [u8; 2] = [0x0d, 0x11];

        let encoded = bincode::serialize(&example).unwrap();
        assert_eq!(Hex(&encoded), Hex(&expected));

        let decoded: ShardIndex = bincode::deserialize(&encoded).unwrap();
        assert_eq!(example, decoded);
        Ok(())
    }

    // These are only smoke tests to spot check that our implementation doesn't
    // deviate from a few examples values: not aiming to validate the overall
    // hashing algorithm.
    #[test]
    fn murmur_hash() {
        assert_eq!(murmurhash32(0), 0);
        assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
    }

    #[test]
    fn shard_mapping() {
        let key = Key {
            field1: 0x00,
            field2: 0x67f,
            field3: 0x5,
            field4: 0x400c,
            field5: 0x00,
            field6: 0x7d06,
        };

        assert_eq!(
            key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key),
            ShardNumber(8)
        );
    }

    #[test]
    fn shard_id_split() {
        let tenant_id = TenantId::generate();
        let children = |count: u8, numbers: &[u8]| -> Vec<TenantShardId> {
            numbers.iter().map(|&n| shard(tenant_id, n, count)).collect()
        };

        let unsharded = TenantShardId::unsharded(tenant_id);

        // Unsharded into 2
        assert_eq!(unsharded.split(ShardCount(2)), children(2, &[0, 1]));

        // Unsharded into 4
        assert_eq!(unsharded.split(ShardCount(4)), children(4, &[0, 1, 2, 3]));

        // count=1 into 2 (check this works the same as unsharded.)
        assert_eq!(
            shard(tenant_id, 0, 1).split(ShardCount(2)),
            children(2, &[0, 1])
        );

        // count=2 into count=8
        assert_eq!(
            shard(tenant_id, 1, 2).split(ShardCount(8)),
            children(8, &[1, 3, 5, 7])
        );
    }
}
|