Line data Source code
1 : pub mod detach_ancestor;
2 : pub mod partitioning;
3 : pub mod utilization;
4 :
5 : use core::ops::Range;
6 : use std::collections::HashMap;
7 : use std::fmt::Display;
8 : use std::io::{BufRead, Read};
9 : use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
10 : use std::str::FromStr;
11 : use std::time::{Duration, SystemTime};
12 :
13 : use byteorder::{BigEndian, ReadBytesExt};
14 : use bytes::{Buf, BufMut, Bytes, BytesMut};
15 : #[cfg(feature = "testing")]
16 : use camino::Utf8PathBuf;
17 : use postgres_ffi::BLCKSZ;
18 : use serde::{Deserialize, Deserializer, Serialize, Serializer};
19 : use serde_with::serde_as;
20 : pub use utilization::PageserverUtilization;
21 : use utils::id::{NodeId, TenantId, TimelineId};
22 : use utils::lsn::Lsn;
23 : use utils::{completion, serde_system_time};
24 :
25 : use crate::config::Ratio;
26 : use crate::key::{CompactKey, Key};
27 : use crate::reltag::RelTag;
28 : use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
29 :
30 : /// The state of a tenant in this pageserver.
31 : ///
32 : /// ```mermaid
33 : /// stateDiagram-v2
34 : ///
35 : /// [*] --> Attaching: spawn_attach()
36 : ///
37 : /// Attaching --> Activating: activate()
38 : /// Activating --> Active: infallible
39 : ///
40 : /// Attaching --> Broken: attach() failure
41 : ///
42 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
43 : /// Stopping --> Broken: late error in remove_tenant_from_memory
44 : ///
45 : /// Broken --> [*]: ignore / detach / shutdown
46 : /// Stopping --> [*]: remove_from_memory complete
47 : ///
48 : /// Active --> Broken: cfg(testing)-only tenant break point
49 : /// ```
50 : #[derive(
51 : Clone,
52 : PartialEq,
53 : Eq,
54 0 : serde::Serialize,
55 1 : serde::Deserialize,
56 : strum_macros::Display,
57 : strum_macros::VariantNames,
58 : strum_macros::AsRefStr,
59 : strum_macros::IntoStaticStr,
60 : )]
61 : #[serde(tag = "slug", content = "data")]
62 : pub enum TenantState {
63 : /// This tenant is being attached to the pageserver.
64 : ///
65 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
66 : Attaching,
67 : /// The tenant is transitioning from Loading/Attaching to Active.
68 : ///
69 : /// While in this state, the individual timelines are being activated.
70 : ///
71 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
72 : Activating(ActivatingFrom),
73 : /// The tenant has finished activating and is open for business.
74 : ///
75 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
76 : Active,
77 : /// The tenant is recognized by pageserver, but it is being detached or the
78 : /// system is being shut down.
79 : ///
80 : /// Transitions out of this state are possible through `set_broken()`.
81 : Stopping {
82 : /// The barrier can be used to wait for shutdown to complete. The first caller to set
83 : /// Some(Barrier) is responsible for driving shutdown to completion. Subsequent callers
84 : /// will wait for the first caller's existing barrier.
85 : ///
86 : /// None is set when an attach is cancelled, to signal to shutdown that the attach has in
87 : /// fact cancelled:
88 : ///
89 : /// 1. `shutdown` sees `TenantState::Attaching`, and cancels the tenant.
90 : /// 2. `attach` sets `TenantState::Stopping(None)` and exits.
91 : /// 3. `set_stopping` waits for `TenantState::Stopping(None)` and sets
92 : /// `TenantState::Stopping(Some)` to claim the barrier as the shutdown owner.
93 : //
94 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
95 : // otherwise it will not be skipped during deserialization
96 : #[serde(skip)]
97 : progress: Option<completion::Barrier>,
98 : },
99 : /// The tenant is recognized by the pageserver, but can no longer be used for
100 : /// any operations.
101 : ///
102 : /// If the tenant fails to load or attach, it will transition to this state
103 : /// and it is guaranteed that no background tasks are running in its name.
104 : ///
105 : /// The other way to transition into this state is from `Stopping` state
106 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
107 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
108 : Broken { reason: String, backtrace: String },
109 : }
110 :
111 : impl TenantState {
112 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
113 : use TenantAttachmentStatus::*;
114 :
115 : // Below TenantState::Activating is used as "transient" or "transparent" state for
116 : // attachment_status determining.
117 0 : match self {
118 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
119 : // So, technically, we can return Attached here.
120 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
121 : // But, our attach task might still be fetching the remote timelines, etc.
122 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
123 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
124 : // We only reach Active after successful load / attach.
125 : // So, call atttachment status Attached.
126 0 : Self::Active => Attached,
127 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
128 : // However, it also becomes Broken if the regular load fails.
129 : // From Console's perspective there's no practical difference
130 : // because attachment_status is polled by console only during attach operation execution.
131 0 : Self::Broken { reason, .. } => Failed {
132 0 : reason: reason.to_owned(),
133 0 : },
134 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
135 : // we set the Stopping state irrespective of whether the tenant
136 : // has finished attaching or not.
137 0 : Self::Stopping { .. } => Maybe,
138 : }
139 0 : }
140 :
141 0 : pub fn broken_from_reason(reason: String) -> Self {
142 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
143 0 : Self::Broken {
144 0 : reason,
145 0 : backtrace: backtrace_str,
146 0 : }
147 0 : }
148 : }
149 :
150 : impl std::fmt::Debug for TenantState {
151 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 2 : match self {
153 2 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
154 2 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
155 : }
156 0 : _ => write!(f, "{self}"),
157 : }
158 2 : }
159 : }
160 :
161 : /// A temporary lease to a specific lsn inside a timeline.
162 : /// Access to the lsn is guaranteed by the pageserver until the expiration indicated by `valid_until`.
163 : #[serde_as]
164 0 : #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
165 : pub struct LsnLease {
166 : #[serde_as(as = "SystemTimeAsRfc3339Millis")]
167 : pub valid_until: SystemTime,
168 : }
169 :
170 : serde_with::serde_conv!(
171 : SystemTimeAsRfc3339Millis,
172 : SystemTime,
173 0 : |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
174 0 : |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
175 : );
176 :
177 : impl LsnLease {
178 : /// The default length for an explicit LSN lease request (10 minutes).
179 : pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
180 :
181 : /// The default length for an implicit LSN lease granted during
182 : /// `get_lsn_by_timestamp` request (1 minutes).
183 : pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
184 :
185 : /// Checks whether the lease is expired.
186 3 : pub fn is_expired(&self, now: &SystemTime) -> bool {
187 3 : now > &self.valid_until
188 3 : }
189 : }
190 :
/// Selects how an ancestor detach is carried out.
///
/// - `NoAncestorAndReparent`: a branch is only detached if its ancestor is a root branch.
///   Any children of the ancestor before and at the branch point are reparented
///   automatically.
/// - `MultiLevelAndNoReparent`: a branch is detached from multiple levels of ancestors,
///   and no reparenting happens at all.
#[derive(Debug, Clone, Copy, Default)]
pub enum DetachBehavior {
    #[default]
    NoAncestorAndReparent,
    MultiLevelAndNoReparent,
}

/// Parses either the long snake_case name or the short alias (`v1` / `v2`).
impl std::str::FromStr for DetachBehavior {
    type Err = &'static str;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "no_ancestor_and_reparent" | "v1" => Ok(Self::NoAncestorAndReparent),
            "multi_level_and_no_reparent" | "v2" => Ok(Self::MultiLevelAndNoReparent),
            _ => Err("cannot parse detach behavior"),
        }
    }
}

/// Prints the long snake_case name, which `FromStr` accepts back.
impl std::fmt::Display for DetachBehavior {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::NoAncestorAndReparent => "no_ancestor_and_reparent",
            Self::MultiLevelAndNoReparent => "multi_level_and_no_reparent",
        };
        f.write_str(name)
    }
}
223 :
224 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
225 : ///
226 : /// XXX: We used to have more variants here, but now it's just one, which makes this rather
227 : /// useless. Remove, once we've checked that there's no client code left that looks at this.
228 1 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
229 : pub enum ActivatingFrom {
230 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
231 : Attaching,
232 : }
233 :
234 : /// A state of a timeline in pageserver's memory.
235 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
236 : pub enum TimelineState {
237 : /// The timeline is recognized by the pageserver but is not yet operational.
238 : /// In particular, the walreceiver connection loop is not running for this timeline.
239 : /// It will eventually transition to state Active or Broken.
240 : Loading,
241 : /// The timeline is fully operational.
242 : /// It can be queried, and the walreceiver connection loop is running.
243 : Active,
244 : /// The timeline was previously Loading or Active but is shutting down.
245 : /// It cannot transition back into any other state.
246 : Stopping,
247 : /// The timeline is broken and not operational (previous states: Loading or Active).
248 : Broken { reason: String, backtrace: String },
249 : }
250 :
251 : #[serde_with::serde_as]
252 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
253 : pub struct CompactLsnRange {
254 : pub start: Lsn,
255 : pub end: Lsn,
256 : }
257 :
258 : #[serde_with::serde_as]
259 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
260 : pub struct CompactKeyRange {
261 : #[serde_as(as = "serde_with::DisplayFromStr")]
262 : pub start: Key,
263 : #[serde_as(as = "serde_with::DisplayFromStr")]
264 : pub end: Key,
265 : }
266 :
267 : impl From<Range<Lsn>> for CompactLsnRange {
268 3 : fn from(range: Range<Lsn>) -> Self {
269 3 : Self {
270 3 : start: range.start,
271 3 : end: range.end,
272 3 : }
273 3 : }
274 : }
275 :
276 : impl From<Range<Key>> for CompactKeyRange {
277 8 : fn from(range: Range<Key>) -> Self {
278 8 : Self {
279 8 : start: range.start,
280 8 : end: range.end,
281 8 : }
282 8 : }
283 : }
284 :
285 : impl From<CompactLsnRange> for Range<Lsn> {
286 5 : fn from(range: CompactLsnRange) -> Self {
287 5 : range.start..range.end
288 5 : }
289 : }
290 :
291 : impl From<CompactKeyRange> for Range<Key> {
292 8 : fn from(range: CompactKeyRange) -> Self {
293 8 : range.start..range.end
294 8 : }
295 : }
296 :
297 : impl CompactLsnRange {
298 2 : pub fn above(lsn: Lsn) -> Self {
299 2 : Self {
300 2 : start: lsn,
301 2 : end: Lsn::MAX,
302 2 : }
303 2 : }
304 : }
305 :
306 : #[derive(Debug, Clone, Serialize)]
307 : pub struct CompactInfoResponse {
308 : pub compact_key_range: Option<CompactKeyRange>,
309 : pub compact_lsn_range: Option<CompactLsnRange>,
310 : pub sub_compaction: bool,
311 : pub running: bool,
312 : pub job_id: usize,
313 : }
314 :
315 0 : #[derive(Serialize, Deserialize, Clone)]
316 : pub struct TimelineCreateRequest {
317 : pub new_timeline_id: TimelineId,
318 : #[serde(flatten)]
319 : pub mode: TimelineCreateRequestMode,
320 : }
321 :
322 : impl TimelineCreateRequest {
323 0 : pub fn mode_tag(&self) -> &'static str {
324 0 : match &self.mode {
325 0 : TimelineCreateRequestMode::Branch { .. } => "branch",
326 0 : TimelineCreateRequestMode::ImportPgdata { .. } => "import",
327 0 : TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
328 : }
329 0 : }
330 :
331 0 : pub fn is_import(&self) -> bool {
332 0 : matches!(self.mode, TimelineCreateRequestMode::ImportPgdata { .. })
333 0 : }
334 : }
335 :
336 0 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
337 : pub enum ShardImportStatus {
338 : InProgress(Option<ShardImportProgress>),
339 : Done,
340 : Error(String),
341 : }
342 :
343 0 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
344 : pub enum ShardImportProgress {
345 : V1(ShardImportProgressV1),
346 : }
347 :
348 0 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
349 : pub struct ShardImportProgressV1 {
350 : /// Total number of jobs in the import plan
351 : pub jobs: usize,
352 : /// Number of jobs completed
353 : pub completed: usize,
354 : /// Hash of the plan
355 : pub import_plan_hash: u64,
356 : /// Soft limit for the job size
357 : /// This needs to remain constant throughout the import
358 : pub job_soft_size_limit: usize,
359 : }
360 :
361 : impl ShardImportStatus {
362 0 : pub fn is_terminal(&self) -> bool {
363 0 : match self {
364 0 : ShardImportStatus::InProgress(_) => false,
365 0 : ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
366 : }
367 0 : }
368 : }
369 :
370 : /// Storage controller specific extensions to [`TimelineInfo`].
371 0 : #[derive(Serialize, Deserialize, Clone)]
372 : pub struct TimelineCreateResponseStorcon {
373 : #[serde(flatten)]
374 : pub timeline_info: TimelineInfo,
375 :
376 : pub safekeepers: Option<SafekeepersInfo>,
377 : }
378 :
379 : /// Safekeepers as returned in timeline creation request to storcon or pushed to
380 : /// cplane in the post migration hook.
381 0 : #[derive(Serialize, Deserialize, Clone)]
382 : pub struct SafekeepersInfo {
383 : pub tenant_id: TenantId,
384 : pub timeline_id: TimelineId,
385 : pub generation: u32,
386 : pub safekeepers: Vec<SafekeeperInfo>,
387 : }
388 :
389 0 : #[derive(Serialize, Deserialize, Clone)]
390 : pub struct SafekeeperInfo {
391 : pub id: NodeId,
392 : pub hostname: String,
393 : }
394 :
395 0 : #[derive(Serialize, Deserialize, Clone)]
396 : #[serde(untagged)]
397 : pub enum TimelineCreateRequestMode {
398 : Branch {
399 : ancestor_timeline_id: TimelineId,
400 : #[serde(default)]
401 : ancestor_start_lsn: Option<Lsn>,
402 : // TODO: cplane sets this, but, the branching code always
403 : // inherits the ancestor's pg_version. Earlier code wasn't
404 : // using a flattened enum, so, it was an accepted field, and
405 : // we continue to accept it by having it here.
406 : pg_version: Option<u32>,
407 : #[serde(default, skip_serializing_if = "std::ops::Not::not")]
408 : read_only: bool,
409 : },
410 : ImportPgdata {
411 : import_pgdata: TimelineCreateRequestModeImportPgdata,
412 : },
413 : // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
414 : // (serde picks the first matching enum variant, in declaration order).
415 : Bootstrap {
416 : #[serde(default)]
417 : existing_initdb_timeline_id: Option<TimelineId>,
418 : pg_version: Option<u32>,
419 : },
420 : }
421 :
422 0 : #[derive(Serialize, Deserialize, Clone)]
423 : pub struct TimelineCreateRequestModeImportPgdata {
424 : pub location: ImportPgdataLocation,
425 : pub idempotency_key: ImportPgdataIdempotencyKey,
426 : }
427 :
428 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
429 : pub enum ImportPgdataLocation {
430 : #[cfg(feature = "testing")]
431 : LocalFs { path: Utf8PathBuf },
432 : AwsS3 {
433 : region: String,
434 : bucket: String,
435 : /// A better name for this would be `prefix`; changing requires coordination with cplane.
436 : /// See <https://github.com/neondatabase/cloud/issues/20646>.
437 : key: String,
438 : },
439 : }
440 :
441 0 : #[derive(Serialize, Deserialize, Clone)]
442 : #[serde(transparent)]
443 : pub struct ImportPgdataIdempotencyKey(pub String);
444 :
445 : impl ImportPgdataIdempotencyKey {
446 0 : pub fn random() -> Self {
447 : use rand::Rng;
448 : use rand::distributions::Alphanumeric;
449 0 : Self(
450 0 : rand::thread_rng()
451 0 : .sample_iter(&Alphanumeric)
452 0 : .take(20)
453 0 : .map(char::from)
454 0 : .collect(),
455 0 : )
456 0 : }
457 : }
458 :
459 0 : #[derive(Serialize, Deserialize, Clone)]
460 : pub struct LsnLeaseRequest {
461 : pub lsn: Lsn,
462 : }
463 :
464 0 : #[derive(Serialize, Deserialize)]
465 : pub struct TenantShardSplitRequest {
466 : pub new_shard_count: u8,
467 :
468 : // A tenant's stripe size is only meaningful the first time their shard count goes
469 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
470 : //
471 : // If this is set while the stripe count is being increased from an already >1 value,
472 : // then the request will fail with 400.
473 : pub new_stripe_size: Option<ShardStripeSize>,
474 : }
475 :
476 0 : #[derive(Serialize, Deserialize)]
477 : pub struct TenantShardSplitResponse {
478 : pub new_shards: Vec<TenantShardId>,
479 : }
480 :
481 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
482 0 : #[derive(Serialize, Deserialize, Debug)]
483 : #[serde(deny_unknown_fields)]
484 : pub struct ShardParameters {
485 : pub count: ShardCount,
486 : pub stripe_size: ShardStripeSize,
487 : }
488 :
489 : impl ShardParameters {
490 0 : pub fn is_unsharded(&self) -> bool {
491 0 : self.count.is_unsharded()
492 0 : }
493 : }
494 :
495 : impl Default for ShardParameters {
496 119 : fn default() -> Self {
497 119 : Self {
498 119 : count: ShardCount::new(0),
499 119 : stripe_size: DEFAULT_STRIPE_SIZE,
500 119 : }
501 119 : }
502 : }
503 :
/// A three-way patch for one optional field: set it, clear it, or leave it alone.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum FieldPatch<T> {
    /// Set the field to the contained value.
    Upsert(T),
    /// Clear the field.
    Remove,
    /// Leave the field untouched (the default).
    #[default]
    Noop,
}

impl<T> FieldPatch<T> {
    /// Returns `true` for `Noop`; used as a `skip_serializing_if` predicate.
    fn is_noop(&self) -> bool {
        match self {
            FieldPatch::Noop => true,
            FieldPatch::Upsert(_) | FieldPatch::Remove => false,
        }
    }

    /// Applies the patch to `target`: `Upsert(v)` stores `Some(v)`, `Remove` stores
    /// `None`, and `Noop` leaves `target` as it is.
    pub fn apply(self, target: &mut Option<T>) {
        let replacement = match self {
            Self::Noop => return,
            Self::Upsert(v) => Some(v),
            Self::Remove => None,
        };
        *target = replacement;
    }

    /// Converts the payload of an `Upsert` with the fallible function `map`,
    /// propagating its error. `Remove` and `Noop` pass through without calling `map`.
    pub fn map<U, E, F: FnOnce(T) -> Result<U, E>>(self, map: F) -> Result<FieldPatch<U>, E> {
        Ok(match self {
            Self::Upsert(v) => FieldPatch::Upsert(map(v)?),
            Self::Remove => FieldPatch::Remove,
            Self::Noop => FieldPatch::Noop,
        })
    }
}
533 :
534 : impl<'de, T: Deserialize<'de>> Deserialize<'de> for FieldPatch<T> {
535 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
536 2 : where
537 2 : D: Deserializer<'de>,
538 2 : {
539 2 : Option::deserialize(deserializer).map(|opt| match opt {
540 1 : None => FieldPatch::Remove,
541 1 : Some(val) => FieldPatch::Upsert(val),
542 2 : })
543 2 : }
544 : }
545 :
546 : impl<T: Serialize> Serialize for FieldPatch<T> {
547 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
548 2 : where
549 2 : S: Serializer,
550 2 : {
551 2 : match self {
552 1 : FieldPatch::Upsert(val) => serializer.serialize_some(val),
553 1 : FieldPatch::Remove => serializer.serialize_none(),
554 0 : FieldPatch::Noop => unreachable!(),
555 : }
556 2 : }
557 : }
558 :
559 2 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
560 : #[serde(default)]
561 : pub struct TenantConfigPatch {
562 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
563 : pub checkpoint_distance: FieldPatch<u64>,
564 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
565 : pub checkpoint_timeout: FieldPatch<String>,
566 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
567 : pub compaction_target_size: FieldPatch<u64>,
568 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
569 : pub compaction_period: FieldPatch<String>,
570 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
571 : pub compaction_threshold: FieldPatch<usize>,
572 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
573 : pub compaction_upper_limit: FieldPatch<usize>,
574 : // defer parsing compaction_algorithm, like eviction_policy
575 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
576 : pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
577 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
578 : pub compaction_shard_ancestor: FieldPatch<bool>,
579 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
580 : pub compaction_l0_first: FieldPatch<bool>,
581 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
582 : pub compaction_l0_semaphore: FieldPatch<bool>,
583 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
584 : pub l0_flush_delay_threshold: FieldPatch<usize>,
585 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
586 : pub l0_flush_stall_threshold: FieldPatch<usize>,
587 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
588 : pub gc_horizon: FieldPatch<u64>,
589 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
590 : pub gc_period: FieldPatch<String>,
591 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
592 : pub image_creation_threshold: FieldPatch<usize>,
593 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
594 : pub pitr_interval: FieldPatch<String>,
595 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
596 : pub walreceiver_connect_timeout: FieldPatch<String>,
597 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
598 : pub lagging_wal_timeout: FieldPatch<String>,
599 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
600 : pub max_lsn_wal_lag: FieldPatch<NonZeroU64>,
601 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
602 : pub eviction_policy: FieldPatch<EvictionPolicy>,
603 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
604 : pub min_resident_size_override: FieldPatch<u64>,
605 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
606 : pub evictions_low_residence_duration_metric_threshold: FieldPatch<String>,
607 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
608 : pub heatmap_period: FieldPatch<String>,
609 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
610 : pub lazy_slru_download: FieldPatch<bool>,
611 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
612 : pub timeline_get_throttle: FieldPatch<ThrottleConfig>,
613 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
614 : pub image_layer_creation_check_threshold: FieldPatch<u8>,
615 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
616 : pub image_creation_preempt_threshold: FieldPatch<usize>,
617 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
618 : pub lsn_lease_length: FieldPatch<String>,
619 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
620 : pub lsn_lease_length_for_ts: FieldPatch<String>,
621 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
622 : pub timeline_offloading: FieldPatch<bool>,
623 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
624 : pub rel_size_v2_enabled: FieldPatch<bool>,
625 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
626 : pub gc_compaction_enabled: FieldPatch<bool>,
627 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
628 : pub gc_compaction_verification: FieldPatch<bool>,
629 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
630 : pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
631 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
632 : pub gc_compaction_ratio_percent: FieldPatch<u64>,
633 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
634 : pub sampling_ratio: FieldPatch<Option<Ratio>>,
635 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
636 : pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
637 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
638 : pub basebackup_cache_enabled: FieldPatch<bool>,
639 : }
640 :
641 : /// Like [`crate::config::TenantConfigToml`], but preserves the information
642 : /// about which parameters are set and which are not.
643 : ///
644 : /// Used in many places, including durably stored ones.
645 2 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
646 : #[serde(default)] // this maps omitted fields in deserialization to None
647 : pub struct TenantConfig {
648 : #[serde(skip_serializing_if = "Option::is_none")]
649 : pub checkpoint_distance: Option<u64>,
650 :
651 : #[serde(skip_serializing_if = "Option::is_none")]
652 : #[serde(with = "humantime_serde")]
653 : pub checkpoint_timeout: Option<Duration>,
654 :
655 : #[serde(skip_serializing_if = "Option::is_none")]
656 : pub compaction_target_size: Option<u64>,
657 :
658 : #[serde(skip_serializing_if = "Option::is_none")]
659 : #[serde(with = "humantime_serde")]
660 : pub compaction_period: Option<Duration>,
661 :
662 : #[serde(skip_serializing_if = "Option::is_none")]
663 : pub compaction_threshold: Option<usize>,
664 :
665 : #[serde(skip_serializing_if = "Option::is_none")]
666 : pub compaction_upper_limit: Option<usize>,
667 :
668 : #[serde(skip_serializing_if = "Option::is_none")]
669 : pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
670 :
671 : #[serde(skip_serializing_if = "Option::is_none")]
672 : pub compaction_shard_ancestor: Option<bool>,
673 :
674 : #[serde(skip_serializing_if = "Option::is_none")]
675 : pub compaction_l0_first: Option<bool>,
676 :
677 : #[serde(skip_serializing_if = "Option::is_none")]
678 : pub compaction_l0_semaphore: Option<bool>,
679 :
680 : #[serde(skip_serializing_if = "Option::is_none")]
681 : pub l0_flush_delay_threshold: Option<usize>,
682 :
683 : #[serde(skip_serializing_if = "Option::is_none")]
684 : pub l0_flush_stall_threshold: Option<usize>,
685 :
686 : #[serde(skip_serializing_if = "Option::is_none")]
687 : pub gc_horizon: Option<u64>,
688 :
689 : #[serde(skip_serializing_if = "Option::is_none")]
690 : #[serde(with = "humantime_serde")]
691 : pub gc_period: Option<Duration>,
692 :
693 : #[serde(skip_serializing_if = "Option::is_none")]
694 : pub image_creation_threshold: Option<usize>,
695 :
696 : #[serde(skip_serializing_if = "Option::is_none")]
697 : #[serde(with = "humantime_serde")]
698 : pub pitr_interval: Option<Duration>,
699 :
700 : #[serde(skip_serializing_if = "Option::is_none")]
701 : #[serde(with = "humantime_serde")]
702 : pub walreceiver_connect_timeout: Option<Duration>,
703 :
704 : #[serde(skip_serializing_if = "Option::is_none")]
705 : #[serde(with = "humantime_serde")]
706 : pub lagging_wal_timeout: Option<Duration>,
707 :
708 : #[serde(skip_serializing_if = "Option::is_none")]
709 : pub max_lsn_wal_lag: Option<NonZeroU64>,
710 :
711 : #[serde(skip_serializing_if = "Option::is_none")]
712 : pub eviction_policy: Option<EvictionPolicy>,
713 :
714 : #[serde(skip_serializing_if = "Option::is_none")]
715 : pub min_resident_size_override: Option<u64>,
716 :
717 : #[serde(skip_serializing_if = "Option::is_none")]
718 : #[serde(with = "humantime_serde")]
719 : pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
720 :
721 : #[serde(skip_serializing_if = "Option::is_none")]
722 : #[serde(with = "humantime_serde")]
723 : pub heatmap_period: Option<Duration>,
724 :
725 : #[serde(skip_serializing_if = "Option::is_none")]
726 : pub lazy_slru_download: Option<bool>,
727 :
728 : #[serde(skip_serializing_if = "Option::is_none")]
729 : pub timeline_get_throttle: Option<ThrottleConfig>,
730 :
731 : #[serde(skip_serializing_if = "Option::is_none")]
732 : pub image_layer_creation_check_threshold: Option<u8>,
733 :
734 : #[serde(skip_serializing_if = "Option::is_none")]
735 : pub image_creation_preempt_threshold: Option<usize>,
736 :
737 : #[serde(skip_serializing_if = "Option::is_none")]
738 : #[serde(with = "humantime_serde")]
739 : pub lsn_lease_length: Option<Duration>,
740 :
741 : #[serde(skip_serializing_if = "Option::is_none")]
742 : #[serde(with = "humantime_serde")]
743 : pub lsn_lease_length_for_ts: Option<Duration>,
744 :
745 : #[serde(skip_serializing_if = "Option::is_none")]
746 : pub timeline_offloading: Option<bool>,
747 :
748 : #[serde(skip_serializing_if = "Option::is_none")]
749 : pub rel_size_v2_enabled: Option<bool>,
750 :
751 : #[serde(skip_serializing_if = "Option::is_none")]
752 : pub gc_compaction_enabled: Option<bool>,
753 :
754 : #[serde(skip_serializing_if = "Option::is_none")]
755 : pub gc_compaction_verification: Option<bool>,
756 :
757 : #[serde(skip_serializing_if = "Option::is_none")]
758 : pub gc_compaction_initial_threshold_kb: Option<u64>,
759 :
760 : #[serde(skip_serializing_if = "Option::is_none")]
761 : pub gc_compaction_ratio_percent: Option<u64>,
762 :
763 : #[serde(skip_serializing_if = "Option::is_none")]
764 : pub sampling_ratio: Option<Option<Ratio>>,
765 :
766 : #[serde(skip_serializing_if = "Option::is_none")]
767 : pub relsize_snapshot_cache_capacity: Option<usize>,
768 :
769 : #[serde(skip_serializing_if = "Option::is_none")]
770 : pub basebackup_cache_enabled: Option<bool>,
771 : }
772 :
773 : impl TenantConfig {
774 1 : pub fn apply_patch(
775 1 : self,
776 1 : patch: TenantConfigPatch,
777 1 : ) -> Result<TenantConfig, humantime::DurationError> {
778 1 : let Self {
779 1 : mut checkpoint_distance,
780 1 : mut checkpoint_timeout,
781 1 : mut compaction_target_size,
782 1 : mut compaction_period,
783 1 : mut compaction_threshold,
784 1 : mut compaction_upper_limit,
785 1 : mut compaction_algorithm,
786 1 : mut compaction_shard_ancestor,
787 1 : mut compaction_l0_first,
788 1 : mut compaction_l0_semaphore,
789 1 : mut l0_flush_delay_threshold,
790 1 : mut l0_flush_stall_threshold,
791 1 : mut gc_horizon,
792 1 : mut gc_period,
793 1 : mut image_creation_threshold,
794 1 : mut pitr_interval,
795 1 : mut walreceiver_connect_timeout,
796 1 : mut lagging_wal_timeout,
797 1 : mut max_lsn_wal_lag,
798 1 : mut eviction_policy,
799 1 : mut min_resident_size_override,
800 1 : mut evictions_low_residence_duration_metric_threshold,
801 1 : mut heatmap_period,
802 1 : mut lazy_slru_download,
803 1 : mut timeline_get_throttle,
804 1 : mut image_layer_creation_check_threshold,
805 1 : mut image_creation_preempt_threshold,
806 1 : mut lsn_lease_length,
807 1 : mut lsn_lease_length_for_ts,
808 1 : mut timeline_offloading,
809 1 : mut rel_size_v2_enabled,
810 1 : mut gc_compaction_enabled,
811 1 : mut gc_compaction_verification,
812 1 : mut gc_compaction_initial_threshold_kb,
813 1 : mut gc_compaction_ratio_percent,
814 1 : mut sampling_ratio,
815 1 : mut relsize_snapshot_cache_capacity,
816 1 : mut basebackup_cache_enabled,
817 1 : } = self;
818 1 :
819 1 : patch.checkpoint_distance.apply(&mut checkpoint_distance);
820 1 : patch
821 1 : .checkpoint_timeout
822 1 : .map(|v| humantime::parse_duration(&v))?
823 1 : .apply(&mut checkpoint_timeout);
824 1 : patch
825 1 : .compaction_target_size
826 1 : .apply(&mut compaction_target_size);
827 1 : patch
828 1 : .compaction_period
829 1 : .map(|v| humantime::parse_duration(&v))?
830 1 : .apply(&mut compaction_period);
831 1 : patch.compaction_threshold.apply(&mut compaction_threshold);
832 1 : patch
833 1 : .compaction_upper_limit
834 1 : .apply(&mut compaction_upper_limit);
835 1 : patch.compaction_algorithm.apply(&mut compaction_algorithm);
836 1 : patch
837 1 : .compaction_shard_ancestor
838 1 : .apply(&mut compaction_shard_ancestor);
839 1 : patch.compaction_l0_first.apply(&mut compaction_l0_first);
840 1 : patch
841 1 : .compaction_l0_semaphore
842 1 : .apply(&mut compaction_l0_semaphore);
843 1 : patch
844 1 : .l0_flush_delay_threshold
845 1 : .apply(&mut l0_flush_delay_threshold);
846 1 : patch
847 1 : .l0_flush_stall_threshold
848 1 : .apply(&mut l0_flush_stall_threshold);
849 1 : patch.gc_horizon.apply(&mut gc_horizon);
850 1 : patch
851 1 : .gc_period
852 1 : .map(|v| humantime::parse_duration(&v))?
853 1 : .apply(&mut gc_period);
854 1 : patch
855 1 : .image_creation_threshold
856 1 : .apply(&mut image_creation_threshold);
857 1 : patch
858 1 : .pitr_interval
859 1 : .map(|v| humantime::parse_duration(&v))?
860 1 : .apply(&mut pitr_interval);
861 1 : patch
862 1 : .walreceiver_connect_timeout
863 1 : .map(|v| humantime::parse_duration(&v))?
864 1 : .apply(&mut walreceiver_connect_timeout);
865 1 : patch
866 1 : .lagging_wal_timeout
867 1 : .map(|v| humantime::parse_duration(&v))?
868 1 : .apply(&mut lagging_wal_timeout);
869 1 : patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
870 1 : patch.eviction_policy.apply(&mut eviction_policy);
871 1 : patch
872 1 : .min_resident_size_override
873 1 : .apply(&mut min_resident_size_override);
874 1 : patch
875 1 : .evictions_low_residence_duration_metric_threshold
876 1 : .map(|v| humantime::parse_duration(&v))?
877 1 : .apply(&mut evictions_low_residence_duration_metric_threshold);
878 1 : patch
879 1 : .heatmap_period
880 1 : .map(|v| humantime::parse_duration(&v))?
881 1 : .apply(&mut heatmap_period);
882 1 : patch.lazy_slru_download.apply(&mut lazy_slru_download);
883 1 : patch
884 1 : .timeline_get_throttle
885 1 : .apply(&mut timeline_get_throttle);
886 1 : patch
887 1 : .image_layer_creation_check_threshold
888 1 : .apply(&mut image_layer_creation_check_threshold);
889 1 : patch
890 1 : .image_creation_preempt_threshold
891 1 : .apply(&mut image_creation_preempt_threshold);
892 1 : patch
893 1 : .lsn_lease_length
894 1 : .map(|v| humantime::parse_duration(&v))?
895 1 : .apply(&mut lsn_lease_length);
896 1 : patch
897 1 : .lsn_lease_length_for_ts
898 1 : .map(|v| humantime::parse_duration(&v))?
899 1 : .apply(&mut lsn_lease_length_for_ts);
900 1 : patch.timeline_offloading.apply(&mut timeline_offloading);
901 1 : patch.rel_size_v2_enabled.apply(&mut rel_size_v2_enabled);
902 1 : patch
903 1 : .gc_compaction_enabled
904 1 : .apply(&mut gc_compaction_enabled);
905 1 : patch
906 1 : .gc_compaction_verification
907 1 : .apply(&mut gc_compaction_verification);
908 1 : patch
909 1 : .gc_compaction_initial_threshold_kb
910 1 : .apply(&mut gc_compaction_initial_threshold_kb);
911 1 : patch
912 1 : .gc_compaction_ratio_percent
913 1 : .apply(&mut gc_compaction_ratio_percent);
914 1 : patch.sampling_ratio.apply(&mut sampling_ratio);
915 1 : patch
916 1 : .relsize_snapshot_cache_capacity
917 1 : .apply(&mut relsize_snapshot_cache_capacity);
918 1 : patch
919 1 : .basebackup_cache_enabled
920 1 : .apply(&mut basebackup_cache_enabled);
921 1 :
922 1 : Ok(Self {
923 1 : checkpoint_distance,
924 1 : checkpoint_timeout,
925 1 : compaction_target_size,
926 1 : compaction_period,
927 1 : compaction_threshold,
928 1 : compaction_upper_limit,
929 1 : compaction_algorithm,
930 1 : compaction_shard_ancestor,
931 1 : compaction_l0_first,
932 1 : compaction_l0_semaphore,
933 1 : l0_flush_delay_threshold,
934 1 : l0_flush_stall_threshold,
935 1 : gc_horizon,
936 1 : gc_period,
937 1 : image_creation_threshold,
938 1 : pitr_interval,
939 1 : walreceiver_connect_timeout,
940 1 : lagging_wal_timeout,
941 1 : max_lsn_wal_lag,
942 1 : eviction_policy,
943 1 : min_resident_size_override,
944 1 : evictions_low_residence_duration_metric_threshold,
945 1 : heatmap_period,
946 1 : lazy_slru_download,
947 1 : timeline_get_throttle,
948 1 : image_layer_creation_check_threshold,
949 1 : image_creation_preempt_threshold,
950 1 : lsn_lease_length,
951 1 : lsn_lease_length_for_ts,
952 1 : timeline_offloading,
953 1 : rel_size_v2_enabled,
954 1 : gc_compaction_enabled,
955 1 : gc_compaction_verification,
956 1 : gc_compaction_initial_threshold_kb,
957 1 : gc_compaction_ratio_percent,
958 1 : sampling_ratio,
959 1 : relsize_snapshot_cache_capacity,
960 1 : basebackup_cache_enabled,
961 1 : })
962 1 : }
963 :
964 0 : pub fn merge(
965 0 : &self,
966 0 : global_conf: crate::config::TenantConfigToml,
967 0 : ) -> crate::config::TenantConfigToml {
968 0 : crate::config::TenantConfigToml {
969 0 : checkpoint_distance: self
970 0 : .checkpoint_distance
971 0 : .unwrap_or(global_conf.checkpoint_distance),
972 0 : checkpoint_timeout: self
973 0 : .checkpoint_timeout
974 0 : .unwrap_or(global_conf.checkpoint_timeout),
975 0 : compaction_target_size: self
976 0 : .compaction_target_size
977 0 : .unwrap_or(global_conf.compaction_target_size),
978 0 : compaction_period: self
979 0 : .compaction_period
980 0 : .unwrap_or(global_conf.compaction_period),
981 0 : compaction_threshold: self
982 0 : .compaction_threshold
983 0 : .unwrap_or(global_conf.compaction_threshold),
984 0 : compaction_upper_limit: self
985 0 : .compaction_upper_limit
986 0 : .unwrap_or(global_conf.compaction_upper_limit),
987 0 : compaction_algorithm: self
988 0 : .compaction_algorithm
989 0 : .as_ref()
990 0 : .unwrap_or(&global_conf.compaction_algorithm)
991 0 : .clone(),
992 0 : compaction_shard_ancestor: self
993 0 : .compaction_shard_ancestor
994 0 : .unwrap_or(global_conf.compaction_shard_ancestor),
995 0 : compaction_l0_first: self
996 0 : .compaction_l0_first
997 0 : .unwrap_or(global_conf.compaction_l0_first),
998 0 : compaction_l0_semaphore: self
999 0 : .compaction_l0_semaphore
1000 0 : .unwrap_or(global_conf.compaction_l0_semaphore),
1001 0 : l0_flush_delay_threshold: self
1002 0 : .l0_flush_delay_threshold
1003 0 : .or(global_conf.l0_flush_delay_threshold),
1004 0 : l0_flush_stall_threshold: self
1005 0 : .l0_flush_stall_threshold
1006 0 : .or(global_conf.l0_flush_stall_threshold),
1007 0 : gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
1008 0 : gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
1009 0 : image_creation_threshold: self
1010 0 : .image_creation_threshold
1011 0 : .unwrap_or(global_conf.image_creation_threshold),
1012 0 : pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
1013 0 : walreceiver_connect_timeout: self
1014 0 : .walreceiver_connect_timeout
1015 0 : .unwrap_or(global_conf.walreceiver_connect_timeout),
1016 0 : lagging_wal_timeout: self
1017 0 : .lagging_wal_timeout
1018 0 : .unwrap_or(global_conf.lagging_wal_timeout),
1019 0 : max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
1020 0 : eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
1021 0 : min_resident_size_override: self
1022 0 : .min_resident_size_override
1023 0 : .or(global_conf.min_resident_size_override),
1024 0 : evictions_low_residence_duration_metric_threshold: self
1025 0 : .evictions_low_residence_duration_metric_threshold
1026 0 : .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
1027 0 : heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
1028 0 : lazy_slru_download: self
1029 0 : .lazy_slru_download
1030 0 : .unwrap_or(global_conf.lazy_slru_download),
1031 0 : timeline_get_throttle: self
1032 0 : .timeline_get_throttle
1033 0 : .clone()
1034 0 : .unwrap_or(global_conf.timeline_get_throttle),
1035 0 : image_layer_creation_check_threshold: self
1036 0 : .image_layer_creation_check_threshold
1037 0 : .unwrap_or(global_conf.image_layer_creation_check_threshold),
1038 0 : image_creation_preempt_threshold: self
1039 0 : .image_creation_preempt_threshold
1040 0 : .unwrap_or(global_conf.image_creation_preempt_threshold),
1041 0 : lsn_lease_length: self
1042 0 : .lsn_lease_length
1043 0 : .unwrap_or(global_conf.lsn_lease_length),
1044 0 : lsn_lease_length_for_ts: self
1045 0 : .lsn_lease_length_for_ts
1046 0 : .unwrap_or(global_conf.lsn_lease_length_for_ts),
1047 0 : timeline_offloading: self
1048 0 : .timeline_offloading
1049 0 : .unwrap_or(global_conf.timeline_offloading),
1050 0 : rel_size_v2_enabled: self
1051 0 : .rel_size_v2_enabled
1052 0 : .unwrap_or(global_conf.rel_size_v2_enabled),
1053 0 : gc_compaction_enabled: self
1054 0 : .gc_compaction_enabled
1055 0 : .unwrap_or(global_conf.gc_compaction_enabled),
1056 0 : gc_compaction_verification: self
1057 0 : .gc_compaction_verification
1058 0 : .unwrap_or(global_conf.gc_compaction_verification),
1059 0 : gc_compaction_initial_threshold_kb: self
1060 0 : .gc_compaction_initial_threshold_kb
1061 0 : .unwrap_or(global_conf.gc_compaction_initial_threshold_kb),
1062 0 : gc_compaction_ratio_percent: self
1063 0 : .gc_compaction_ratio_percent
1064 0 : .unwrap_or(global_conf.gc_compaction_ratio_percent),
1065 0 : sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
1066 0 : relsize_snapshot_cache_capacity: self
1067 0 : .relsize_snapshot_cache_capacity
1068 0 : .unwrap_or(global_conf.relsize_snapshot_cache_capacity),
1069 0 : basebackup_cache_enabled: self
1070 0 : .basebackup_cache_enabled
1071 0 : .unwrap_or(global_conf.basebackup_cache_enabled),
1072 0 : }
1073 0 : }
1074 : }
1075 :
1076 : /// The policy for the aux file storage.
1077 : ///
1078 : /// It can be switched through `switch_aux_file_policy` tenant config.
1079 : /// When the first aux file written, the policy will be persisted in the
1080 : /// `index_part.json` file and has a limited migration path.
1081 : ///
1082 : /// Currently, we only allow the following migration path:
1083 : ///
1084 : /// Unset -> V1
1085 : /// -> V2
1086 : /// -> CrossValidation -> V2
1087 : #[derive(
1088 : Eq,
1089 : PartialEq,
1090 : Debug,
1091 : Copy,
1092 : Clone,
1093 0 : strum_macros::EnumString,
1094 : strum_macros::Display,
1095 1 : serde_with::DeserializeFromStr,
1096 : serde_with::SerializeDisplay,
1097 : )]
1098 : #[strum(serialize_all = "kebab-case")]
1099 : pub enum AuxFilePolicy {
1100 : /// V1 aux file policy: store everything in AUX_FILE_KEY
1101 : #[strum(ascii_case_insensitive)]
1102 : V1,
1103 : /// V2 aux file policy: store in the AUX_FILE keyspace
1104 : #[strum(ascii_case_insensitive)]
1105 : V2,
1106 : /// Cross validation runs both formats on the write path and does validation
1107 : /// on the read path.
1108 : #[strum(ascii_case_insensitive)]
1109 : CrossValidation,
1110 : }
1111 :
1112 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1113 : #[serde(tag = "kind")]
1114 : pub enum EvictionPolicy {
1115 : NoEviction,
1116 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
1117 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
1118 : }
1119 :
1120 : impl EvictionPolicy {
1121 0 : pub fn discriminant_str(&self) -> &'static str {
1122 0 : match self {
1123 0 : EvictionPolicy::NoEviction => "NoEviction",
1124 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
1125 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
1126 : }
1127 0 : }
1128 : }
1129 :
1130 : #[derive(
1131 : Eq,
1132 : PartialEq,
1133 : Debug,
1134 : Copy,
1135 : Clone,
1136 0 : strum_macros::EnumString,
1137 : strum_macros::Display,
1138 0 : serde_with::DeserializeFromStr,
1139 : serde_with::SerializeDisplay,
1140 : )]
1141 : #[strum(serialize_all = "kebab-case")]
1142 : pub enum CompactionAlgorithm {
1143 : Legacy,
1144 : Tiered,
1145 : }
1146 :
1147 : #[derive(
1148 4 : Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
1149 : )]
1150 : pub enum ImageCompressionAlgorithm {
1151 : // Disabled for writes, support decompressing during read path
1152 : Disabled,
1153 : /// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
1154 : /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
1155 : Zstd {
1156 : level: Option<i8>,
1157 : },
1158 : }
1159 :
1160 : impl FromStr for ImageCompressionAlgorithm {
1161 : type Err = anyhow::Error;
1162 8 : fn from_str(s: &str) -> Result<Self, Self::Err> {
1163 8 : let mut components = s.split(['(', ')']);
1164 8 : let first = components
1165 8 : .next()
1166 8 : .ok_or_else(|| anyhow::anyhow!("empty string"))?;
1167 8 : match first {
1168 8 : "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
1169 6 : "zstd" => {
1170 6 : let level = if let Some(v) = components.next() {
1171 4 : let v: i8 = v.parse()?;
1172 4 : Some(v)
1173 : } else {
1174 2 : None
1175 : };
1176 :
1177 6 : Ok(ImageCompressionAlgorithm::Zstd { level })
1178 : }
1179 0 : _ => anyhow::bail!("invalid specifier '{first}'"),
1180 : }
1181 8 : }
1182 : }
1183 :
1184 : impl Display for ImageCompressionAlgorithm {
1185 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1186 12 : match self {
1187 3 : ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
1188 9 : ImageCompressionAlgorithm::Zstd { level } => {
1189 9 : if let Some(level) = level {
1190 6 : write!(f, "zstd({})", level)
1191 : } else {
1192 3 : write!(f, "zstd")
1193 : }
1194 : }
1195 : }
1196 12 : }
1197 : }
1198 :
1199 0 : #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
1200 : pub struct CompactionAlgorithmSettings {
1201 : pub kind: CompactionAlgorithm,
1202 : }
1203 :
1204 0 : #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
1205 : #[serde(tag = "mode", rename_all = "kebab-case")]
1206 : pub enum L0FlushConfig {
1207 : #[serde(rename_all = "snake_case")]
1208 : Direct { max_concurrency: NonZeroUsize },
1209 : }
1210 :
1211 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1212 : pub struct EvictionPolicyLayerAccessThreshold {
1213 : #[serde(with = "humantime_serde")]
1214 : pub period: Duration,
1215 : #[serde(with = "humantime_serde")]
1216 : pub threshold: Duration,
1217 : }
1218 :
1219 6 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
1220 : pub struct ThrottleConfig {
1221 : /// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
1222 : #[serde(rename = "task_kinds")]
1223 : pub enabled: ThrottleConfigTaskKinds,
1224 : pub initial: u32,
1225 : #[serde(with = "humantime_serde")]
1226 : pub refill_interval: Duration,
1227 : pub refill_amount: NonZeroU32,
1228 : pub max: u32,
1229 : }
1230 :
1231 : /// Before <https://github.com/neondatabase/neon/pull/9962>
1232 : /// the throttle was a per `Timeline::get`/`Timeline::get_vectored` call.
1233 : /// The `task_kinds` field controlled which Pageserver "Task Kind"s
1234 : /// were subject to the throttle.
1235 : ///
1236 : /// After that PR, the throttle is applied at pagestream request level
1237 : /// and the `task_kinds` field does not apply since the only task kind
1238 : /// that us subject to the throttle is that of the page service.
1239 : ///
1240 : /// However, we don't want to make a breaking config change right now
1241 : /// because it means we have to migrate all the tenant configs.
1242 : /// This will be done in a future PR.
1243 : ///
1244 : /// In the meantime, we use emptiness / non-emptsiness of the `task_kinds`
1245 : /// field to determine if the throttle is enabled or not.
1246 1 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1247 : #[serde(transparent)]
1248 : pub struct ThrottleConfigTaskKinds(Vec<String>);
1249 :
1250 : impl ThrottleConfigTaskKinds {
1251 133 : pub fn disabled() -> Self {
1252 133 : Self(vec![])
1253 133 : }
1254 120 : pub fn is_enabled(&self) -> bool {
1255 120 : !self.0.is_empty()
1256 120 : }
1257 : }
1258 :
1259 : impl ThrottleConfig {
1260 133 : pub fn disabled() -> Self {
1261 133 : Self {
1262 133 : enabled: ThrottleConfigTaskKinds::disabled(),
1263 133 : // other values don't matter with emtpy `task_kinds`.
1264 133 : initial: 0,
1265 133 : refill_interval: Duration::from_millis(1),
1266 133 : refill_amount: NonZeroU32::new(1).unwrap(),
1267 133 : max: 1,
1268 133 : }
1269 133 : }
1270 : /// The requests per second allowed by the given config.
1271 0 : pub fn steady_rps(&self) -> f64 {
1272 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
1273 0 : }
1274 : }
1275 :
#[cfg(test)]
mod throttle_config_tests {
    use super::*;

    /// `ThrottleConfig::disabled()` must report the throttle as not enabled.
    #[test]
    fn test_disabled_is_disabled() {
        let enabled = ThrottleConfig::disabled().enabled.is_enabled();
        assert!(!enabled);
    }

    /// A JSON document with a non-empty `task_kinds` list, plus a `fair` key that
    /// `ThrottleConfig` does not declare, must deserialize and report the throttle as
    /// enabled.
    #[test]
    fn test_enabled_backwards_compat() {
        let document = serde_json::json!({
            "task_kinds": ["PageRequestHandler"],
            "initial": 40000,
            "refill_interval": "50ms",
            "refill_amount": 1000,
            "max": 40000,
            "fair": true
        });
        let parsed = serde_json::from_value::<ThrottleConfig>(document).unwrap();
        assert!(parsed.enabled.is_enabled());
    }
}
1299 :
1300 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
1301 : /// lists out all possible states (and the virtual "Detached" state)
1302 : /// in a flat form rather than using rust-style enums.
1303 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
1304 : pub enum LocationConfigMode {
1305 : AttachedSingle,
1306 : AttachedMulti,
1307 : AttachedStale,
1308 : Secondary,
1309 : Detached,
1310 : }
1311 :
1312 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1313 : pub struct LocationConfigSecondary {
1314 : pub warm: bool,
1315 : }
1316 :
1317 : /// An alternative representation of `pageserver::tenant::LocationConf`,
1318 : /// for use in external-facing APIs.
1319 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1320 : pub struct LocationConfig {
1321 : pub mode: LocationConfigMode,
1322 : /// If attaching, in what generation?
1323 : #[serde(default)]
1324 : pub generation: Option<u32>,
1325 :
1326 : // If requesting mode `Secondary`, configuration for that.
1327 : #[serde(default)]
1328 : pub secondary_conf: Option<LocationConfigSecondary>,
1329 :
1330 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
1331 : // must be set accurately.
1332 : #[serde(default)]
1333 : pub shard_number: u8,
1334 : #[serde(default)]
1335 : pub shard_count: u8,
1336 : #[serde(default)]
1337 : pub shard_stripe_size: u32,
1338 :
1339 : // This configuration only affects attached mode, but should be provided irrespective
1340 : // of the mode, as a secondary location might transition on startup if the response
1341 : // to the `/re-attach` control plane API requests it.
1342 : pub tenant_conf: TenantConfig,
1343 : }
1344 :
1345 0 : #[derive(Serialize, Deserialize)]
1346 : pub struct LocationConfigListResponse {
1347 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
1348 : }
1349 :
1350 : #[derive(Serialize)]
1351 : pub struct StatusResponse {
1352 : pub id: NodeId,
1353 : }
1354 :
1355 0 : #[derive(Serialize, Deserialize, Debug)]
1356 : #[serde(deny_unknown_fields)]
1357 : pub struct TenantLocationConfigRequest {
1358 : #[serde(flatten)]
1359 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
1360 : }
1361 :
1362 0 : #[derive(Serialize, Deserialize, Debug)]
1363 : #[serde(deny_unknown_fields)]
1364 : pub struct TenantTimeTravelRequest {
1365 : pub shard_counts: Vec<ShardCount>,
1366 : }
1367 :
1368 0 : #[derive(Serialize, Deserialize, Debug)]
1369 : #[serde(deny_unknown_fields)]
1370 : pub struct TenantShardLocation {
1371 : pub shard_id: TenantShardId,
1372 : pub node_id: NodeId,
1373 : }
1374 :
1375 0 : #[derive(Serialize, Deserialize, Debug)]
1376 : #[serde(deny_unknown_fields)]
1377 : pub struct TenantLocationConfigResponse {
1378 : pub shards: Vec<TenantShardLocation>,
1379 : // If the shards' ShardCount count is >1, stripe_size will be set.
1380 : pub stripe_size: Option<ShardStripeSize>,
1381 : }
1382 :
1383 2 : #[derive(Serialize, Deserialize, Debug)]
1384 : #[serde(deny_unknown_fields)]
1385 : pub struct TenantConfigRequest {
1386 : pub tenant_id: TenantId,
1387 : #[serde(flatten)]
1388 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
1389 : }
1390 :
1391 : impl std::ops::Deref for TenantConfigRequest {
1392 : type Target = TenantConfig;
1393 :
1394 0 : fn deref(&self) -> &Self::Target {
1395 0 : &self.config
1396 0 : }
1397 : }
1398 :
1399 : impl TenantConfigRequest {
1400 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
1401 0 : let config = TenantConfig::default();
1402 0 : TenantConfigRequest { tenant_id, config }
1403 0 : }
1404 : }
1405 :
1406 3 : #[derive(Serialize, Deserialize, Debug)]
1407 : #[serde(deny_unknown_fields)]
1408 : pub struct TenantConfigPatchRequest {
1409 : pub tenant_id: TenantId,
1410 : #[serde(flatten)]
1411 : pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
1412 : }
1413 :
1414 0 : #[derive(Serialize, Deserialize, Debug)]
1415 : pub struct TenantWaitLsnRequest {
1416 : #[serde(flatten)]
1417 : pub timelines: HashMap<TimelineId, Lsn>,
1418 : pub timeout: Duration,
1419 : }
1420 :
1421 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
1422 0 : #[derive(Serialize, Deserialize, Clone)]
1423 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
1424 : pub enum TenantAttachmentStatus {
1425 : Maybe,
1426 : Attached,
1427 : Failed { reason: String },
1428 : }
1429 :
1430 0 : #[derive(Serialize, Deserialize, Clone)]
1431 : pub struct TenantInfo {
1432 : pub id: TenantShardId,
1433 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
1434 : pub state: TenantState,
1435 : /// Sum of the size of all layer files.
1436 : /// If a layer is present in both local FS and S3, it counts only once.
1437 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
1438 : pub attachment_status: TenantAttachmentStatus,
1439 : pub generation: u32,
1440 :
1441 : /// Opaque explanation if gc is being blocked.
1442 : ///
1443 : /// Only looked up for the individual tenant detail, not the listing.
1444 : #[serde(skip_serializing_if = "Option::is_none")]
1445 : pub gc_blocking: Option<String>,
1446 : }
1447 :
1448 0 : #[derive(Serialize, Deserialize, Clone)]
1449 : pub struct TenantDetails {
1450 : #[serde(flatten)]
1451 : pub tenant_info: TenantInfo,
1452 :
1453 : pub walredo: Option<WalRedoManagerStatus>,
1454 :
1455 : pub timelines: Vec<TimelineId>,
1456 : }
1457 :
1458 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
1459 : pub enum TimelineArchivalState {
1460 : Archived,
1461 : Unarchived,
1462 : }
1463 :
1464 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1465 : pub enum TimelineVisibilityState {
1466 : Visible,
1467 : Invisible,
1468 : }
1469 :
1470 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1471 : pub struct TimelineArchivalConfigRequest {
1472 : pub state: TimelineArchivalState,
1473 : }
1474 :
1475 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1476 : pub struct TimelinePatchIndexPartRequest {
1477 : pub rel_size_migration: Option<RelSizeMigration>,
1478 : pub gc_compaction_last_completed_lsn: Option<Lsn>,
1479 : pub applied_gc_cutoff_lsn: Option<Lsn>,
1480 : #[serde(default)]
1481 : pub force_index_update: bool,
1482 : }
1483 :
1484 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1485 : pub struct TimelinesInfoAndOffloaded {
1486 : pub timelines: Vec<TimelineInfo>,
1487 : pub offloaded: Vec<OffloadedTimelineInfo>,
1488 : }
1489 :
1490 : /// Analog of [`TimelineInfo`] for offloaded timelines.
1491 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1492 : pub struct OffloadedTimelineInfo {
1493 : pub tenant_id: TenantShardId,
1494 : pub timeline_id: TimelineId,
1495 : /// Whether the timeline has a parent it has been branched off from or not
1496 : pub ancestor_timeline_id: Option<TimelineId>,
1497 : /// Whether to retain the branch lsn at the ancestor or not
1498 : pub ancestor_retain_lsn: Option<Lsn>,
1499 : /// The time point when the timeline was archived
1500 : pub archived_at: chrono::DateTime<chrono::Utc>,
1501 : }
1502 :
1503 4 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1504 : #[serde(rename_all = "camelCase")]
1505 : pub enum RelSizeMigration {
1506 : /// The tenant is using the old rel_size format.
1507 : /// Note that this enum is persisted as `Option<RelSizeMigration>` in the index part, so
1508 : /// `None` is the same as `Some(RelSizeMigration::Legacy)`.
1509 : Legacy,
1510 : /// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
1511 : /// persisted in the index part. The read path will read both formats and merge them.
1512 : Migrating,
1513 : /// The tenant has migrated to the new rel_size format. Only the new rel_size format is persisted
1514 : /// in the index part, and the read path will not read the old format.
1515 : Migrated,
1516 : }
1517 :
1518 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
1519 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1520 : pub struct TimelineInfo {
1521 : pub tenant_id: TenantShardId,
1522 : pub timeline_id: TimelineId,
1523 :
1524 : pub ancestor_timeline_id: Option<TimelineId>,
1525 : pub ancestor_lsn: Option<Lsn>,
1526 : pub last_record_lsn: Lsn,
1527 : pub prev_record_lsn: Option<Lsn>,
1528 :
1529 : /// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
1530 : /// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
1531 : /// as it is easier to reason about.
1532 : #[serde(default)]
1533 : pub applied_gc_cutoff_lsn: Lsn,
1534 :
1535 : /// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
1536 : /// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
1537 : /// LSN at which it is legal to create a branch or ephemeral endpoint.
1538 : ///
1539 : /// Note that holders of valid LSN leases may be able to create branches and read pages earlier
1540 : /// than this LSN, but new leases may not be taken out earlier than this LSN.
1541 : #[serde(default)]
1542 : pub min_readable_lsn: Lsn,
1543 :
1544 : pub disk_consistent_lsn: Lsn,
1545 :
1546 : /// The LSN that we have succesfully uploaded to remote storage
1547 : pub remote_consistent_lsn: Lsn,
1548 :
1549 : /// The LSN that we are advertizing to safekeepers
1550 : pub remote_consistent_lsn_visible: Lsn,
1551 :
1552 : /// The LSN from the start of the root timeline (never changes)
1553 : pub initdb_lsn: Lsn,
1554 :
1555 : pub current_logical_size: u64,
1556 : pub current_logical_size_is_accurate: bool,
1557 :
1558 : pub directory_entries_counts: Vec<u64>,
1559 :
1560 : /// Sum of the size of all layer files.
1561 : /// If a layer is present in both local FS and S3, it counts only once.
1562 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
1563 : pub current_logical_size_non_incremental: Option<u64>,
1564 :
1565 : /// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
1566 : /// beyond the branch's branch point, we only count up to the branch point.
1567 : pub pitr_history_size: u64,
1568 :
1569 : /// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
1570 : /// ancestor data used by this branch would have been retained anyway). If this is false, then
1571 : /// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
1572 : /// otherwise be able to GC.
1573 : pub within_ancestor_pitr: bool,
1574 :
1575 : pub timeline_dir_layer_file_size_sum: Option<u64>,
1576 :
1577 : pub wal_source_connstr: Option<String>,
1578 : pub last_received_msg_lsn: Option<Lsn>,
1579 : /// the timestamp (in microseconds) of the last received message
1580 : pub last_received_msg_ts: Option<u128>,
1581 : pub pg_version: u32,
1582 :
1583 : pub state: TimelineState,
1584 :
1585 : pub walreceiver_status: String,
1586 :
1587 : // ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
1588 : // Backward compatibility: you will get a JSON not containing the newly-added field.
1589 : // Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
1590 : // not deny unknown fields by default so it's safe to set the field to some value, though it won't be
1591 : // read.
1592 : /// Whether the timeline is archived.
1593 : pub is_archived: Option<bool>,
1594 :
1595 : /// The status of the rel_size migration.
1596 : pub rel_size_migration: Option<RelSizeMigration>,
1597 :
1598 : /// Whether the timeline is invisible in synthetic size calculations.
1599 : pub is_invisible: Option<bool>,
1600 : }
1601 :
1602 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1603 : pub struct LayerMapInfo {
1604 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
1605 : pub historic_layers: Vec<HistoricLayerInfo>,
1606 : }
1607 :
1608 : /// The residence status of a layer
1609 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1610 : pub enum LayerResidenceStatus {
1611 : /// Residence status for a layer file that exists locally.
1612 : /// It may also exist on the remote, we don't care here.
1613 : Resident,
1614 : /// Residence status for a layer file that only exists on the remote.
1615 : Evicted,
1616 : }
1617 :
1618 : #[serde_as]
1619 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1620 : pub struct LayerAccessStats {
1621 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1622 : pub access_time: SystemTime,
1623 :
1624 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1625 : pub residence_time: SystemTime,
1626 :
1627 : pub visible: bool,
1628 : }
1629 :
1630 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1631 : #[serde(tag = "kind")]
1632 : pub enum InMemoryLayerInfo {
1633 : Open { lsn_start: Lsn },
1634 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
1635 : }
1636 :
1637 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1638 : #[serde(tag = "kind")]
1639 : pub enum HistoricLayerInfo {
1640 : Delta {
1641 : layer_file_name: String,
1642 : layer_file_size: u64,
1643 :
1644 : lsn_start: Lsn,
1645 : lsn_end: Lsn,
1646 : remote: bool,
1647 : access_stats: LayerAccessStats,
1648 :
1649 : l0: bool,
1650 : },
1651 : Image {
1652 : layer_file_name: String,
1653 : layer_file_size: u64,
1654 :
1655 : lsn_start: Lsn,
1656 : remote: bool,
1657 : access_stats: LayerAccessStats,
1658 : },
1659 : }
1660 :
1661 : impl HistoricLayerInfo {
1662 0 : pub fn layer_file_name(&self) -> &str {
1663 0 : match self {
1664 : HistoricLayerInfo::Delta {
1665 0 : layer_file_name, ..
1666 0 : } => layer_file_name,
1667 : HistoricLayerInfo::Image {
1668 0 : layer_file_name, ..
1669 0 : } => layer_file_name,
1670 : }
1671 0 : }
1672 0 : pub fn is_remote(&self) -> bool {
1673 0 : match self {
1674 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
1675 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
1676 : }
1677 0 : }
1678 0 : pub fn set_remote(&mut self, value: bool) {
1679 0 : let field = match self {
1680 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
1681 0 : HistoricLayerInfo::Image { remote, .. } => remote,
1682 : };
1683 0 : *field = value;
1684 0 : }
1685 0 : pub fn layer_file_size(&self) -> u64 {
1686 0 : match self {
1687 : HistoricLayerInfo::Delta {
1688 0 : layer_file_size, ..
1689 0 : } => *layer_file_size,
1690 : HistoricLayerInfo::Image {
1691 0 : layer_file_size, ..
1692 0 : } => *layer_file_size,
1693 : }
1694 0 : }
1695 : }
1696 :
1697 0 : #[derive(Debug, Serialize, Deserialize)]
1698 : pub struct DownloadRemoteLayersTaskSpawnRequest {
1699 : pub max_concurrent_downloads: NonZeroUsize,
1700 : }
1701 :
1702 0 : #[derive(Debug, Serialize, Deserialize)]
1703 : pub struct IngestAuxFilesRequest {
1704 : pub aux_files: HashMap<String, String>,
1705 : }
1706 :
1707 0 : #[derive(Debug, Serialize, Deserialize)]
1708 : pub struct ListAuxFilesRequest {
1709 : pub lsn: Lsn,
1710 : }
1711 :
1712 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1713 : pub struct DownloadRemoteLayersTaskInfo {
1714 : pub task_id: String,
1715 : pub state: DownloadRemoteLayersTaskState,
1716 : pub total_layer_count: u64, // stable once `completed`
1717 : pub successful_download_count: u64, // stable once `completed`
1718 : pub failed_download_count: u64, // stable once `completed`
1719 : }
1720 :
1721 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1722 : pub enum DownloadRemoteLayersTaskState {
1723 : Running,
1724 : Completed,
1725 : ShutDown,
1726 : }
1727 :
1728 0 : #[derive(Debug, Serialize, Deserialize)]
1729 : pub struct TimelineGcRequest {
1730 : pub gc_horizon: Option<u64>,
1731 : }
1732 :
1733 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1734 : pub struct WalRedoManagerProcessStatus {
1735 : pub pid: u32,
1736 : }
1737 :
1738 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1739 : pub struct WalRedoManagerStatus {
1740 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
1741 : pub process: Option<WalRedoManagerProcessStatus>,
1742 : }
1743 :
1744 : /// The progress of a secondary tenant.
1745 : ///
1746 : /// It is mostly useful when doing a long running download: e.g. initiating
1747 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
1748 : /// what's happening.
1749 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
1750 : pub struct SecondaryProgress {
1751 : /// The remote storage LastModified time of the heatmap object we last downloaded.
1752 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
1753 :
1754 : /// The number of layers currently on-disk
1755 : pub layers_downloaded: usize,
1756 : /// The number of layers in the most recently seen heatmap
1757 : pub layers_total: usize,
1758 :
1759 : /// The number of layer bytes currently on-disk
1760 : pub bytes_downloaded: u64,
1761 : /// The number of layer bytes in the most recently seen heatmap
1762 : pub bytes_total: u64,
1763 : }
1764 :
1765 0 : #[derive(Serialize, Deserialize, Debug)]
1766 : pub struct TenantScanRemoteStorageShard {
1767 : pub tenant_shard_id: TenantShardId,
1768 : pub generation: Option<u32>,
1769 : pub stripe_size: Option<ShardStripeSize>,
1770 : }
1771 :
1772 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1773 : pub struct TenantScanRemoteStorageResponse {
1774 : pub shards: Vec<TenantScanRemoteStorageShard>,
1775 : }
1776 :
1777 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1778 : #[serde(rename_all = "snake_case")]
1779 : pub enum TenantSorting {
1780 : /// Total size of layers on local disk for all timelines in a shard.
1781 : ResidentSize,
1782 : /// The logical size of the largest timeline within a _tenant_ (not shard). Only tracked on
1783 : /// shard 0, contains the sum across all shards.
1784 : MaxLogicalSize,
1785 : /// The logical size of the largest timeline within a _tenant_ (not shard), divided by number of
1786 : /// shards. Only tracked on shard 0, and estimates the per-shard logical size.
1787 : MaxLogicalSizePerShard,
1788 : }
1789 :
1790 : impl Default for TenantSorting {
1791 0 : fn default() -> Self {
1792 0 : Self::ResidentSize
1793 0 : }
1794 : }
1795 :
1796 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1797 : pub struct TopTenantShardsRequest {
1798 : // How would you like to sort the tenants?
1799 : pub order_by: TenantSorting,
1800 :
1801 : // How many results?
1802 : pub limit: usize,
1803 :
1804 : // Omit tenants with more than this many shards (e.g. if this is the max number of shards
1805 : // that the caller would ever split to)
1806 : pub where_shards_lt: Option<ShardCount>,
1807 :
1808 : // Omit tenants where the ordering metric is less than this (this is an optimization to
1809 : // let us quickly exclude numerous tiny shards)
1810 : pub where_gt: Option<u64>,
1811 : }
1812 :
1813 0 : #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
1814 : pub struct TopTenantShardItem {
1815 : pub id: TenantShardId,
1816 :
1817 : /// Total size of layers on local disk for all timelines in this shard.
1818 : pub resident_size: u64,
1819 :
1820 : /// Total size of layers in remote storage for all timelines in this shard.
1821 : pub physical_size: u64,
1822 :
1823 : /// The largest logical size of a timeline within this _tenant_ (not shard). This is only
1824 : /// tracked on shard 0, and contains the sum of the logical size across all shards.
1825 : pub max_logical_size: u64,
1826 :
1827 : /// The largest logical size of a timeline within this _tenant_ (not shard) divided by number of
1828 : /// shards. This is only tracked on shard 0, and is only an estimate as we divide it evenly by
1829 : /// shard count, rounded up.
1830 : pub max_logical_size_per_shard: u64,
1831 : }
1832 :
1833 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1834 : pub struct TopTenantShardsResponse {
1835 : pub shards: Vec<TopTenantShardItem>,
1836 : }
1837 :
1838 : pub mod virtual_file {
1839 :
1840 : #[derive(
1841 : Copy,
1842 : Clone,
1843 : PartialEq,
1844 : Eq,
1845 : Hash,
1846 0 : strum_macros::EnumString,
1847 : strum_macros::Display,
1848 0 : serde_with::DeserializeFromStr,
1849 : serde_with::SerializeDisplay,
1850 : Debug,
1851 : )]
1852 : #[strum(serialize_all = "kebab-case")]
1853 : pub enum IoEngineKind {
1854 : StdFs,
1855 : #[cfg(target_os = "linux")]
1856 : TokioEpollUring,
1857 : }
1858 :
1859 : /// Direct IO modes for a pageserver.
1860 : #[derive(
1861 : Copy,
1862 : Clone,
1863 : PartialEq,
1864 : Eq,
1865 : Hash,
1866 0 : strum_macros::EnumString,
1867 0 : strum_macros::EnumIter,
1868 : strum_macros::Display,
1869 0 : serde_with::DeserializeFromStr,
1870 : serde_with::SerializeDisplay,
1871 : Debug,
1872 : )]
1873 : #[strum(serialize_all = "kebab-case")]
1874 : #[repr(u8)]
1875 : pub enum IoMode {
1876 : /// Uses buffered IO.
1877 : Buffered,
1878 : /// Uses direct IO for reads only.
1879 : Direct,
1880 : /// Use direct IO for reads and writes.
1881 : DirectRw,
1882 : }
1883 :
1884 : impl IoMode {
1885 253 : pub fn preferred() -> Self {
1886 253 : IoMode::DirectRw
1887 253 : }
1888 : }
1889 :
1890 : impl TryFrom<u8> for IoMode {
1891 : type Error = u8;
1892 :
1893 2601 : fn try_from(value: u8) -> Result<Self, Self::Error> {
1894 2601 : Ok(match value {
1895 2601 : v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
1896 2601 : v if v == (IoMode::Direct as u8) => IoMode::Direct,
1897 2601 : v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw,
1898 0 : x => return Err(x),
1899 : })
1900 2601 : }
1901 : }
1902 : }
1903 :
1904 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1905 : pub struct ScanDisposableKeysResponse {
1906 : pub disposable_count: usize,
1907 : pub not_disposable_count: usize,
1908 : }
1909 :
1910 : // Wrapped in libpq CopyData
1911 : #[derive(PartialEq, Eq, Debug)]
1912 : pub enum PagestreamFeMessage {
1913 : Exists(PagestreamExistsRequest),
1914 : Nblocks(PagestreamNblocksRequest),
1915 : GetPage(PagestreamGetPageRequest),
1916 : DbSize(PagestreamDbSizeRequest),
1917 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
1918 : #[cfg(feature = "testing")]
1919 : Test(PagestreamTestRequest),
1920 : }
1921 :
1922 : // Wrapped in libpq CopyData
1923 : #[derive(Debug, strum_macros::EnumProperty)]
1924 : pub enum PagestreamBeMessage {
1925 : Exists(PagestreamExistsResponse),
1926 : Nblocks(PagestreamNblocksResponse),
1927 : GetPage(PagestreamGetPageResponse),
1928 : Error(PagestreamErrorResponse),
1929 : DbSize(PagestreamDbSizeResponse),
1930 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
1931 : #[cfg(feature = "testing")]
1932 : Test(PagestreamTestResponse),
1933 : }
1934 :
/// Tag byte that starts every serialized request message.
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
    Exists = 0,
    Nblocks = 1,
    GetPage = 2,
    DbSize = 3,
    GetSlruSegment = 4,
    /* future tags above this line */
    /// For testing purposes, not available in production.
    #[cfg(feature = "testing")]
    Test = 99,
}

/// Tag byte that starts every serialized response message.
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
    Exists = 100,
    Nblocks = 101,
    GetPage = 102,
    Error = 103,
    DbSize = 104,
    GetSlruSegment = 105,
    /* future tags above this line */
    /// For testing purposes, not available in production.
    #[cfg(feature = "testing")]
    Test = 199,
}

impl TryFrom<u8> for PagestreamFeMessageTag {
    type Error = u8;

    /// Decodes a request tag byte; an unknown byte is handed back as the error value.
    fn try_from(value: u8) -> Result<Self, u8> {
        // The patterns are taken from the enum itself so the two cannot drift apart.
        const EXISTS: u8 = PagestreamFeMessageTag::Exists as u8;
        const NBLOCKS: u8 = PagestreamFeMessageTag::Nblocks as u8;
        const GET_PAGE: u8 = PagestreamFeMessageTag::GetPage as u8;
        const DB_SIZE: u8 = PagestreamFeMessageTag::DbSize as u8;
        const GET_SLRU_SEGMENT: u8 = PagestreamFeMessageTag::GetSlruSegment as u8;
        #[cfg(feature = "testing")]
        const TEST: u8 = PagestreamFeMessageTag::Test as u8;

        let tag = match value {
            EXISTS => PagestreamFeMessageTag::Exists,
            NBLOCKS => PagestreamFeMessageTag::Nblocks,
            GET_PAGE => PagestreamFeMessageTag::GetPage,
            DB_SIZE => PagestreamFeMessageTag::DbSize,
            GET_SLRU_SEGMENT => PagestreamFeMessageTag::GetSlruSegment,
            #[cfg(feature = "testing")]
            TEST => PagestreamFeMessageTag::Test,
            unknown => return Err(unknown),
        };
        Ok(tag)
    }
}

impl TryFrom<u8> for PagestreamBeMessageTag {
    type Error = u8;

    /// Decodes a response tag byte; an unknown byte is handed back as the error value.
    // The return type spells out `u8` because `Self::Error` would be ambiguous with the
    // `Error` variant of this enum.
    fn try_from(value: u8) -> Result<Self, u8> {
        const EXISTS: u8 = PagestreamBeMessageTag::Exists as u8;
        const NBLOCKS: u8 = PagestreamBeMessageTag::Nblocks as u8;
        const GET_PAGE: u8 = PagestreamBeMessageTag::GetPage as u8;
        const ERROR: u8 = PagestreamBeMessageTag::Error as u8;
        const DB_SIZE: u8 = PagestreamBeMessageTag::DbSize as u8;
        const GET_SLRU_SEGMENT: u8 = PagestreamBeMessageTag::GetSlruSegment as u8;
        #[cfg(feature = "testing")]
        const TEST: u8 = PagestreamBeMessageTag::Test as u8;

        let tag = match value {
            EXISTS => PagestreamBeMessageTag::Exists,
            NBLOCKS => PagestreamBeMessageTag::Nblocks,
            GET_PAGE => PagestreamBeMessageTag::GetPage,
            ERROR => PagestreamBeMessageTag::Error,
            DB_SIZE => PagestreamBeMessageTag::DbSize,
            GET_SLRU_SEGMENT => PagestreamBeMessageTag::GetSlruSegment,
            #[cfg(feature = "testing")]
            TEST => PagestreamBeMessageTag::Test,
            unknown => return Err(unknown),
        };
        Ok(tag)
    }
}
1996 :
1997 : // A GetPage request contains two LSN values:
1998 : //
1999 : // request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
2000 : // "get the latest version present". It's used by the primary server, which knows that no one else
2001 : // is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
2002 : // Lsn::Max. Standby servers use the current replay LSN as the request LSN.
2003 : //
2004 : // not_modified_since: Hint to the pageserver that the client knows that the page has not been
2005 : // modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since' equal to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
2007 : // passing an earlier LSN can speed up the request, by allowing the pageserver to process the
2008 : // request without waiting for 'request_lsn' to arrive.
2009 : //
2010 : // The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
2011 : // sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
2012 : // 'latest' was set to true. The V2 interface was added because there was no correct way for a
2013 : // standby to request a page at a particular non-latest LSN, and also include the
2014 : // 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
2015 : // request, if the standby knows that the page hasn't been modified since, and risk getting an error
2016 : // if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
2017 : // require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
2018 : // interface allows sending both LSNs, and let the pageserver do the right thing. There was no
2019 : // difference in the responses between V1 and V2.
2020 : //
// The V3 version of the protocol adds a request ID to all requests. This request ID is also included in the
// response, together with the other fields of the request, which allows verifying that a response belongs to our
// request. We copy fields from the request to the response to make this check more reliable: the request ID is
// formed from the process ID and a local counter, so in principle request IDs can be duplicated if a PID is reused.
2025 : //
/// Version of the pagestream wire protocol.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PagestreamProtocolVersion {
    /// Requests carry two LSNs but no request ID.
    V2,
    /// Adds a request ID to requests; responses echo the request fields.
    V3,
}
2031 :
/// Identifier carried in the header of every pagestream request.
pub type RequestId = u64;
2033 :
2034 : #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
2035 : pub struct PagestreamRequest {
2036 : pub reqid: RequestId,
2037 : pub request_lsn: Lsn,
2038 : pub not_modified_since: Lsn,
2039 : }
2040 :
2041 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2042 : pub struct PagestreamExistsRequest {
2043 : pub hdr: PagestreamRequest,
2044 : pub rel: RelTag,
2045 : }
2046 :
2047 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2048 : pub struct PagestreamNblocksRequest {
2049 : pub hdr: PagestreamRequest,
2050 : pub rel: RelTag,
2051 : }
2052 :
2053 : #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
2054 : pub struct PagestreamGetPageRequest {
2055 : pub hdr: PagestreamRequest,
2056 : pub rel: RelTag,
2057 : pub blkno: u32,
2058 : }
2059 :
2060 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2061 : pub struct PagestreamDbSizeRequest {
2062 : pub hdr: PagestreamRequest,
2063 : pub dbnode: u32,
2064 : }
2065 :
2066 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2067 : pub struct PagestreamGetSlruSegmentRequest {
2068 : pub hdr: PagestreamRequest,
2069 : pub kind: u8,
2070 : pub segno: u32,
2071 : }
2072 :
2073 : #[derive(Debug)]
2074 : pub struct PagestreamExistsResponse {
2075 : pub req: PagestreamExistsRequest,
2076 : pub exists: bool,
2077 : }
2078 :
2079 : #[derive(Debug)]
2080 : pub struct PagestreamNblocksResponse {
2081 : pub req: PagestreamNblocksRequest,
2082 : pub n_blocks: u32,
2083 : }
2084 :
2085 : #[derive(Debug)]
2086 : pub struct PagestreamGetPageResponse {
2087 : pub req: PagestreamGetPageRequest,
2088 : pub page: Bytes,
2089 : }
2090 :
2091 : #[derive(Debug)]
2092 : pub struct PagestreamGetSlruSegmentResponse {
2093 : pub req: PagestreamGetSlruSegmentRequest,
2094 : pub segment: Bytes,
2095 : }
2096 :
2097 : #[derive(Debug)]
2098 : pub struct PagestreamErrorResponse {
2099 : pub req: PagestreamRequest,
2100 : pub message: String,
2101 : }
2102 :
2103 : #[derive(Debug)]
2104 : pub struct PagestreamDbSizeResponse {
2105 : pub req: PagestreamDbSizeRequest,
2106 : pub db_size: i64,
2107 : }
2108 :
/// Payload of `PagestreamFeMessage::Test`; only compiled with the `testing` feature.
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
    pub hdr: PagestreamRequest,
    pub batch_key: u64,
    /// Free-form text; sent as a `u64` length followed by the UTF-8 bytes.
    pub message: String,
}
2116 :
/// Payload of `PagestreamBeMessage::Test`; only compiled with the `testing` feature.
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
    /// The request being answered; its batch key and message are sent back.
    pub req: PagestreamTestRequest,
}
2122 :
2123 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
2124 : // that require pageserver-internal types. It is sufficient to get the total size.
2125 0 : #[derive(Serialize, Deserialize, Debug)]
2126 : pub struct TenantHistorySize {
2127 : pub id: TenantId,
2128 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
2129 : ///
2130 : /// Will be none if `?inputs_only=true` was given.
2131 : pub size: Option<u64>,
2132 : }
2133 :
2134 : impl PagestreamFeMessage {
2135 : /// Serialize a compute -> pageserver message. This is currently only used in testing
2136 : /// tools. Always uses protocol version 3.
2137 4 : pub fn serialize(&self) -> Bytes {
2138 4 : let mut bytes = BytesMut::new();
2139 4 :
2140 4 : match self {
2141 1 : Self::Exists(req) => {
2142 1 : bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
2143 1 : bytes.put_u64(req.hdr.reqid);
2144 1 : bytes.put_u64(req.hdr.request_lsn.0);
2145 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2146 1 : bytes.put_u32(req.rel.spcnode);
2147 1 : bytes.put_u32(req.rel.dbnode);
2148 1 : bytes.put_u32(req.rel.relnode);
2149 1 : bytes.put_u8(req.rel.forknum);
2150 1 : }
2151 :
2152 1 : Self::Nblocks(req) => {
2153 1 : bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
2154 1 : bytes.put_u64(req.hdr.reqid);
2155 1 : bytes.put_u64(req.hdr.request_lsn.0);
2156 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2157 1 : bytes.put_u32(req.rel.spcnode);
2158 1 : bytes.put_u32(req.rel.dbnode);
2159 1 : bytes.put_u32(req.rel.relnode);
2160 1 : bytes.put_u8(req.rel.forknum);
2161 1 : }
2162 :
2163 1 : Self::GetPage(req) => {
2164 1 : bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
2165 1 : bytes.put_u64(req.hdr.reqid);
2166 1 : bytes.put_u64(req.hdr.request_lsn.0);
2167 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2168 1 : bytes.put_u32(req.rel.spcnode);
2169 1 : bytes.put_u32(req.rel.dbnode);
2170 1 : bytes.put_u32(req.rel.relnode);
2171 1 : bytes.put_u8(req.rel.forknum);
2172 1 : bytes.put_u32(req.blkno);
2173 1 : }
2174 :
2175 1 : Self::DbSize(req) => {
2176 1 : bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
2177 1 : bytes.put_u64(req.hdr.reqid);
2178 1 : bytes.put_u64(req.hdr.request_lsn.0);
2179 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2180 1 : bytes.put_u32(req.dbnode);
2181 1 : }
2182 :
2183 0 : Self::GetSlruSegment(req) => {
2184 0 : bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
2185 0 : bytes.put_u64(req.hdr.reqid);
2186 0 : bytes.put_u64(req.hdr.request_lsn.0);
2187 0 : bytes.put_u64(req.hdr.not_modified_since.0);
2188 0 : bytes.put_u8(req.kind);
2189 0 : bytes.put_u32(req.segno);
2190 0 : }
2191 : #[cfg(feature = "testing")]
2192 0 : Self::Test(req) => {
2193 0 : bytes.put_u8(PagestreamFeMessageTag::Test as u8);
2194 0 : bytes.put_u64(req.hdr.reqid);
2195 0 : bytes.put_u64(req.hdr.request_lsn.0);
2196 0 : bytes.put_u64(req.hdr.not_modified_since.0);
2197 0 : bytes.put_u64(req.batch_key);
2198 0 : let message = req.message.as_bytes();
2199 0 : bytes.put_u64(message.len() as u64);
2200 0 : bytes.put_slice(message);
2201 0 : }
2202 : }
2203 :
2204 4 : bytes.into()
2205 4 : }
2206 :
2207 4 : pub fn parse<R: std::io::Read>(
2208 4 : body: &mut R,
2209 4 : protocol_version: PagestreamProtocolVersion,
2210 4 : ) -> anyhow::Result<PagestreamFeMessage> {
2211 : // these correspond to the NeonMessageTag enum in pagestore_client.h
2212 : //
2213 : // TODO: consider using protobuf or serde bincode for less error prone
2214 : // serialization.
2215 4 : let msg_tag = body.read_u8()?;
2216 4 : let (reqid, request_lsn, not_modified_since) = match protocol_version {
2217 : PagestreamProtocolVersion::V2 => (
2218 : 0,
2219 0 : Lsn::from(body.read_u64::<BigEndian>()?),
2220 0 : Lsn::from(body.read_u64::<BigEndian>()?),
2221 : ),
2222 : PagestreamProtocolVersion::V3 => (
2223 4 : body.read_u64::<BigEndian>()?,
2224 4 : Lsn::from(body.read_u64::<BigEndian>()?),
2225 4 : Lsn::from(body.read_u64::<BigEndian>()?),
2226 : ),
2227 : };
2228 :
2229 4 : match PagestreamFeMessageTag::try_from(msg_tag)
2230 4 : .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
2231 : {
2232 : PagestreamFeMessageTag::Exists => {
2233 : Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
2234 1 : hdr: PagestreamRequest {
2235 1 : reqid,
2236 1 : request_lsn,
2237 1 : not_modified_since,
2238 1 : },
2239 1 : rel: RelTag {
2240 1 : spcnode: body.read_u32::<BigEndian>()?,
2241 1 : dbnode: body.read_u32::<BigEndian>()?,
2242 1 : relnode: body.read_u32::<BigEndian>()?,
2243 1 : forknum: body.read_u8()?,
2244 : },
2245 : }))
2246 : }
2247 : PagestreamFeMessageTag::Nblocks => {
2248 : Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
2249 1 : hdr: PagestreamRequest {
2250 1 : reqid,
2251 1 : request_lsn,
2252 1 : not_modified_since,
2253 1 : },
2254 1 : rel: RelTag {
2255 1 : spcnode: body.read_u32::<BigEndian>()?,
2256 1 : dbnode: body.read_u32::<BigEndian>()?,
2257 1 : relnode: body.read_u32::<BigEndian>()?,
2258 1 : forknum: body.read_u8()?,
2259 : },
2260 : }))
2261 : }
2262 : PagestreamFeMessageTag::GetPage => {
2263 : Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2264 1 : hdr: PagestreamRequest {
2265 1 : reqid,
2266 1 : request_lsn,
2267 1 : not_modified_since,
2268 1 : },
2269 1 : rel: RelTag {
2270 1 : spcnode: body.read_u32::<BigEndian>()?,
2271 1 : dbnode: body.read_u32::<BigEndian>()?,
2272 1 : relnode: body.read_u32::<BigEndian>()?,
2273 1 : forknum: body.read_u8()?,
2274 : },
2275 1 : blkno: body.read_u32::<BigEndian>()?,
2276 : }))
2277 : }
2278 : PagestreamFeMessageTag::DbSize => {
2279 : Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
2280 1 : hdr: PagestreamRequest {
2281 1 : reqid,
2282 1 : request_lsn,
2283 1 : not_modified_since,
2284 1 : },
2285 1 : dbnode: body.read_u32::<BigEndian>()?,
2286 : }))
2287 : }
2288 : PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
2289 : PagestreamGetSlruSegmentRequest {
2290 0 : hdr: PagestreamRequest {
2291 0 : reqid,
2292 0 : request_lsn,
2293 0 : not_modified_since,
2294 0 : },
2295 0 : kind: body.read_u8()?,
2296 0 : segno: body.read_u32::<BigEndian>()?,
2297 : },
2298 : )),
2299 : #[cfg(feature = "testing")]
2300 : PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
2301 0 : hdr: PagestreamRequest {
2302 0 : reqid,
2303 0 : request_lsn,
2304 0 : not_modified_since,
2305 0 : },
2306 0 : batch_key: body.read_u64::<BigEndian>()?,
2307 : message: {
2308 0 : let len = body.read_u64::<BigEndian>()?;
2309 0 : let mut buf = vec![0; len as usize];
2310 0 : body.read_exact(&mut buf)?;
2311 0 : String::from_utf8(buf)?
2312 : },
2313 : })),
2314 : }
2315 4 : }
2316 : }
2317 :
2318 : impl PagestreamBeMessage {
2319 0 : pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
2320 0 : let mut bytes = BytesMut::new();
2321 :
2322 : use PagestreamBeMessageTag as Tag;
2323 0 : match protocol_version {
2324 : PagestreamProtocolVersion::V2 => {
2325 0 : match self {
2326 0 : Self::Exists(resp) => {
2327 0 : bytes.put_u8(Tag::Exists as u8);
2328 0 : bytes.put_u8(resp.exists as u8);
2329 0 : }
2330 :
2331 0 : Self::Nblocks(resp) => {
2332 0 : bytes.put_u8(Tag::Nblocks as u8);
2333 0 : bytes.put_u32(resp.n_blocks);
2334 0 : }
2335 :
2336 0 : Self::GetPage(resp) => {
2337 0 : bytes.put_u8(Tag::GetPage as u8);
2338 0 : bytes.put(&resp.page[..])
2339 : }
2340 :
2341 0 : Self::Error(resp) => {
2342 0 : bytes.put_u8(Tag::Error as u8);
2343 0 : bytes.put(resp.message.as_bytes());
2344 0 : bytes.put_u8(0); // null terminator
2345 0 : }
2346 0 : Self::DbSize(resp) => {
2347 0 : bytes.put_u8(Tag::DbSize as u8);
2348 0 : bytes.put_i64(resp.db_size);
2349 0 : }
2350 :
2351 0 : Self::GetSlruSegment(resp) => {
2352 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2353 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2354 0 : bytes.put(&resp.segment[..]);
2355 0 : }
2356 :
2357 : #[cfg(feature = "testing")]
2358 0 : Self::Test(resp) => {
2359 0 : bytes.put_u8(Tag::Test as u8);
2360 0 : bytes.put_u64(resp.req.batch_key);
2361 0 : let message = resp.req.message.as_bytes();
2362 0 : bytes.put_u64(message.len() as u64);
2363 0 : bytes.put_slice(message);
2364 0 : }
2365 : }
2366 : }
2367 : PagestreamProtocolVersion::V3 => {
2368 0 : match self {
2369 0 : Self::Exists(resp) => {
2370 0 : bytes.put_u8(Tag::Exists as u8);
2371 0 : bytes.put_u64(resp.req.hdr.reqid);
2372 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2373 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2374 0 : bytes.put_u32(resp.req.rel.spcnode);
2375 0 : bytes.put_u32(resp.req.rel.dbnode);
2376 0 : bytes.put_u32(resp.req.rel.relnode);
2377 0 : bytes.put_u8(resp.req.rel.forknum);
2378 0 : bytes.put_u8(resp.exists as u8);
2379 0 : }
2380 :
2381 0 : Self::Nblocks(resp) => {
2382 0 : bytes.put_u8(Tag::Nblocks as u8);
2383 0 : bytes.put_u64(resp.req.hdr.reqid);
2384 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2385 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2386 0 : bytes.put_u32(resp.req.rel.spcnode);
2387 0 : bytes.put_u32(resp.req.rel.dbnode);
2388 0 : bytes.put_u32(resp.req.rel.relnode);
2389 0 : bytes.put_u8(resp.req.rel.forknum);
2390 0 : bytes.put_u32(resp.n_blocks);
2391 0 : }
2392 :
2393 0 : Self::GetPage(resp) => {
2394 0 : bytes.put_u8(Tag::GetPage as u8);
2395 0 : bytes.put_u64(resp.req.hdr.reqid);
2396 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2397 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2398 0 : bytes.put_u32(resp.req.rel.spcnode);
2399 0 : bytes.put_u32(resp.req.rel.dbnode);
2400 0 : bytes.put_u32(resp.req.rel.relnode);
2401 0 : bytes.put_u8(resp.req.rel.forknum);
2402 0 : bytes.put_u32(resp.req.blkno);
2403 0 : bytes.put(&resp.page[..])
2404 : }
2405 :
2406 0 : Self::Error(resp) => {
2407 0 : bytes.put_u8(Tag::Error as u8);
2408 0 : bytes.put_u64(resp.req.reqid);
2409 0 : bytes.put_u64(resp.req.request_lsn.0);
2410 0 : bytes.put_u64(resp.req.not_modified_since.0);
2411 0 : bytes.put(resp.message.as_bytes());
2412 0 : bytes.put_u8(0); // null terminator
2413 0 : }
2414 0 : Self::DbSize(resp) => {
2415 0 : bytes.put_u8(Tag::DbSize as u8);
2416 0 : bytes.put_u64(resp.req.hdr.reqid);
2417 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2418 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2419 0 : bytes.put_u32(resp.req.dbnode);
2420 0 : bytes.put_i64(resp.db_size);
2421 0 : }
2422 :
2423 0 : Self::GetSlruSegment(resp) => {
2424 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2425 0 : bytes.put_u64(resp.req.hdr.reqid);
2426 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2427 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2428 0 : bytes.put_u8(resp.req.kind);
2429 0 : bytes.put_u32(resp.req.segno);
2430 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2431 0 : bytes.put(&resp.segment[..]);
2432 0 : }
2433 :
2434 : #[cfg(feature = "testing")]
2435 0 : Self::Test(resp) => {
2436 0 : bytes.put_u8(Tag::Test as u8);
2437 0 : bytes.put_u64(resp.req.hdr.reqid);
2438 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2439 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2440 0 : bytes.put_u64(resp.req.batch_key);
2441 0 : let message = resp.req.message.as_bytes();
2442 0 : bytes.put_u64(message.len() as u64);
2443 0 : bytes.put_slice(message);
2444 0 : }
2445 : }
2446 : }
2447 : }
2448 0 : bytes.into()
2449 0 : }
2450 :
2451 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
2452 0 : let mut buf = buf.reader();
2453 0 : let msg_tag = buf.read_u8()?;
2454 :
2455 : use PagestreamBeMessageTag as Tag;
2456 0 : let ok =
2457 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
2458 : Tag::Exists => {
2459 0 : let reqid = buf.read_u64::<BigEndian>()?;
2460 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2461 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2462 0 : let rel = RelTag {
2463 0 : spcnode: buf.read_u32::<BigEndian>()?,
2464 0 : dbnode: buf.read_u32::<BigEndian>()?,
2465 0 : relnode: buf.read_u32::<BigEndian>()?,
2466 0 : forknum: buf.read_u8()?,
2467 : };
2468 0 : let exists = buf.read_u8()? != 0;
2469 0 : Self::Exists(PagestreamExistsResponse {
2470 0 : req: PagestreamExistsRequest {
2471 0 : hdr: PagestreamRequest {
2472 0 : reqid,
2473 0 : request_lsn,
2474 0 : not_modified_since,
2475 0 : },
2476 0 : rel,
2477 0 : },
2478 0 : exists,
2479 0 : })
2480 : }
2481 : Tag::Nblocks => {
2482 0 : let reqid = buf.read_u64::<BigEndian>()?;
2483 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2484 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2485 0 : let rel = RelTag {
2486 0 : spcnode: buf.read_u32::<BigEndian>()?,
2487 0 : dbnode: buf.read_u32::<BigEndian>()?,
2488 0 : relnode: buf.read_u32::<BigEndian>()?,
2489 0 : forknum: buf.read_u8()?,
2490 : };
2491 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2492 0 : Self::Nblocks(PagestreamNblocksResponse {
2493 0 : req: PagestreamNblocksRequest {
2494 0 : hdr: PagestreamRequest {
2495 0 : reqid,
2496 0 : request_lsn,
2497 0 : not_modified_since,
2498 0 : },
2499 0 : rel,
2500 0 : },
2501 0 : n_blocks,
2502 0 : })
2503 : }
2504 : Tag::GetPage => {
2505 0 : let reqid = buf.read_u64::<BigEndian>()?;
2506 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2507 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2508 0 : let rel = RelTag {
2509 0 : spcnode: buf.read_u32::<BigEndian>()?,
2510 0 : dbnode: buf.read_u32::<BigEndian>()?,
2511 0 : relnode: buf.read_u32::<BigEndian>()?,
2512 0 : forknum: buf.read_u8()?,
2513 : };
2514 0 : let blkno = buf.read_u32::<BigEndian>()?;
2515 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
2516 0 : buf.read_exact(&mut page)?;
2517 0 : Self::GetPage(PagestreamGetPageResponse {
2518 0 : req: PagestreamGetPageRequest {
2519 0 : hdr: PagestreamRequest {
2520 0 : reqid,
2521 0 : request_lsn,
2522 0 : not_modified_since,
2523 0 : },
2524 0 : rel,
2525 0 : blkno,
2526 0 : },
2527 0 : page: page.into(),
2528 0 : })
2529 : }
2530 : Tag::Error => {
2531 0 : let reqid = buf.read_u64::<BigEndian>()?;
2532 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2533 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2534 0 : let mut msg = Vec::new();
2535 0 : buf.read_until(0, &mut msg)?;
2536 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
2537 0 : let rust_str = cstring.to_str()?;
2538 0 : Self::Error(PagestreamErrorResponse {
2539 0 : req: PagestreamRequest {
2540 0 : reqid,
2541 0 : request_lsn,
2542 0 : not_modified_since,
2543 0 : },
2544 0 : message: rust_str.to_owned(),
2545 0 : })
2546 : }
2547 : Tag::DbSize => {
2548 0 : let reqid = buf.read_u64::<BigEndian>()?;
2549 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2550 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2551 0 : let dbnode = buf.read_u32::<BigEndian>()?;
2552 0 : let db_size = buf.read_i64::<BigEndian>()?;
2553 0 : Self::DbSize(PagestreamDbSizeResponse {
2554 0 : req: PagestreamDbSizeRequest {
2555 0 : hdr: PagestreamRequest {
2556 0 : reqid,
2557 0 : request_lsn,
2558 0 : not_modified_since,
2559 0 : },
2560 0 : dbnode,
2561 0 : },
2562 0 : db_size,
2563 0 : })
2564 : }
2565 : Tag::GetSlruSegment => {
2566 0 : let reqid = buf.read_u64::<BigEndian>()?;
2567 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2568 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2569 0 : let kind = buf.read_u8()?;
2570 0 : let segno = buf.read_u32::<BigEndian>()?;
2571 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2572 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
2573 0 : buf.read_exact(&mut segment)?;
2574 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
2575 0 : req: PagestreamGetSlruSegmentRequest {
2576 0 : hdr: PagestreamRequest {
2577 0 : reqid,
2578 0 : request_lsn,
2579 0 : not_modified_since,
2580 0 : },
2581 0 : kind,
2582 0 : segno,
2583 0 : },
2584 0 : segment: segment.into(),
2585 0 : })
2586 : }
2587 : #[cfg(feature = "testing")]
2588 : Tag::Test => {
2589 0 : let reqid = buf.read_u64::<BigEndian>()?;
2590 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2591 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2592 0 : let batch_key = buf.read_u64::<BigEndian>()?;
2593 0 : let len = buf.read_u64::<BigEndian>()?;
2594 0 : let mut msg = vec![0; len as usize];
2595 0 : buf.read_exact(&mut msg)?;
2596 0 : let message = String::from_utf8(msg)?;
2597 0 : Self::Test(PagestreamTestResponse {
2598 0 : req: PagestreamTestRequest {
2599 0 : hdr: PagestreamRequest {
2600 0 : reqid,
2601 0 : request_lsn,
2602 0 : not_modified_since,
2603 0 : },
2604 0 : batch_key,
2605 0 : message,
2606 0 : },
2607 0 : })
2608 : }
2609 : };
2610 0 : let remaining = buf.into_inner();
2611 0 : if !remaining.is_empty() {
2612 0 : anyhow::bail!(
2613 0 : "remaining bytes in msg with tag={msg_tag}: {}",
2614 0 : remaining.len()
2615 0 : );
2616 0 : }
2617 0 : Ok(ok)
2618 0 : }
2619 :
2620 0 : pub fn kind(&self) -> &'static str {
2621 0 : match self {
2622 0 : Self::Exists(_) => "Exists",
2623 0 : Self::Nblocks(_) => "Nblocks",
2624 0 : Self::GetPage(_) => "GetPage",
2625 0 : Self::Error(_) => "Error",
2626 0 : Self::DbSize(_) => "DbSize",
2627 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
2628 : #[cfg(feature = "testing")]
2629 0 : Self::Test(_) => "Test",
2630 : }
2631 0 : }
2632 : }
2633 :
/// A single page trace record: a key, an LSN and a timestamp.
///
/// Derives serde `Serialize`/`Deserialize`, so it can be written to and read
/// back from any serde-supported format.
#[derive(Debug, Serialize, Deserialize)]
pub struct PageTraceEvent {
    /// Key the event refers to, in its compact representation.
    pub key: CompactKey,
    /// LSN associated with the event.
    // NOTE(review): presumably the LSN the access was actually served at —
    // confirm against the code that produces these events.
    pub effective_lsn: Lsn,
    /// Timestamp attached to the event.
    pub time: SystemTime,
}
2640 :
2641 : impl Default for PageTraceEvent {
2642 0 : fn default() -> Self {
2643 0 : Self {
2644 0 : key: Default::default(),
2645 0 : effective_lsn: Default::default(),
2646 0 : time: std::time::UNIX_EPOCH,
2647 0 : }
2648 0 : }
2649 : }
2650 :
2651 : #[cfg(test)]
2652 : mod tests {
2653 : use std::str::FromStr;
2654 :
2655 : use serde_json::json;
2656 :
2657 : use super::*;
2658 :
#[test]
fn test_pagestream() {
    // Round-trip each PagestreamFeMessage below through serialize() and parse().

    // All requests share reqid and request_lsn; only not_modified_since varies.
    let header = |not_modified_since: u64| PagestreamRequest {
        reqid: 0,
        request_lsn: Lsn(4),
        not_modified_since: Lsn(not_modified_since),
    };
    // Every relation-scoped request targets the same relation.
    let rel = || RelTag {
        forknum: 1,
        spcnode: 2,
        dbnode: 3,
        relnode: 4,
    };

    let requests = [
        PagestreamFeMessage::Exists(PagestreamExistsRequest {
            hdr: header(3),
            rel: rel(),
        }),
        PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
            hdr: header(4),
            rel: rel(),
        }),
        PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
            hdr: header(3),
            rel: rel(),
            blkno: 7,
        }),
        PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
            hdr: header(3),
            dbnode: 7,
        }),
    ];

    for original in requests {
        let wire = original.serialize();
        let decoded =
            PagestreamFeMessage::parse(&mut wire.reader(), PagestreamProtocolVersion::V3)
                .unwrap();
        assert!(original == decoded);
    }
}
2720 :
#[test]
fn test_tenantinfo_serde() {
    // Pins the JSON produced for TenantInfo, once with a data-less state and
    // once with a state that carries data.

    // Both fixtures differ only in their (freshly generated) id and their state.
    let info_with_state = |state: TenantState| TenantInfo {
        id: TenantShardId::unsharded(TenantId::generate()),
        state,
        current_physical_size: Some(42),
        attachment_status: TenantAttachmentStatus::Attached,
        generation: 1,
        gc_blocking: None,
    };
    // Likewise, the expected documents differ only in "id" and "state".
    let expected_json = |info: &TenantInfo, state: serde_json::Value| {
        json!({
            "id": info.id.to_string(),
            "state": state,
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            },
            "generation" : 1
        })
    };

    let original_active = info_with_state(TenantState::Active);
    let expected_active = expected_json(
        &original_active,
        json!({
            "slug": "Active",
        }),
    );

    let original_broken = info_with_state(TenantState::Broken {
        reason: "reason".into(),
        backtrace: "backtrace info".into(),
    });
    let expected_broken = expected_json(
        &original_broken,
        json!({
            "slug": "Broken",
            "data": {
                "backtrace": "backtrace info",
                "reason": "reason",
            }
        }),
    );

    assert_eq!(
        serde_json::to_value(&original_active).unwrap(),
        expected_active
    );
    assert_eq!(
        serde_json::to_value(&original_broken).unwrap(),
        expected_broken
    );

    // The Debug rendering of a broken state must expose both payload strings.
    let broken_debug = format!("{:?}", &original_broken.state);
    assert!(broken_debug.contains("reason"));
    assert!(broken_debug.contains("backtrace info"));
}
2783 :
#[test]
fn test_reject_unknown_field() {
    // A TenantConfigRequest carrying a key it does not know must fail to deserialize.
    let tenant_id = TenantId::generate().to_string();
    let payload = json!({
        "tenant_id": tenant_id,
        "unknown_field": "unknown_value",
    });

    let message = serde_json::from_value::<TenantConfigRequest>(payload)
        .unwrap_err()
        .to_string();

    assert!(
        message.contains("unknown field `unknown_field`"),
        "expect unknown field `unknown_field` error, got: {}",
        message
    );
}
2798 :
#[test]
fn tenantstatus_activating_serde() {
    // The Activating state must serialize to a fixed JSON shape and parse back unchanged.
    let states = [TenantState::Activating(ActivatingFrom::Attaching)];

    let serialized = serde_json::to_string(&states).unwrap();
    assert_eq!(
        serialized,
        r#"[{"slug":"Activating","data":"Attaching"}]"#
    );

    let parsed: Vec<TenantState> = serde_json::from_str(&serialized).unwrap();
    assert_eq!(states.as_slice(), parsed.as_slice());
}
2812 :
#[test]
fn tenantstatus_activating_strum() {
    // These static names are used for metrics, so they are pinned here.
    // Each case records its own source line so a failure identifies the example.
    let cases = [
        (line!(), TenantState::Attaching, "Attaching"),
        (
            line!(),
            TenantState::Activating(ActivatingFrom::Attaching),
            "Activating",
        ),
        (line!(), TenantState::Active, "Active"),
        (
            line!(),
            TenantState::Stopping { progress: None },
            "Stopping",
        ),
        (
            line!(),
            TenantState::Stopping {
                progress: Some(completion::Barrier::default()),
            },
            "Stopping",
        ),
        (
            line!(),
            TenantState::Broken {
                reason: "Example".into(),
                backtrace: "Looooong backtrace".into(),
            },
            "Broken",
        ),
    ];

    for (line, state, expected_name) in cases {
        let name = <&'static str>::from(state);
        assert_eq!(name, expected_name, "example on {line}");
    }
}
2851 :
#[test]
fn test_image_compression_algorithm_parsing() {
    use ImageCompressionAlgorithm::*;

    // Each pair is (textual form, value); the checks below require FromStr,
    // Display and serde to all agree on that textual form.
    let cases = [
        ("disabled", Disabled),
        ("zstd", Zstd { level: None }),
        ("zstd(18)", Zstd { level: Some(18) }),
        ("zstd(-3)", Zstd { level: Some(-3) }),
    ];

    for (display, expected) in cases {
        let parsed = ImageCompressionAlgorithm::from_str(display).unwrap();
        assert_eq!(parsed, expected, "parsing works");

        let rendered = expected.to_string();
        assert_eq!(rendered, display, "Display FromStr roundtrip");

        let ser = serde_json::to_string(&expected).expect("serialization");
        let roundtripped = serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap();
        assert_eq!(roundtripped, expected, "serde roundtrip");

        let display_as_json = serde_json::Value::String(display.to_string());
        assert_eq!(
            display_as_json,
            serde_json::to_value(expected).unwrap(),
            "Display is the serde serialization"
        );
    }
}
2884 :
#[test]
fn test_tenant_config_patch_request_serde() {
    // Part 1: wire format. The Upsert and Remove fields appear in the expected
    // JSON (as a value and as null); the Noop field does not appear at all.
    let tenant_id = TenantId::from_str("17c6d121946a61e5ab0fe5a2fd4d8215").unwrap();
    let request = TenantConfigPatchRequest {
        tenant_id,
        config: TenantConfigPatch {
            checkpoint_distance: FieldPatch::Upsert(42),
            gc_horizon: FieldPatch::Remove,
            compaction_threshold: FieldPatch::Noop,
            ..Default::default()
        },
    };

    let wire = serde_json::to_string(&request).unwrap();
    assert_eq!(
        wire,
        r#"{"tenant_id":"17c6d121946a61e5ab0fe5a2fd4d8215","checkpoint_distance":42,"gc_horizon":null}"#
    );

    let decoded: TenantConfigPatchRequest = serde_json::from_str(&wire).unwrap();
    assert_eq!(decoded.tenant_id, request.tenant_id);
    assert_eq!(decoded.config, request.config);

    // Part 2: apply the decoded patch to a config to demonstrate its semantics.
    let base = TenantConfig {
        checkpoint_distance: Some(28),
        gc_horizon: Some(100),
        compaction_target_size: Some(1024),
        ..Default::default()
    };

    // Upsert overwrites, Remove clears, and fields not named in the patch keep
    // their value from `base`.
    let want = TenantConfig {
        checkpoint_distance: Some(42),
        gc_horizon: None,
        ..base.clone()
    };

    let got = base.apply_patch(decoded.config).unwrap();
    assert_eq!(got, want);
}
2925 : }
|