Line data Source code
1 : pub mod detach_ancestor;
2 : pub mod partitioning;
3 : pub mod utilization;
4 :
5 : use core::ops::Range;
6 : use std::collections::HashMap;
7 : use std::fmt::Display;
8 : use std::io::{BufRead, Read};
9 : use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
10 : use std::str::FromStr;
11 : use std::time::{Duration, SystemTime};
12 :
13 : use byteorder::{BigEndian, ReadBytesExt};
14 : use bytes::{Buf, BufMut, Bytes, BytesMut};
15 : #[cfg(feature = "testing")]
16 : use camino::Utf8PathBuf;
17 : use postgres_ffi::BLCKSZ;
18 : use serde::{Deserialize, Deserializer, Serialize, Serializer};
19 : use serde_with::serde_as;
20 : pub use utilization::PageserverUtilization;
21 : use utils::id::{NodeId, TenantId, TimelineId};
22 : use utils::lsn::Lsn;
23 : use utils::postgres_client::PostgresClientProtocol;
24 : use utils::{completion, serde_system_time};
25 :
26 : use crate::config::Ratio;
27 : use crate::key::{CompactKey, Key};
28 : use crate::reltag::RelTag;
29 : use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
30 :
31 : /// The state of a tenant in this pageserver.
32 : ///
33 : /// ```mermaid
34 : /// stateDiagram-v2
35 : ///
36 : /// [*] --> Attaching: spawn_attach()
37 : ///
38 : /// Attaching --> Activating: activate()
39 : /// Activating --> Active: infallible
40 : ///
41 : /// Attaching --> Broken: attach() failure
42 : ///
43 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
44 : /// Stopping --> Broken: late error in remove_tenant_from_memory
45 : ///
46 : /// Broken --> [*]: ignore / detach / shutdown
47 : /// Stopping --> [*]: remove_from_memory complete
48 : ///
49 : /// Active --> Broken: cfg(testing)-only tenant break point
50 : /// ```
51 : #[derive(
52 : Clone,
53 : PartialEq,
54 : Eq,
55 0 : serde::Serialize,
56 1 : serde::Deserialize,
57 : strum_macros::Display,
58 : strum_macros::VariantNames,
59 : strum_macros::AsRefStr,
60 : strum_macros::IntoStaticStr,
61 : )]
62 : #[serde(tag = "slug", content = "data")]
63 : pub enum TenantState {
64 : /// This tenant is being attached to the pageserver.
65 : ///
66 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
67 : Attaching,
68 : /// The tenant is transitioning from Loading/Attaching to Active.
69 : ///
70 : /// While in this state, the individual timelines are being activated.
71 : ///
72 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
73 : Activating(ActivatingFrom),
74 : /// The tenant has finished activating and is open for business.
75 : ///
76 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
77 : Active,
78 : /// The tenant is recognized by pageserver, but it is being detached or the
79 : /// system is being shut down.
80 : ///
81 : /// Transitions out of this state are possible through `set_broken()`.
82 : Stopping {
83 : /// The barrier can be used to wait for shutdown to complete. The first caller to set
84 : /// Some(Barrier) is responsible for driving shutdown to completion. Subsequent callers
85 : /// will wait for the first caller's existing barrier.
86 : ///
87 : /// None is set when an attach is cancelled, to signal to shutdown that the attach has in
88 : /// fact been cancelled:
89 : ///
90 : /// 1. `shutdown` sees `TenantState::Attaching`, and cancels the tenant.
91 : /// 2. `attach` sets `TenantState::Stopping(None)` and exits.
92 : /// 3. `set_stopping` waits for `TenantState::Stopping(None)` and sets
93 : /// `TenantState::Stopping(Some)` to claim the barrier as the shutdown owner.
94 : //
95 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
96 : // otherwise it will not be skipped during deserialization
97 : #[serde(skip)]
98 : progress: Option<completion::Barrier>,
99 : },
100 : /// The tenant is recognized by the pageserver, but can no longer be used for
101 : /// any operations.
102 : ///
103 : /// If the tenant fails to load or attach, it will transition to this state
104 : /// and it is guaranteed that no background tasks are running in its name.
105 : ///
106 : /// The other way to transition into this state is from `Stopping` state
107 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
108 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
109 : Broken { reason: String, backtrace: String },
110 : }
111 :
112 : impl TenantState {
113 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
114 : use TenantAttachmentStatus::*;
115 :
116 : // Below, TenantState::Activating is treated as a "transient" or "transparent" state
117 : // when determining the attachment_status.
118 0 : match self {
119 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
120 : // So, technically, we can return Attached here.
121 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
122 : // But, our attach task might still be fetching the remote timelines, etc.
123 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
124 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
125 : // We only reach Active after successful load / attach.
126 : // So, call the attachment status Attached.
127 0 : Self::Active => Attached,
128 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
129 : // However, it also becomes Broken if the regular load fails.
130 : // From Console's perspective there's no practical difference
131 : // because attachment_status is polled by console only during attach operation execution.
132 0 : Self::Broken { reason, .. } => Failed {
133 0 : reason: reason.to_owned(),
134 0 : },
135 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
136 : // we set the Stopping state irrespective of whether the tenant
137 : // has finished attaching or not.
138 0 : Self::Stopping { .. } => Maybe,
139 : }
140 0 : }
141 :
142 0 : pub fn broken_from_reason(reason: String) -> Self {
143 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
144 0 : Self::Broken {
145 0 : reason,
146 0 : backtrace: backtrace_str,
147 0 : }
148 0 : }
149 : }
150 :
151 : impl std::fmt::Debug for TenantState {
152 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 2 : match self {
154 2 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
155 2 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
156 : }
157 0 : _ => write!(f, "{self}"),
158 : }
159 2 : }
160 : }
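
// A minimal usage sketch of the state-to-status mapping described above and of the
// `slug`/`data` serialization shape produced by the serde attributes; the reason
// string is arbitrary.
#[cfg(test)]
mod tenant_state_examples {
    use super::*;

    #[test]
    fn attachment_status_and_serde_shape() {
        assert!(matches!(
            TenantState::Attaching.attachment_status(),
            TenantAttachmentStatus::Maybe
        ));
        assert!(matches!(
            TenantState::Active.attachment_status(),
            TenantAttachmentStatus::Attached
        ));
        let broken = TenantState::broken_from_reason("disk full".to_string());
        assert!(matches!(
            broken.attachment_status(),
            TenantAttachmentStatus::Failed { .. }
        ));

        // `tag = "slug", content = "data"` yields an adjacently tagged representation.
        let json = serde_json::to_value(TenantState::Attaching).unwrap();
        assert_eq!(json["slug"], "Attaching");
    }
}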
161 :
162 : /// A temporary lease on a specific LSN inside a timeline.
163 : /// Access to the LSN is guaranteed by the pageserver until the expiration indicated by `valid_until`.
164 : #[serde_as]
165 0 : #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
166 : pub struct LsnLease {
167 : #[serde_as(as = "SystemTimeAsRfc3339Millis")]
168 : pub valid_until: SystemTime,
169 : }
170 :
171 : serde_with::serde_conv!(
172 : SystemTimeAsRfc3339Millis,
173 : SystemTime,
174 0 : |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
175 0 : |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
176 : );
177 :
178 : impl LsnLease {
179 : /// The default length for an explicit LSN lease request (10 minutes).
180 : pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
181 :
182 : /// The default length for an implicit LSN lease granted during
183 : /// `get_lsn_by_timestamp` request (1 minute).
184 : pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
185 :
186 : /// Checks whether the lease is expired.
187 36 : pub fn is_expired(&self, now: &SystemTime) -> bool {
188 36 : now > &self.valid_until
189 36 : }
190 : }
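
// A minimal usage sketch: a lease expiring `DEFAULT_LENGTH` into the future is not
// yet expired, and `valid_until` serializes as an RFC 3339 string with millisecond
// precision (per `SystemTimeAsRfc3339Millis` above).
#[cfg(test)]
mod lsn_lease_examples {
    use super::*;

    #[test]
    fn lease_expiry_and_serialization() {
        let now = SystemTime::now();
        let lease = LsnLease {
            valid_until: now + LsnLease::DEFAULT_LENGTH,
        };
        assert!(!lease.is_expired(&now));
        assert!(lease.is_expired(&(now + LsnLease::DEFAULT_LENGTH + Duration::from_secs(1))));

        let json = serde_json::to_value(&lease).unwrap();
        assert!(json["valid_until"].is_string());
    }
}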
191 :
192 : /// Controls the detach ancestor behavior.
193 : /// - When set to `NoAncestorAndReparent`, we will only detach a branch if its ancestor is a root branch. It will automatically reparent any children of the ancestor before and at the branch point.
194 : /// - When set to `MultiLevelAndNoReparent`, we will detach a branch from multiple levels of ancestors, and no reparenting will happen at all.
195 : #[derive(Debug, Clone, Copy, Default)]
196 : pub enum DetachBehavior {
197 : #[default]
198 : NoAncestorAndReparent,
199 : MultiLevelAndNoReparent,
200 : }
201 :
202 : impl std::str::FromStr for DetachBehavior {
203 : type Err = &'static str;
204 :
205 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
206 0 : match s {
207 0 : "no_ancestor_and_reparent" => Ok(DetachBehavior::NoAncestorAndReparent),
208 0 : "multi_level_and_no_reparent" => Ok(DetachBehavior::MultiLevelAndNoReparent),
209 0 : "v1" => Ok(DetachBehavior::NoAncestorAndReparent),
210 0 : "v2" => Ok(DetachBehavior::MultiLevelAndNoReparent),
211 0 : _ => Err("cannot parse detach behavior"),
212 : }
213 0 : }
214 : }
215 :
216 : impl std::fmt::Display for DetachBehavior {
217 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 0 : match self {
219 0 : DetachBehavior::NoAncestorAndReparent => write!(f, "no_ancestor_and_reparent"),
220 0 : DetachBehavior::MultiLevelAndNoReparent => write!(f, "multi_level_and_no_reparent"),
221 : }
222 0 : }
223 : }
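
// A minimal usage sketch of the accepted spellings: the long names and the legacy
// "v1"/"v2" aliases both parse, while `Display` always renders the long form.
#[cfg(test)]
mod detach_behavior_examples {
    use super::*;

    #[test]
    fn parse_and_display() {
        assert!(matches!(
            "v1".parse::<DetachBehavior>(),
            Ok(DetachBehavior::NoAncestorAndReparent)
        ));
        assert!(matches!(
            "multi_level_and_no_reparent".parse::<DetachBehavior>(),
            Ok(DetachBehavior::MultiLevelAndNoReparent)
        ));
        assert_eq!(
            DetachBehavior::NoAncestorAndReparent.to_string(),
            "no_ancestor_and_reparent"
        );
    }
}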
224 :
225 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
226 : ///
227 : /// XXX: We used to have more variants here, but now it's just one, which makes this rather
228 : /// useless. Remove, once we've checked that there's no client code left that looks at this.
229 1 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
230 : pub enum ActivatingFrom {
231 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
232 : Attaching,
233 : }
234 :
235 : /// A state of a timeline in pageserver's memory.
236 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
237 : pub enum TimelineState {
238 : /// The timeline is recognized by the pageserver but is not yet operational.
239 : /// In particular, the walreceiver connection loop is not running for this timeline.
240 : /// It will eventually transition to state Active or Broken.
241 : Loading,
242 : /// The timeline is fully operational.
243 : /// It can be queried, and the walreceiver connection loop is running.
244 : Active,
245 : /// The timeline was previously Loading or Active but is shutting down.
246 : /// It cannot transition back into any other state.
247 : Stopping,
248 : /// The timeline is broken and not operational (previous states: Loading or Active).
249 : Broken { reason: String, backtrace: String },
250 : }
251 :
252 : #[serde_with::serde_as]
253 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
254 : pub struct CompactLsnRange {
255 : pub start: Lsn,
256 : pub end: Lsn,
257 : }
258 :
259 : #[serde_with::serde_as]
260 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
261 : pub struct CompactKeyRange {
262 : #[serde_as(as = "serde_with::DisplayFromStr")]
263 : pub start: Key,
264 : #[serde_as(as = "serde_with::DisplayFromStr")]
265 : pub end: Key,
266 : }
267 :
268 : impl From<Range<Lsn>> for CompactLsnRange {
269 36 : fn from(range: Range<Lsn>) -> Self {
270 36 : Self {
271 36 : start: range.start,
272 36 : end: range.end,
273 36 : }
274 36 : }
275 : }
276 :
277 : impl From<Range<Key>> for CompactKeyRange {
278 96 : fn from(range: Range<Key>) -> Self {
279 96 : Self {
280 96 : start: range.start,
281 96 : end: range.end,
282 96 : }
283 96 : }
284 : }
285 :
286 : impl From<CompactLsnRange> for Range<Lsn> {
287 60 : fn from(range: CompactLsnRange) -> Self {
288 60 : range.start..range.end
289 60 : }
290 : }
291 :
292 : impl From<CompactKeyRange> for Range<Key> {
293 96 : fn from(range: CompactKeyRange) -> Self {
294 96 : range.start..range.end
295 96 : }
296 : }
297 :
298 : impl CompactLsnRange {
299 24 : pub fn above(lsn: Lsn) -> Self {
300 24 : Self {
301 24 : start: lsn,
302 24 : end: Lsn::MAX,
303 24 : }
304 24 : }
305 : }
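
// A minimal usage sketch: the compact range types are thin wrappers around
// `Range<Lsn>` / `Range<Key>`, and `above()` spans from the given LSN to `Lsn::MAX`.
#[cfg(test)]
mod compact_range_examples {
    use super::*;

    #[test]
    fn lsn_range_conversions() {
        let range: Range<Lsn> = CompactLsnRange::from(Lsn(0x10)..Lsn(0x20)).into();
        assert_eq!(range, Lsn(0x10)..Lsn(0x20));

        let open_ended: Range<Lsn> = CompactLsnRange::above(Lsn(0x10)).into();
        assert_eq!(open_ended.start, Lsn(0x10));
        assert_eq!(open_ended.end, Lsn::MAX);
    }
}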
306 :
307 : #[derive(Debug, Clone, Serialize)]
308 : pub struct CompactInfoResponse {
309 : pub compact_key_range: Option<CompactKeyRange>,
310 : pub compact_lsn_range: Option<CompactLsnRange>,
311 : pub sub_compaction: bool,
312 : pub running: bool,
313 : pub job_id: usize,
314 : }
315 :
316 0 : #[derive(Serialize, Deserialize, Clone)]
317 : pub struct TimelineCreateRequest {
318 : pub new_timeline_id: TimelineId,
319 : #[serde(flatten)]
320 : pub mode: TimelineCreateRequestMode,
321 : }
322 :
323 : /// Storage controller specific extensions to [`TimelineInfo`].
324 0 : #[derive(Serialize, Deserialize, Clone)]
325 : pub struct TimelineCreateResponseStorcon {
326 : #[serde(flatten)]
327 : pub timeline_info: TimelineInfo,
328 :
329 : pub safekeepers: Option<SafekeepersInfo>,
330 : }
331 :
332 : /// Safekeepers as returned in timeline creation request to storcon or pushed to
333 : /// cplane in the post migration hook.
334 0 : #[derive(Serialize, Deserialize, Clone)]
335 : pub struct SafekeepersInfo {
336 : pub tenant_id: TenantId,
337 : pub timeline_id: TimelineId,
338 : pub generation: u32,
339 : pub safekeepers: Vec<SafekeeperInfo>,
340 : }
341 :
342 0 : #[derive(Serialize, Deserialize, Clone)]
343 : pub struct SafekeeperInfo {
344 : pub id: NodeId,
345 : pub hostname: String,
346 : }
347 :
348 0 : #[derive(Serialize, Deserialize, Clone)]
349 : #[serde(untagged)]
350 : pub enum TimelineCreateRequestMode {
351 : Branch {
352 : ancestor_timeline_id: TimelineId,
353 : #[serde(default)]
354 : ancestor_start_lsn: Option<Lsn>,
355 : // TODO: cplane sets this, but the branching code always
356 : // inherits the ancestor's pg_version. Earlier code wasn't
357 : // using a flattened enum, so it was an accepted field, and
358 : // we continue to accept it by having it here.
359 : pg_version: Option<u32>,
360 : },
361 : ImportPgdata {
362 : import_pgdata: TimelineCreateRequestModeImportPgdata,
363 : },
364 : // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
365 : // (serde picks the first matching enum variant, in declaration order).
366 : Bootstrap {
367 : #[serde(default)]
368 : existing_initdb_timeline_id: Option<TimelineId>,
369 : pg_version: Option<u32>,
370 : },
371 : }
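
// A minimal sketch of the untagged fallthrough noted above; the timeline ids are
// arbitrary hex strings. A request with only `new_timeline_id` matches `Bootstrap`
// (all of its fields are optional), while one carrying `ancestor_timeline_id`
// matches `Branch` first.
#[cfg(test)]
mod timeline_create_request_examples {
    use super::*;

    #[test]
    fn untagged_mode_selection() {
        let bootstrap: TimelineCreateRequest = serde_json::from_value(serde_json::json!({
            "new_timeline_id": "11223344556677881122334455667788",
        }))
        .unwrap();
        assert!(matches!(
            bootstrap.mode,
            TimelineCreateRequestMode::Bootstrap { .. }
        ));

        let branch: TimelineCreateRequest = serde_json::from_value(serde_json::json!({
            "new_timeline_id": "11223344556677881122334455667788",
            "ancestor_timeline_id": "8877665544332211887766554433221a",
        }))
        .unwrap();
        assert!(matches!(branch.mode, TimelineCreateRequestMode::Branch { .. }));
    }
}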
372 :
373 0 : #[derive(Serialize, Deserialize, Clone)]
374 : pub struct TimelineCreateRequestModeImportPgdata {
375 : pub location: ImportPgdataLocation,
376 : pub idempotency_key: ImportPgdataIdempotencyKey,
377 : }
378 :
379 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
380 : pub enum ImportPgdataLocation {
381 : #[cfg(feature = "testing")]
382 : LocalFs { path: Utf8PathBuf },
383 : AwsS3 {
384 : region: String,
385 : bucket: String,
386 : /// A better name for this would be `prefix`; changing requires coordination with cplane.
387 : /// See <https://github.com/neondatabase/cloud/issues/20646>.
388 : key: String,
389 : },
390 : }
391 :
392 0 : #[derive(Serialize, Deserialize, Clone)]
393 : #[serde(transparent)]
394 : pub struct ImportPgdataIdempotencyKey(pub String);
395 :
396 : impl ImportPgdataIdempotencyKey {
397 0 : pub fn random() -> Self {
398 : use rand::Rng;
399 : use rand::distributions::Alphanumeric;
400 0 : Self(
401 0 : rand::thread_rng()
402 0 : .sample_iter(&Alphanumeric)
403 0 : .take(20)
404 0 : .map(char::from)
405 0 : .collect(),
406 0 : )
407 0 : }
408 : }
409 :
410 0 : #[derive(Serialize, Deserialize, Clone)]
411 : pub struct LsnLeaseRequest {
412 : pub lsn: Lsn,
413 : }
414 :
415 0 : #[derive(Serialize, Deserialize)]
416 : pub struct TenantShardSplitRequest {
417 : pub new_shard_count: u8,
418 :
419 : // A tenant's stripe size is only meaningful the first time its shard count goes
420 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
421 : //
422 : // If this is set while the shard count is being increased from an already >1 value,
423 : // then the request will fail with 400.
424 : pub new_stripe_size: Option<ShardStripeSize>,
425 : }
426 :
427 0 : #[derive(Serialize, Deserialize)]
428 : pub struct TenantShardSplitResponse {
429 : pub new_shards: Vec<TenantShardId>,
430 : }
431 :
432 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
433 0 : #[derive(Serialize, Deserialize, Debug)]
434 : #[serde(deny_unknown_fields)]
435 : pub struct ShardParameters {
436 : pub count: ShardCount,
437 : pub stripe_size: ShardStripeSize,
438 : }
439 :
440 : impl ShardParameters {
441 0 : pub fn is_unsharded(&self) -> bool {
442 0 : self.count.is_unsharded()
443 0 : }
444 : }
445 :
446 : impl Default for ShardParameters {
447 1393 : fn default() -> Self {
448 1393 : Self {
449 1393 : count: ShardCount::new(0),
450 1393 : stripe_size: DEFAULT_STRIPE_SIZE,
451 1393 : }
452 1393 : }
453 : }
454 :
455 : #[derive(Debug, Default, Clone, Eq, PartialEq)]
456 : pub enum FieldPatch<T> {
457 : Upsert(T),
458 : Remove,
459 : #[default]
460 : Noop,
461 : }
462 :
463 : impl<T> FieldPatch<T> {
464 74 : fn is_noop(&self) -> bool {
465 74 : matches!(self, FieldPatch::Noop)
466 74 : }
467 :
468 37 : pub fn apply(self, target: &mut Option<T>) {
469 37 : match self {
470 1 : Self::Upsert(v) => *target = Some(v),
471 1 : Self::Remove => *target = None,
472 35 : Self::Noop => {}
473 : }
474 37 : }
475 :
476 10 : pub fn map<U, E, F: FnOnce(T) -> Result<U, E>>(self, map: F) -> Result<FieldPatch<U>, E> {
477 10 : match self {
478 0 : Self::Upsert(v) => Ok(FieldPatch::<U>::Upsert(map(v)?)),
479 0 : Self::Remove => Ok(FieldPatch::<U>::Remove),
480 10 : Self::Noop => Ok(FieldPatch::<U>::Noop),
481 : }
482 10 : }
483 : }
484 :
485 : impl<'de, T: Deserialize<'de>> Deserialize<'de> for FieldPatch<T> {
486 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
487 2 : where
488 2 : D: Deserializer<'de>,
489 2 : {
490 2 : Option::deserialize(deserializer).map(|opt| match opt {
491 1 : None => FieldPatch::Remove,
492 1 : Some(val) => FieldPatch::Upsert(val),
493 2 : })
494 2 : }
495 : }
496 :
497 : impl<T: Serialize> Serialize for FieldPatch<T> {
498 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
499 2 : where
500 2 : S: Serializer,
501 2 : {
502 2 : match self {
503 1 : FieldPatch::Upsert(val) => serializer.serialize_some(val),
504 1 : FieldPatch::Remove => serializer.serialize_none(),
505 0 : FieldPatch::Noop => unreachable!(),
506 : }
507 2 : }
508 : }
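
// A minimal usage sketch of the tri-state semantics: a present value deserializes
// to `Upsert`, an explicit `null` to `Remove`, and an omitted field stays `Noop`
// (via `#[serde(default)]` on the containing struct); `apply` mirrors this on an
// `Option` target. Field names and values are arbitrary.
#[cfg(test)]
mod field_patch_examples {
    use super::*;

    #[test]
    fn serde_and_apply_semantics() {
        let patch: TenantConfigPatch = serde_json::from_value(serde_json::json!({
            "checkpoint_distance": 10000,
            "compaction_period": null,
        }))
        .unwrap();
        assert_eq!(patch.checkpoint_distance, FieldPatch::Upsert(10000));
        assert_eq!(patch.compaction_period, FieldPatch::Remove);
        assert_eq!(patch.gc_horizon, FieldPatch::Noop);

        let mut target = Some(42u64);
        FieldPatch::Noop.apply(&mut target);
        assert_eq!(target, Some(42));
        FieldPatch::Remove.apply(&mut target);
        assert_eq!(target, None);
        FieldPatch::Upsert(7u64).apply(&mut target);
        assert_eq!(target, Some(7));
    }
}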
509 :
510 2 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
511 : #[serde(default)]
512 : pub struct TenantConfigPatch {
513 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
514 : pub checkpoint_distance: FieldPatch<u64>,
515 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
516 : pub checkpoint_timeout: FieldPatch<String>,
517 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
518 : pub compaction_target_size: FieldPatch<u64>,
519 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
520 : pub compaction_period: FieldPatch<String>,
521 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
522 : pub compaction_threshold: FieldPatch<usize>,
523 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
524 : pub compaction_upper_limit: FieldPatch<usize>,
525 : // defer parsing compaction_algorithm, like eviction_policy
526 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
527 : pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
528 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
529 : pub compaction_shard_ancestor: FieldPatch<bool>,
530 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
531 : pub compaction_l0_first: FieldPatch<bool>,
532 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
533 : pub compaction_l0_semaphore: FieldPatch<bool>,
534 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
535 : pub l0_flush_delay_threshold: FieldPatch<usize>,
536 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
537 : pub l0_flush_stall_threshold: FieldPatch<usize>,
538 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
539 : pub gc_horizon: FieldPatch<u64>,
540 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
541 : pub gc_period: FieldPatch<String>,
542 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
543 : pub image_creation_threshold: FieldPatch<usize>,
544 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
545 : pub pitr_interval: FieldPatch<String>,
546 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
547 : pub walreceiver_connect_timeout: FieldPatch<String>,
548 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
549 : pub lagging_wal_timeout: FieldPatch<String>,
550 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
551 : pub max_lsn_wal_lag: FieldPatch<NonZeroU64>,
552 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
553 : pub eviction_policy: FieldPatch<EvictionPolicy>,
554 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
555 : pub min_resident_size_override: FieldPatch<u64>,
556 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
557 : pub evictions_low_residence_duration_metric_threshold: FieldPatch<String>,
558 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
559 : pub heatmap_period: FieldPatch<String>,
560 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
561 : pub lazy_slru_download: FieldPatch<bool>,
562 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
563 : pub timeline_get_throttle: FieldPatch<ThrottleConfig>,
564 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
565 : pub image_layer_creation_check_threshold: FieldPatch<u8>,
566 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
567 : pub image_creation_preempt_threshold: FieldPatch<usize>,
568 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
569 : pub lsn_lease_length: FieldPatch<String>,
570 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
571 : pub lsn_lease_length_for_ts: FieldPatch<String>,
572 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
573 : pub timeline_offloading: FieldPatch<bool>,
574 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
575 : pub wal_receiver_protocol_override: FieldPatch<PostgresClientProtocol>,
576 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
577 : pub rel_size_v2_enabled: FieldPatch<bool>,
578 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
579 : pub gc_compaction_enabled: FieldPatch<bool>,
580 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
581 : pub gc_compaction_verification: FieldPatch<bool>,
582 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
583 : pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
584 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
585 : pub gc_compaction_ratio_percent: FieldPatch<u64>,
586 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
587 : pub sampling_ratio: FieldPatch<Option<Ratio>>,
588 : }
589 :
590 : /// Like [`crate::config::TenantConfigToml`], but preserves the information
591 : /// about which parameters are set and which are not.
592 : ///
593 : /// Used in many places, including durably stored ones.
594 24 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
595 : #[serde(default)] // this maps omitted fields in deserialization to None
596 : pub struct TenantConfig {
597 : #[serde(skip_serializing_if = "Option::is_none")]
598 : pub checkpoint_distance: Option<u64>,
599 :
600 : #[serde(skip_serializing_if = "Option::is_none")]
601 : #[serde(with = "humantime_serde")]
602 : pub checkpoint_timeout: Option<Duration>,
603 :
604 : #[serde(skip_serializing_if = "Option::is_none")]
605 : pub compaction_target_size: Option<u64>,
606 :
607 : #[serde(skip_serializing_if = "Option::is_none")]
608 : #[serde(with = "humantime_serde")]
609 : pub compaction_period: Option<Duration>,
610 :
611 : #[serde(skip_serializing_if = "Option::is_none")]
612 : pub compaction_threshold: Option<usize>,
613 :
614 : #[serde(skip_serializing_if = "Option::is_none")]
615 : pub compaction_upper_limit: Option<usize>,
616 :
617 : #[serde(skip_serializing_if = "Option::is_none")]
618 : pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
619 :
620 : #[serde(skip_serializing_if = "Option::is_none")]
621 : pub compaction_shard_ancestor: Option<bool>,
622 :
623 : #[serde(skip_serializing_if = "Option::is_none")]
624 : pub compaction_l0_first: Option<bool>,
625 :
626 : #[serde(skip_serializing_if = "Option::is_none")]
627 : pub compaction_l0_semaphore: Option<bool>,
628 :
629 : #[serde(skip_serializing_if = "Option::is_none")]
630 : pub l0_flush_delay_threshold: Option<usize>,
631 :
632 : #[serde(skip_serializing_if = "Option::is_none")]
633 : pub l0_flush_stall_threshold: Option<usize>,
634 :
635 : #[serde(skip_serializing_if = "Option::is_none")]
636 : pub gc_horizon: Option<u64>,
637 :
638 : #[serde(skip_serializing_if = "Option::is_none")]
639 : #[serde(with = "humantime_serde")]
640 : pub gc_period: Option<Duration>,
641 :
642 : #[serde(skip_serializing_if = "Option::is_none")]
643 : pub image_creation_threshold: Option<usize>,
644 :
645 : #[serde(skip_serializing_if = "Option::is_none")]
646 : #[serde(with = "humantime_serde")]
647 : pub pitr_interval: Option<Duration>,
648 :
649 : #[serde(skip_serializing_if = "Option::is_none")]
650 : #[serde(with = "humantime_serde")]
651 : pub walreceiver_connect_timeout: Option<Duration>,
652 :
653 : #[serde(skip_serializing_if = "Option::is_none")]
654 : #[serde(with = "humantime_serde")]
655 : pub lagging_wal_timeout: Option<Duration>,
656 :
657 : #[serde(skip_serializing_if = "Option::is_none")]
658 : pub max_lsn_wal_lag: Option<NonZeroU64>,
659 :
660 : #[serde(skip_serializing_if = "Option::is_none")]
661 : pub eviction_policy: Option<EvictionPolicy>,
662 :
663 : #[serde(skip_serializing_if = "Option::is_none")]
664 : pub min_resident_size_override: Option<u64>,
665 :
666 : #[serde(skip_serializing_if = "Option::is_none")]
667 : #[serde(with = "humantime_serde")]
668 : pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
669 :
670 : #[serde(skip_serializing_if = "Option::is_none")]
671 : #[serde(with = "humantime_serde")]
672 : pub heatmap_period: Option<Duration>,
673 :
674 : #[serde(skip_serializing_if = "Option::is_none")]
675 : pub lazy_slru_download: Option<bool>,
676 :
677 : #[serde(skip_serializing_if = "Option::is_none")]
678 : pub timeline_get_throttle: Option<ThrottleConfig>,
679 :
680 : #[serde(skip_serializing_if = "Option::is_none")]
681 : pub image_layer_creation_check_threshold: Option<u8>,
682 :
683 : #[serde(skip_serializing_if = "Option::is_none")]
684 : pub image_creation_preempt_threshold: Option<usize>,
685 :
686 : #[serde(skip_serializing_if = "Option::is_none")]
687 : #[serde(with = "humantime_serde")]
688 : pub lsn_lease_length: Option<Duration>,
689 :
690 : #[serde(skip_serializing_if = "Option::is_none")]
691 : #[serde(with = "humantime_serde")]
692 : pub lsn_lease_length_for_ts: Option<Duration>,
693 :
694 : #[serde(skip_serializing_if = "Option::is_none")]
695 : pub timeline_offloading: Option<bool>,
696 :
697 : #[serde(skip_serializing_if = "Option::is_none")]
698 : pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
699 :
700 : #[serde(skip_serializing_if = "Option::is_none")]
701 : pub rel_size_v2_enabled: Option<bool>,
702 :
703 : #[serde(skip_serializing_if = "Option::is_none")]
704 : pub gc_compaction_enabled: Option<bool>,
705 :
706 : #[serde(skip_serializing_if = "Option::is_none")]
707 : pub gc_compaction_verification: Option<bool>,
708 :
709 : #[serde(skip_serializing_if = "Option::is_none")]
710 : pub gc_compaction_initial_threshold_kb: Option<u64>,
711 :
712 : #[serde(skip_serializing_if = "Option::is_none")]
713 : pub gc_compaction_ratio_percent: Option<u64>,
714 :
715 : #[serde(skip_serializing_if = "Option::is_none")]
716 : pub sampling_ratio: Option<Option<Ratio>>,
717 : }
718 :
719 : impl TenantConfig {
720 1 : pub fn apply_patch(
721 1 : self,
722 1 : patch: TenantConfigPatch,
723 1 : ) -> Result<TenantConfig, humantime::DurationError> {
724 1 : let Self {
725 1 : mut checkpoint_distance,
726 1 : mut checkpoint_timeout,
727 1 : mut compaction_target_size,
728 1 : mut compaction_period,
729 1 : mut compaction_threshold,
730 1 : mut compaction_upper_limit,
731 1 : mut compaction_algorithm,
732 1 : mut compaction_shard_ancestor,
733 1 : mut compaction_l0_first,
734 1 : mut compaction_l0_semaphore,
735 1 : mut l0_flush_delay_threshold,
736 1 : mut l0_flush_stall_threshold,
737 1 : mut gc_horizon,
738 1 : mut gc_period,
739 1 : mut image_creation_threshold,
740 1 : mut pitr_interval,
741 1 : mut walreceiver_connect_timeout,
742 1 : mut lagging_wal_timeout,
743 1 : mut max_lsn_wal_lag,
744 1 : mut eviction_policy,
745 1 : mut min_resident_size_override,
746 1 : mut evictions_low_residence_duration_metric_threshold,
747 1 : mut heatmap_period,
748 1 : mut lazy_slru_download,
749 1 : mut timeline_get_throttle,
750 1 : mut image_layer_creation_check_threshold,
751 1 : mut image_creation_preempt_threshold,
752 1 : mut lsn_lease_length,
753 1 : mut lsn_lease_length_for_ts,
754 1 : mut timeline_offloading,
755 1 : mut wal_receiver_protocol_override,
756 1 : mut rel_size_v2_enabled,
757 1 : mut gc_compaction_enabled,
758 1 : mut gc_compaction_verification,
759 1 : mut gc_compaction_initial_threshold_kb,
760 1 : mut gc_compaction_ratio_percent,
761 1 : mut sampling_ratio,
762 1 : } = self;
763 1 :
764 1 : patch.checkpoint_distance.apply(&mut checkpoint_distance);
765 1 : patch
766 1 : .checkpoint_timeout
767 1 : .map(|v| humantime::parse_duration(&v))?
768 1 : .apply(&mut checkpoint_timeout);
769 1 : patch
770 1 : .compaction_target_size
771 1 : .apply(&mut compaction_target_size);
772 1 : patch
773 1 : .compaction_period
774 1 : .map(|v| humantime::parse_duration(&v))?
775 1 : .apply(&mut compaction_period);
776 1 : patch.compaction_threshold.apply(&mut compaction_threshold);
777 1 : patch
778 1 : .compaction_upper_limit
779 1 : .apply(&mut compaction_upper_limit);
780 1 : patch.compaction_algorithm.apply(&mut compaction_algorithm);
781 1 : patch
782 1 : .compaction_shard_ancestor
783 1 : .apply(&mut compaction_shard_ancestor);
784 1 : patch.compaction_l0_first.apply(&mut compaction_l0_first);
785 1 : patch
786 1 : .compaction_l0_semaphore
787 1 : .apply(&mut compaction_l0_semaphore);
788 1 : patch
789 1 : .l0_flush_delay_threshold
790 1 : .apply(&mut l0_flush_delay_threshold);
791 1 : patch
792 1 : .l0_flush_stall_threshold
793 1 : .apply(&mut l0_flush_stall_threshold);
794 1 : patch.gc_horizon.apply(&mut gc_horizon);
795 1 : patch
796 1 : .gc_period
797 1 : .map(|v| humantime::parse_duration(&v))?
798 1 : .apply(&mut gc_period);
799 1 : patch
800 1 : .image_creation_threshold
801 1 : .apply(&mut image_creation_threshold);
802 1 : patch
803 1 : .pitr_interval
804 1 : .map(|v| humantime::parse_duration(&v))?
805 1 : .apply(&mut pitr_interval);
806 1 : patch
807 1 : .walreceiver_connect_timeout
808 1 : .map(|v| humantime::parse_duration(&v))?
809 1 : .apply(&mut walreceiver_connect_timeout);
810 1 : patch
811 1 : .lagging_wal_timeout
812 1 : .map(|v| humantime::parse_duration(&v))?
813 1 : .apply(&mut lagging_wal_timeout);
814 1 : patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
815 1 : patch.eviction_policy.apply(&mut eviction_policy);
816 1 : patch
817 1 : .min_resident_size_override
818 1 : .apply(&mut min_resident_size_override);
819 1 : patch
820 1 : .evictions_low_residence_duration_metric_threshold
821 1 : .map(|v| humantime::parse_duration(&v))?
822 1 : .apply(&mut evictions_low_residence_duration_metric_threshold);
823 1 : patch
824 1 : .heatmap_period
825 1 : .map(|v| humantime::parse_duration(&v))?
826 1 : .apply(&mut heatmap_period);
827 1 : patch.lazy_slru_download.apply(&mut lazy_slru_download);
828 1 : patch
829 1 : .timeline_get_throttle
830 1 : .apply(&mut timeline_get_throttle);
831 1 : patch
832 1 : .image_layer_creation_check_threshold
833 1 : .apply(&mut image_layer_creation_check_threshold);
834 1 : patch
835 1 : .image_creation_preempt_threshold
836 1 : .apply(&mut image_creation_preempt_threshold);
837 1 : patch
838 1 : .lsn_lease_length
839 1 : .map(|v| humantime::parse_duration(&v))?
840 1 : .apply(&mut lsn_lease_length);
841 1 : patch
842 1 : .lsn_lease_length_for_ts
843 1 : .map(|v| humantime::parse_duration(&v))?
844 1 : .apply(&mut lsn_lease_length_for_ts);
845 1 : patch.timeline_offloading.apply(&mut timeline_offloading);
846 1 : patch
847 1 : .wal_receiver_protocol_override
848 1 : .apply(&mut wal_receiver_protocol_override);
849 1 : patch.rel_size_v2_enabled.apply(&mut rel_size_v2_enabled);
850 1 : patch
851 1 : .gc_compaction_enabled
852 1 : .apply(&mut gc_compaction_enabled);
853 1 : patch
854 1 : .gc_compaction_verification
855 1 : .apply(&mut gc_compaction_verification);
856 1 : patch
857 1 : .gc_compaction_initial_threshold_kb
858 1 : .apply(&mut gc_compaction_initial_threshold_kb);
859 1 : patch
860 1 : .gc_compaction_ratio_percent
861 1 : .apply(&mut gc_compaction_ratio_percent);
862 1 : patch.sampling_ratio.apply(&mut sampling_ratio);
863 1 :
864 1 : Ok(Self {
865 1 : checkpoint_distance,
866 1 : checkpoint_timeout,
867 1 : compaction_target_size,
868 1 : compaction_period,
869 1 : compaction_threshold,
870 1 : compaction_upper_limit,
871 1 : compaction_algorithm,
872 1 : compaction_shard_ancestor,
873 1 : compaction_l0_first,
874 1 : compaction_l0_semaphore,
875 1 : l0_flush_delay_threshold,
876 1 : l0_flush_stall_threshold,
877 1 : gc_horizon,
878 1 : gc_period,
879 1 : image_creation_threshold,
880 1 : pitr_interval,
881 1 : walreceiver_connect_timeout,
882 1 : lagging_wal_timeout,
883 1 : max_lsn_wal_lag,
884 1 : eviction_policy,
885 1 : min_resident_size_override,
886 1 : evictions_low_residence_duration_metric_threshold,
887 1 : heatmap_period,
888 1 : lazy_slru_download,
889 1 : timeline_get_throttle,
890 1 : image_layer_creation_check_threshold,
891 1 : image_creation_preempt_threshold,
892 1 : lsn_lease_length,
893 1 : lsn_lease_length_for_ts,
894 1 : timeline_offloading,
895 1 : wal_receiver_protocol_override,
896 1 : rel_size_v2_enabled,
897 1 : gc_compaction_enabled,
898 1 : gc_compaction_verification,
899 1 : gc_compaction_initial_threshold_kb,
900 1 : gc_compaction_ratio_percent,
901 1 : sampling_ratio,
902 1 : })
903 1 : }
904 :
905 0 : pub fn merge(
906 0 : &self,
907 0 : global_conf: crate::config::TenantConfigToml,
908 0 : ) -> crate::config::TenantConfigToml {
909 0 : crate::config::TenantConfigToml {
910 0 : checkpoint_distance: self
911 0 : .checkpoint_distance
912 0 : .unwrap_or(global_conf.checkpoint_distance),
913 0 : checkpoint_timeout: self
914 0 : .checkpoint_timeout
915 0 : .unwrap_or(global_conf.checkpoint_timeout),
916 0 : compaction_target_size: self
917 0 : .compaction_target_size
918 0 : .unwrap_or(global_conf.compaction_target_size),
919 0 : compaction_period: self
920 0 : .compaction_period
921 0 : .unwrap_or(global_conf.compaction_period),
922 0 : compaction_threshold: self
923 0 : .compaction_threshold
924 0 : .unwrap_or(global_conf.compaction_threshold),
925 0 : compaction_upper_limit: self
926 0 : .compaction_upper_limit
927 0 : .unwrap_or(global_conf.compaction_upper_limit),
928 0 : compaction_algorithm: self
929 0 : .compaction_algorithm
930 0 : .as_ref()
931 0 : .unwrap_or(&global_conf.compaction_algorithm)
932 0 : .clone(),
933 0 : compaction_shard_ancestor: self
934 0 : .compaction_shard_ancestor
935 0 : .unwrap_or(global_conf.compaction_shard_ancestor),
936 0 : compaction_l0_first: self
937 0 : .compaction_l0_first
938 0 : .unwrap_or(global_conf.compaction_l0_first),
939 0 : compaction_l0_semaphore: self
940 0 : .compaction_l0_semaphore
941 0 : .unwrap_or(global_conf.compaction_l0_semaphore),
942 0 : l0_flush_delay_threshold: self
943 0 : .l0_flush_delay_threshold
944 0 : .or(global_conf.l0_flush_delay_threshold),
945 0 : l0_flush_stall_threshold: self
946 0 : .l0_flush_stall_threshold
947 0 : .or(global_conf.l0_flush_stall_threshold),
948 0 : gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
949 0 : gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
950 0 : image_creation_threshold: self
951 0 : .image_creation_threshold
952 0 : .unwrap_or(global_conf.image_creation_threshold),
953 0 : pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
954 0 : walreceiver_connect_timeout: self
955 0 : .walreceiver_connect_timeout
956 0 : .unwrap_or(global_conf.walreceiver_connect_timeout),
957 0 : lagging_wal_timeout: self
958 0 : .lagging_wal_timeout
959 0 : .unwrap_or(global_conf.lagging_wal_timeout),
960 0 : max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
961 0 : eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
962 0 : min_resident_size_override: self
963 0 : .min_resident_size_override
964 0 : .or(global_conf.min_resident_size_override),
965 0 : evictions_low_residence_duration_metric_threshold: self
966 0 : .evictions_low_residence_duration_metric_threshold
967 0 : .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
968 0 : heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
969 0 : lazy_slru_download: self
970 0 : .lazy_slru_download
971 0 : .unwrap_or(global_conf.lazy_slru_download),
972 0 : timeline_get_throttle: self
973 0 : .timeline_get_throttle
974 0 : .clone()
975 0 : .unwrap_or(global_conf.timeline_get_throttle),
976 0 : image_layer_creation_check_threshold: self
977 0 : .image_layer_creation_check_threshold
978 0 : .unwrap_or(global_conf.image_layer_creation_check_threshold),
979 0 : image_creation_preempt_threshold: self
980 0 : .image_creation_preempt_threshold
981 0 : .unwrap_or(global_conf.image_creation_preempt_threshold),
982 0 : lsn_lease_length: self
983 0 : .lsn_lease_length
984 0 : .unwrap_or(global_conf.lsn_lease_length),
985 0 : lsn_lease_length_for_ts: self
986 0 : .lsn_lease_length_for_ts
987 0 : .unwrap_or(global_conf.lsn_lease_length_for_ts),
988 0 : timeline_offloading: self
989 0 : .timeline_offloading
990 0 : .unwrap_or(global_conf.timeline_offloading),
991 0 : wal_receiver_protocol_override: self
992 0 : .wal_receiver_protocol_override
993 0 : .or(global_conf.wal_receiver_protocol_override),
994 0 : rel_size_v2_enabled: self
995 0 : .rel_size_v2_enabled
996 0 : .unwrap_or(global_conf.rel_size_v2_enabled),
997 0 : gc_compaction_enabled: self
998 0 : .gc_compaction_enabled
999 0 : .unwrap_or(global_conf.gc_compaction_enabled),
1000 0 : gc_compaction_verification: self
1001 0 : .gc_compaction_verification
1002 0 : .unwrap_or(global_conf.gc_compaction_verification),
1003 0 : gc_compaction_initial_threshold_kb: self
1004 0 : .gc_compaction_initial_threshold_kb
1005 0 : .unwrap_or(global_conf.gc_compaction_initial_threshold_kb),
1006 0 : gc_compaction_ratio_percent: self
1007 0 : .gc_compaction_ratio_percent
1008 0 : .unwrap_or(global_conf.gc_compaction_ratio_percent),
1009 0 : sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
1010 0 : }
1011 0 : }
1012 : }
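
// A minimal usage sketch of `apply_patch` (field choices and values are arbitrary):
// duration-valued patch fields arrive as humantime strings and are parsed here,
// while untouched fields keep their previous values.
#[cfg(test)]
mod tenant_config_patch_examples {
    use super::*;

    #[test]
    fn apply_patch_parses_humantime_and_overrides_fields() {
        let base = TenantConfig::default();
        let patch = TenantConfigPatch {
            checkpoint_distance: FieldPatch::Upsert(10 * 1024 * 1024),
            pitr_interval: FieldPatch::Upsert("7days".to_string()),
            ..TenantConfigPatch::default()
        };
        let updated = base.apply_patch(patch).unwrap();
        assert_eq!(updated.checkpoint_distance, Some(10 * 1024 * 1024));
        assert_eq!(updated.pitr_interval, Some(Duration::from_secs(7 * 24 * 3600)));
        // Fields not mentioned in the patch are left as they were (unset here).
        assert_eq!(updated.gc_horizon, None);
    }
}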
1013 :
1014 : /// The policy for the aux file storage.
1015 : ///
1016 : /// It can be switched through `switch_aux_file_policy` tenant config.
1017 : /// When the first aux file is written, the policy will be persisted in the
1018 : /// `index_part.json` file and has a limited migration path.
1019 : ///
1020 : /// Currently, we only allow the following migration path:
1021 : ///
1022 : /// Unset -> V1
1023 : /// -> V2
1024 : /// -> CrossValidation -> V2
1025 : #[derive(
1026 : Eq,
1027 : PartialEq,
1028 : Debug,
1029 : Copy,
1030 : Clone,
1031 0 : strum_macros::EnumString,
1032 : strum_macros::Display,
1033 12 : serde_with::DeserializeFromStr,
1034 : serde_with::SerializeDisplay,
1035 : )]
1036 : #[strum(serialize_all = "kebab-case")]
1037 : pub enum AuxFilePolicy {
1038 : /// V1 aux file policy: store everything in AUX_FILE_KEY
1039 : #[strum(ascii_case_insensitive)]
1040 : V1,
1041 : /// V2 aux file policy: store in the AUX_FILE keyspace
1042 : #[strum(ascii_case_insensitive)]
1043 : V2,
1044 : /// Cross validation runs both formats on the write path and does validation
1045 : /// on the read path.
1046 : #[strum(ascii_case_insensitive)]
1047 : CrossValidation,
1048 : }
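
// A minimal usage sketch of the strum/serde setup above: variant names render in
// kebab-case and parse case-insensitively.
#[cfg(test)]
mod aux_file_policy_examples {
    use super::*;

    #[test]
    fn parse_and_serialize() {
        assert_eq!("v1".parse::<AuxFilePolicy>().unwrap(), AuxFilePolicy::V1);
        assert_eq!(
            "cross-validation".parse::<AuxFilePolicy>().unwrap(),
            AuxFilePolicy::CrossValidation
        );
        assert_eq!(
            serde_json::to_value(AuxFilePolicy::V2).unwrap(),
            serde_json::json!("v2")
        );
    }
}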
1049 :
1050 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1051 : #[serde(tag = "kind")]
1052 : pub enum EvictionPolicy {
1053 : NoEviction,
1054 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
1055 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
1056 : }
1057 :
1058 : impl EvictionPolicy {
1059 0 : pub fn discriminant_str(&self) -> &'static str {
1060 0 : match self {
1061 0 : EvictionPolicy::NoEviction => "NoEviction",
1062 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
1063 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
1064 : }
1065 0 : }
1066 : }
1067 :
1068 : #[derive(
1069 : Eq,
1070 : PartialEq,
1071 : Debug,
1072 : Copy,
1073 : Clone,
1074 0 : strum_macros::EnumString,
1075 : strum_macros::Display,
1076 0 : serde_with::DeserializeFromStr,
1077 : serde_with::SerializeDisplay,
1078 : )]
1079 : #[strum(serialize_all = "kebab-case")]
1080 : pub enum CompactionAlgorithm {
1081 : Legacy,
1082 : Tiered,
1083 : }
1084 :
1085 : #[derive(
1086 4 : Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
1087 : )]
1088 : pub enum ImageCompressionAlgorithm {
1089 : // Disabled for writes, support decompressing during read path
1090 : Disabled,
1091 : /// Zstandard compression. Level 0 and None mean the same (default level). Levels can be negative as well.
1092 : /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
1093 : Zstd {
1094 : level: Option<i8>,
1095 : },
1096 : }
1097 :
1098 : impl FromStr for ImageCompressionAlgorithm {
1099 : type Err = anyhow::Error;
1100 8 : fn from_str(s: &str) -> Result<Self, Self::Err> {
1101 8 : let mut components = s.split(['(', ')']);
1102 8 : let first = components
1103 8 : .next()
1104 8 : .ok_or_else(|| anyhow::anyhow!("empty string"))?;
1105 8 : match first {
1106 8 : "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
1107 6 : "zstd" => {
1108 6 : let level = if let Some(v) = components.next() {
1109 4 : let v: i8 = v.parse()?;
1110 4 : Some(v)
1111 : } else {
1112 2 : None
1113 : };
1114 :
1115 6 : Ok(ImageCompressionAlgorithm::Zstd { level })
1116 : }
1117 0 : _ => anyhow::bail!("invalid specifier '{first}'"),
1118 : }
1119 8 : }
1120 : }
1121 :
1122 : impl Display for ImageCompressionAlgorithm {
1123 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1124 12 : match self {
1125 3 : ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
1126 9 : ImageCompressionAlgorithm::Zstd { level } => {
1127 9 : if let Some(level) = level {
1128 6 : write!(f, "zstd({})", level)
1129 : } else {
1130 3 : write!(f, "zstd")
1131 : }
1132 : }
1133 : }
1134 12 : }
1135 : }
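
// A minimal usage sketch of the textual form accepted by `FromStr` and produced by
// `Display`, including the optional zstd level.
#[cfg(test)]
mod image_compression_examples {
    use super::*;

    #[test]
    fn parse_and_display_roundtrip() {
        assert_eq!(
            "zstd(1)".parse::<ImageCompressionAlgorithm>().unwrap(),
            ImageCompressionAlgorithm::Zstd { level: Some(1) }
        );
        assert_eq!(
            "zstd".parse::<ImageCompressionAlgorithm>().unwrap(),
            ImageCompressionAlgorithm::Zstd { level: None }
        );
        assert_eq!(
            "disabled".parse::<ImageCompressionAlgorithm>().unwrap(),
            ImageCompressionAlgorithm::Disabled
        );
        assert_eq!(
            ImageCompressionAlgorithm::Zstd { level: Some(-5) }.to_string(),
            "zstd(-5)"
        );
    }
}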
1136 :
1137 0 : #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
1138 : pub struct CompactionAlgorithmSettings {
1139 : pub kind: CompactionAlgorithm,
1140 : }
1141 :
1142 0 : #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
1143 : #[serde(tag = "mode", rename_all = "kebab-case")]
1144 : pub enum L0FlushConfig {
1145 : #[serde(rename_all = "snake_case")]
1146 : Direct { max_concurrency: NonZeroUsize },
1147 : }
1148 :
1149 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1150 : pub struct EvictionPolicyLayerAccessThreshold {
1151 : #[serde(with = "humantime_serde")]
1152 : pub period: Duration,
1153 : #[serde(with = "humantime_serde")]
1154 : pub threshold: Duration,
1155 : }
1156 :
1157 6 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
1158 : pub struct ThrottleConfig {
1159 : /// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
1160 : #[serde(rename = "task_kinds")]
1161 : pub enabled: ThrottleConfigTaskKinds,
1162 : pub initial: u32,
1163 : #[serde(with = "humantime_serde")]
1164 : pub refill_interval: Duration,
1165 : pub refill_amount: NonZeroU32,
1166 : pub max: u32,
1167 : }
1168 :
1169 : /// Before <https://github.com/neondatabase/neon/pull/9962>
1170 : /// the throttle was applied per `Timeline::get`/`Timeline::get_vectored` call.
1171 : /// The `task_kinds` field controlled which Pageserver "Task Kind"s
1172 : /// were subject to the throttle.
1173 : ///
1174 : /// After that PR, the throttle is applied at pagestream request level
1175 : /// and the `task_kinds` field does not apply since the only task kind
1176 : /// that is subject to the throttle is that of the page service.
1177 : ///
1178 : /// However, we don't want to make a breaking config change right now
1179 : /// because it means we have to migrate all the tenant configs.
1180 : /// This will be done in a future PR.
1181 : ///
1182 : /// In the meantime, we use emptiness / non-emptiness of the `task_kinds`
1183 : /// field to determine if the throttle is enabled or not.
1184 1 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1185 : #[serde(transparent)]
1186 : pub struct ThrottleConfigTaskKinds(Vec<String>);
1187 :
1188 : impl ThrottleConfigTaskKinds {
1189 1501 : pub fn disabled() -> Self {
1190 1501 : Self(vec![])
1191 1501 : }
1192 1394 : pub fn is_enabled(&self) -> bool {
1193 1394 : !self.0.is_empty()
1194 1394 : }
1195 : }
1196 :
1197 : impl ThrottleConfig {
1198 1501 : pub fn disabled() -> Self {
1199 1501 : Self {
1200 1501 : enabled: ThrottleConfigTaskKinds::disabled(),
1201 1501 : // other values don't matter with empty `task_kinds`.
1202 1501 : initial: 0,
1203 1501 : refill_interval: Duration::from_millis(1),
1204 1501 : refill_amount: NonZeroU32::new(1).unwrap(),
1205 1501 : max: 1,
1206 1501 : }
1207 1501 : }
1208 : /// The requests per second allowed by the given config.
1209 0 : pub fn steady_rps(&self) -> f64 {
1210 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
1211 0 : }
1212 : }
1213 :
1214 : #[cfg(test)]
1215 : mod throttle_config_tests {
1216 : use super::*;
1217 :
1218 : #[test]
1219 1 : fn test_disabled_is_disabled() {
1220 1 : let config = ThrottleConfig::disabled();
1221 1 : assert!(!config.enabled.is_enabled());
1222 1 : }
1223 : #[test]
1224 1 : fn test_enabled_backwards_compat() {
1225 1 : let input = serde_json::json!({
1226 1 : "task_kinds": ["PageRequestHandler"],
1227 1 : "initial": 40000,
1228 1 : "refill_interval": "50ms",
1229 1 : "refill_amount": 1000,
1230 1 : "max": 40000,
1231 1 : "fair": true
1232 1 : });
1233 1 : let config: ThrottleConfig = serde_json::from_value(input).unwrap();
1234 1 : assert!(config.enabled.is_enabled());
1235 1 : }
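// A minimal sketch of `steady_rps` (values are arbitrary): 1000 tokens refilled
// every 50ms amounts to roughly 20_000 requests per second.
#[test]
fn test_steady_rps_example() {
    let config = ThrottleConfig {
        enabled: ThrottleConfigTaskKinds(vec!["PageRequestHandler".to_string()]),
        initial: 40000,
        refill_interval: Duration::from_millis(50),
        refill_amount: NonZeroU32::new(1000).unwrap(),
        max: 40000,
    };
    assert!((config.steady_rps() - 20_000.0).abs() < 1e-6);
}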
1236 : }
1237 :
1238 : /// A flattened analog of a `pageserver::tenant::LocationMode`, which
1239 : /// lists out all possible states (and the virtual "Detached" state)
1240 : /// in a flat form rather than using rust-style enums.
1241 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
1242 : pub enum LocationConfigMode {
1243 : AttachedSingle,
1244 : AttachedMulti,
1245 : AttachedStale,
1246 : Secondary,
1247 : Detached,
1248 : }
1249 :
1250 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1251 : pub struct LocationConfigSecondary {
1252 : pub warm: bool,
1253 : }
1254 :
1255 : /// An alternative representation of `pageserver::tenant::LocationConf`,
1256 : /// for use in external-facing APIs.
1257 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1258 : pub struct LocationConfig {
1259 : pub mode: LocationConfigMode,
1260 : /// If attaching, in what generation?
1261 : #[serde(default)]
1262 : pub generation: Option<u32>,
1263 :
1264 : // If requesting mode `Secondary`, configuration for that.
1265 : #[serde(default)]
1266 : pub secondary_conf: Option<LocationConfigSecondary>,
1267 :
1268 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
1269 : // must be set accurately.
1270 : #[serde(default)]
1271 : pub shard_number: u8,
1272 : #[serde(default)]
1273 : pub shard_count: u8,
1274 : #[serde(default)]
1275 : pub shard_stripe_size: u32,
1276 :
1277 : // This configuration only affects attached mode, but should be provided irrespective
1278 : // of the mode, as a secondary location might transition on startup if the response
1279 : // to the `/re-attach` control plane API requests it.
1280 : pub tenant_conf: TenantConfig,
1281 : }
1282 :
1283 0 : #[derive(Serialize, Deserialize)]
1284 : pub struct LocationConfigListResponse {
1285 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
1286 : }
1287 :
1288 : #[derive(Serialize)]
1289 : pub struct StatusResponse {
1290 : pub id: NodeId,
1291 : }
1292 :
1293 0 : #[derive(Serialize, Deserialize, Debug)]
1294 : #[serde(deny_unknown_fields)]
1295 : pub struct TenantLocationConfigRequest {
1296 : #[serde(flatten)]
1297 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
1298 : }
1299 :
1300 0 : #[derive(Serialize, Deserialize, Debug)]
1301 : #[serde(deny_unknown_fields)]
1302 : pub struct TenantTimeTravelRequest {
1303 : pub shard_counts: Vec<ShardCount>,
1304 : }
1305 :
1306 0 : #[derive(Serialize, Deserialize, Debug)]
1307 : #[serde(deny_unknown_fields)]
1308 : pub struct TenantShardLocation {
1309 : pub shard_id: TenantShardId,
1310 : pub node_id: NodeId,
1311 : }
1312 :
1313 0 : #[derive(Serialize, Deserialize, Debug)]
1314 : #[serde(deny_unknown_fields)]
1315 : pub struct TenantLocationConfigResponse {
1316 : pub shards: Vec<TenantShardLocation>,
1317 : // If the shards' ShardCount count is >1, stripe_size will be set.
1318 : pub stripe_size: Option<ShardStripeSize>,
1319 : }
1320 :
1321 2 : #[derive(Serialize, Deserialize, Debug)]
1322 : #[serde(deny_unknown_fields)]
1323 : pub struct TenantConfigRequest {
1324 : pub tenant_id: TenantId,
1325 : #[serde(flatten)]
1326 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
1327 : }
1328 :
1329 : impl std::ops::Deref for TenantConfigRequest {
1330 : type Target = TenantConfig;
1331 :
1332 0 : fn deref(&self) -> &Self::Target {
1333 0 : &self.config
1334 0 : }
1335 : }
1336 :
1337 : impl TenantConfigRequest {
1338 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
1339 0 : let config = TenantConfig::default();
1340 0 : TenantConfigRequest { tenant_id, config }
1341 0 : }
1342 : }
1343 :
1344 3 : #[derive(Serialize, Deserialize, Debug)]
1345 : #[serde(deny_unknown_fields)]
1346 : pub struct TenantConfigPatchRequest {
1347 : pub tenant_id: TenantId,
1348 : #[serde(flatten)]
1349 : pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
1350 : }
1351 :
1352 0 : #[derive(Serialize, Deserialize, Debug)]
1353 : pub struct TenantWaitLsnRequest {
1354 : #[serde(flatten)]
1355 : pub timelines: HashMap<TimelineId, Lsn>,
1356 : pub timeout: Duration,
1357 : }
1358 :
1359 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
1360 0 : #[derive(Serialize, Deserialize, Clone)]
1361 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
1362 : pub enum TenantAttachmentStatus {
1363 : Maybe,
1364 : Attached,
1365 : Failed { reason: String },
1366 : }
1367 :
1368 0 : #[derive(Serialize, Deserialize, Clone)]
1369 : pub struct TenantInfo {
1370 : pub id: TenantShardId,
1371 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
1372 : pub state: TenantState,
1373 : /// Sum of the size of all layer files.
1374 : /// If a layer is present in both local FS and S3, it counts only once.
1375 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
1376 : pub attachment_status: TenantAttachmentStatus,
1377 : pub generation: u32,
1378 :
1379 : /// Opaque explanation if gc is being blocked.
1380 : ///
1381 : /// Only looked up for the individual tenant detail, not the listing.
1382 : #[serde(skip_serializing_if = "Option::is_none")]
1383 : pub gc_blocking: Option<String>,
1384 : }
1385 :
1386 0 : #[derive(Serialize, Deserialize, Clone)]
1387 : pub struct TenantDetails {
1388 : #[serde(flatten)]
1389 : pub tenant_info: TenantInfo,
1390 :
1391 : pub walredo: Option<WalRedoManagerStatus>,
1392 :
1393 : pub timelines: Vec<TimelineId>,
1394 : }
1395 :
1396 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
1397 : pub enum TimelineArchivalState {
1398 : Archived,
1399 : Unarchived,
1400 : }
1401 :
1402 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1403 : pub enum TimelineVisibilityState {
1404 : Visible,
1405 : Invisible,
1406 : }
1407 :
1408 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1409 : pub struct TimelineArchivalConfigRequest {
1410 : pub state: TimelineArchivalState,
1411 : }
1412 :
1413 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1414 : pub struct TimelinePatchIndexPartRequest {
1415 : pub rel_size_migration: Option<RelSizeMigration>,
1416 : pub gc_compaction_last_completed_lsn: Option<Lsn>,
1417 : pub applied_gc_cutoff_lsn: Option<Lsn>,
1418 : #[serde(default)]
1419 : pub force_index_update: bool,
1420 : }
1421 :
1422 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1423 : pub struct TimelinesInfoAndOffloaded {
1424 : pub timelines: Vec<TimelineInfo>,
1425 : pub offloaded: Vec<OffloadedTimelineInfo>,
1426 : }
1427 :
1428 : /// Analog of [`TimelineInfo`] for offloaded timelines.
1429 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1430 : pub struct OffloadedTimelineInfo {
1431 : pub tenant_id: TenantShardId,
1432 : pub timeline_id: TimelineId,
1433 : /// Whether the timeline has a parent it has been branched off from or not
1434 : pub ancestor_timeline_id: Option<TimelineId>,
1435 : /// Whether to retain the branch lsn at the ancestor or not
1436 : pub ancestor_retain_lsn: Option<Lsn>,
1437 : /// The time point when the timeline was archived
1438 : pub archived_at: chrono::DateTime<chrono::Utc>,
1439 : }
1440 :
1441 48 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1442 : #[serde(rename_all = "camelCase")]
1443 : pub enum RelSizeMigration {
1444 : /// The tenant is using the old rel_size format.
1445 : /// Note that this enum is persisted as `Option<RelSizeMigration>` in the index part, so
1446 : /// `None` is the same as `Some(RelSizeMigration::Legacy)`.
1447 : Legacy,
1448 : /// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
1449 : /// persisted in the index part. The read path will read both formats and merge them.
1450 : Migrating,
1451 : /// The tenant has migrated to the new rel_size format. Only the new rel_size format is persisted
1452 : /// in the index part, and the read path will not read the old format.
1453 : Migrated,
1454 : }
1455 :
1456 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
1457 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1458 : pub struct TimelineInfo {
1459 : pub tenant_id: TenantShardId,
1460 : pub timeline_id: TimelineId,
1461 :
1462 : pub ancestor_timeline_id: Option<TimelineId>,
1463 : pub ancestor_lsn: Option<Lsn>,
1464 : pub last_record_lsn: Lsn,
1465 : pub prev_record_lsn: Option<Lsn>,
1466 :
1467 : /// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
1468 : /// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
1469 : /// as it is easier to reason about.
1470 : #[serde(default)]
1471 : pub applied_gc_cutoff_lsn: Lsn,
1472 :
1473 : /// The upper bound of data which is either already GC'ed, or eligible to be GC'ed at any time based on the PITR interval.
1474 : /// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
1475 : /// LSN at which it is legal to create a branch or ephemeral endpoint.
1476 : ///
1477 : /// Note that holders of valid LSN leases may be able to create branches and read pages earlier
1478 : /// than this LSN, but new leases may not be taken out earlier than this LSN.
1479 : #[serde(default)]
1480 : pub min_readable_lsn: Lsn,
1481 :
1482 : pub disk_consistent_lsn: Lsn,
1483 :
1484 : /// The LSN that we have successfully uploaded to remote storage
1485 : pub remote_consistent_lsn: Lsn,
1486 :
1487 : /// The LSN that we are advertising to safekeepers
1488 : pub remote_consistent_lsn_visible: Lsn,
1489 :
1490 : /// The LSN from the start of the root timeline (never changes)
1491 : pub initdb_lsn: Lsn,
1492 :
1493 : pub current_logical_size: u64,
1494 : pub current_logical_size_is_accurate: bool,
1495 :
1496 : pub directory_entries_counts: Vec<u64>,
1497 :
1498 : /// Sum of the size of all layer files.
1499 : /// If a layer is present in both local FS and S3, it counts only once.
1500 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
1501 : pub current_logical_size_non_incremental: Option<u64>,
1502 :
1503 : /// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
1504 : /// beyond the branch's branch point, we only count up to the branch point.
1505 : pub pitr_history_size: u64,
1506 :
1507 : /// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
1508 : /// ancestor data used by this branch would have been retained anyway). If this is false, then
1509 : /// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
1510 : /// otherwise be able to GC.
1511 : pub within_ancestor_pitr: bool,
1512 :
1513 : pub timeline_dir_layer_file_size_sum: Option<u64>,
1514 :
1515 : pub wal_source_connstr: Option<String>,
1516 : pub last_received_msg_lsn: Option<Lsn>,
1517 : /// the timestamp (in microseconds) of the last received message
1518 : pub last_received_msg_ts: Option<u128>,
1519 : pub pg_version: u32,
1520 :
1521 : pub state: TimelineState,
1522 :
1523 : pub walreceiver_status: String,
1524 :
1525 : // ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
1526 : // Backward compatibility: you may receive JSON that does not contain the newly-added field; the
1527 : // `Option` then deserializes to `None`. Forward compatibility: a previous version of the pageserver
1528 : // may receive JSON that does contain the new field; serde::Deserialize does not deny unknown fields
1529 : // by default, so the field is safely ignored. (See the sketch after this struct.)
1530 : /// Whether the timeline is archived.
1531 : pub is_archived: Option<bool>,
1532 :
1533 : /// The status of the rel_size migration.
1534 : pub rel_size_migration: Option<RelSizeMigration>,
1535 :
1536 : /// Whether the timeline is invisible in synthetic size calculations.
1537 : pub is_invisible: Option<bool>,
1538 : }
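
// Sketch of the compatibility rule above, using a hypothetical struct rather than `TimelineInfo`
// itself: a field appended as `Option<T>` deserializes to `None` when absent from older JSON,
// and older readers ignore the new field because serde does not deny unknown fields by default.
#[cfg(test)]
mod timeline_info_compat_sketch {
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct InfoSketch {
        existing: u32,
        // Appended later; absent from JSON produced by older pageservers.
        newly_added: Option<bool>,
    }

    #[test]
    fn old_json_still_parses() {
        let parsed: InfoSketch = serde_json::from_str(r#"{"existing": 1}"#).unwrap();
        assert_eq!(parsed.existing, 1);
        assert!(parsed.newly_added.is_none());
    }
}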
1539 :
1540 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1541 : pub struct LayerMapInfo {
1542 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
1543 : pub historic_layers: Vec<HistoricLayerInfo>,
1544 : }
1545 :
1546 : /// The residence status of a layer
1547 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1548 : pub enum LayerResidenceStatus {
1549 : /// Residence status for a layer file that exists locally.
1550 : /// It may also exist on the remote, we don't care here.
1551 : Resident,
1552 : /// Residence status for a layer file that only exists on the remote.
1553 : Evicted,
1554 : }
1555 :
1556 : #[serde_as]
1557 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1558 : pub struct LayerAccessStats {
1559 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1560 : pub access_time: SystemTime,
1561 :
1562 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1563 : pub residence_time: SystemTime,
1564 :
1565 : pub visible: bool,
1566 : }
1567 :
1568 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1569 : #[serde(tag = "kind")]
1570 : pub enum InMemoryLayerInfo {
1571 : Open { lsn_start: Lsn },
1572 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
1573 : }
1574 :
1575 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1576 : #[serde(tag = "kind")]
1577 : pub enum HistoricLayerInfo {
1578 : Delta {
1579 : layer_file_name: String,
1580 : layer_file_size: u64,
1581 :
1582 : lsn_start: Lsn,
1583 : lsn_end: Lsn,
1584 : remote: bool,
1585 : access_stats: LayerAccessStats,
1586 :
1587 : l0: bool,
1588 : },
1589 : Image {
1590 : layer_file_name: String,
1591 : layer_file_size: u64,
1592 :
1593 : lsn_start: Lsn,
1594 : remote: bool,
1595 : access_stats: LayerAccessStats,
1596 : },
1597 : }
1598 :
1599 : impl HistoricLayerInfo {
1600 0 : pub fn layer_file_name(&self) -> &str {
1601 0 : match self {
1602 : HistoricLayerInfo::Delta {
1603 0 : layer_file_name, ..
1604 0 : } => layer_file_name,
1605 : HistoricLayerInfo::Image {
1606 0 : layer_file_name, ..
1607 0 : } => layer_file_name,
1608 : }
1609 0 : }
1610 0 : pub fn is_remote(&self) -> bool {
1611 0 : match self {
1612 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
1613 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
1614 : }
1615 0 : }
1616 0 : pub fn set_remote(&mut self, value: bool) {
1617 0 : let field = match self {
1618 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
1619 0 : HistoricLayerInfo::Image { remote, .. } => remote,
1620 : };
1621 0 : *field = value;
1622 0 : }
1623 0 : pub fn layer_file_size(&self) -> u64 {
1624 0 : match self {
1625 : HistoricLayerInfo::Delta {
1626 0 : layer_file_size, ..
1627 0 : } => *layer_file_size,
1628 : HistoricLayerInfo::Image {
1629 0 : layer_file_size, ..
1630 0 : } => *layer_file_size,
1631 : }
1632 0 : }
1633 : }
1634 :
1635 0 : #[derive(Debug, Serialize, Deserialize)]
1636 : pub struct DownloadRemoteLayersTaskSpawnRequest {
1637 : pub max_concurrent_downloads: NonZeroUsize,
1638 : }
1639 :
1640 0 : #[derive(Debug, Serialize, Deserialize)]
1641 : pub struct IngestAuxFilesRequest {
1642 : pub aux_files: HashMap<String, String>,
1643 : }
1644 :
1645 0 : #[derive(Debug, Serialize, Deserialize)]
1646 : pub struct ListAuxFilesRequest {
1647 : pub lsn: Lsn,
1648 : }
1649 :
1650 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1651 : pub struct DownloadRemoteLayersTaskInfo {
1652 : pub task_id: String,
1653 : pub state: DownloadRemoteLayersTaskState,
1654 : pub total_layer_count: u64, // stable once `completed`
1655 : pub successful_download_count: u64, // stable once `completed`
1656 : pub failed_download_count: u64, // stable once `completed`
1657 : }
1658 :
1659 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1660 : pub enum DownloadRemoteLayersTaskState {
1661 : Running,
1662 : Completed,
1663 : ShutDown,
1664 : }
1665 :
1666 0 : #[derive(Debug, Serialize, Deserialize)]
1667 : pub struct TimelineGcRequest {
1668 : pub gc_horizon: Option<u64>,
1669 : }
1670 :
1671 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1672 : pub struct WalRedoManagerProcessStatus {
1673 : pub pid: u32,
1674 : }
1675 :
1676 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1677 : pub struct WalRedoManagerStatus {
1678 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
1679 : pub process: Option<WalRedoManagerProcessStatus>,
1680 : }
1681 :
1682 : /// The progress of a secondary tenant.
1683 : ///
1684 : /// It is mostly useful when doing a long-running download: e.g. initiating
1685 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
1686 : /// what's happening.
1687 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
1688 : pub struct SecondaryProgress {
1689 : /// The remote storage LastModified time of the heatmap object we last downloaded.
1690 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
1691 :
1692 : /// The number of layers currently on-disk
1693 : pub layers_downloaded: usize,
1694 : /// The number of layers in the most recently seen heatmap
1695 : pub layers_total: usize,
1696 :
1697 : /// The number of layer bytes currently on-disk
1698 : pub bytes_downloaded: u64,
1699 : /// The number of layer bytes in the most recently seen heatmap
1700 : pub bytes_total: u64,
1701 : }
1702 :
1703 0 : #[derive(Serialize, Deserialize, Debug)]
1704 : pub struct TenantScanRemoteStorageShard {
1705 : pub tenant_shard_id: TenantShardId,
1706 : pub generation: Option<u32>,
1707 : pub stripe_size: Option<ShardStripeSize>,
1708 : }
1709 :
1710 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1711 : pub struct TenantScanRemoteStorageResponse {
1712 : pub shards: Vec<TenantScanRemoteStorageShard>,
1713 : }
1714 :
1715 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1716 : #[serde(rename_all = "snake_case")]
1717 : pub enum TenantSorting {
1718 : /// Total size of layers on local disk for all timelines in a shard.
1719 : ResidentSize,
1720 : /// The logical size of the largest timeline within a _tenant_ (not shard). Only tracked on
1721 : /// shard 0 and contains the sum across all shards.
1722 : MaxLogicalSize,
1723 : /// The logical size of the largest timeline within a _tenant_ (not shard), divided by number of
1724 : /// shards. Only tracked on shard 0, and estimates the per-shard logical size.
1725 : MaxLogicalSizePerShard,
1726 : }
1727 :
1728 : impl Default for TenantSorting {
1729 0 : fn default() -> Self {
1730 0 : Self::ResidentSize
1731 0 : }
1732 : }
1733 :
1734 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1735 : pub struct TopTenantShardsRequest {
1736 : // How would you like to sort the tenants?
1737 : pub order_by: TenantSorting,
1738 :
1739 : // How many results?
1740 : pub limit: usize,
1741 :
1742 : // Omit tenants with more than this many shards (e.g. if this is the max number of shards
1743 : // that the caller would ever split to)
1744 : pub where_shards_lt: Option<ShardCount>,
1745 :
1746 : // Omit tenants where the ordering metric is less than this (this is an optimization to
1747 : // let us quickly exclude numerous tiny shards)
1748 : pub where_gt: Option<u64>,
1749 : }
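
// Sketch of constructing a "top tenant shards" request as described by the fields above; the
// function and its values are illustrative only.
#[cfg(test)]
#[allow(dead_code)]
fn example_top_tenant_shards_request() -> TopTenantShardsRequest {
    TopTenantShardsRequest {
        // Sort by estimated per-shard logical size.
        order_by: TenantSorting::MaxLogicalSizePerShard,
        // Return the ten largest results.
        limit: 10,
        // No shard-count filter.
        where_shards_lt: None,
        // Skip tenants whose ordering metric is below 64 MiB.
        where_gt: Some(64 * 1024 * 1024),
    }
}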
1750 :
1751 0 : #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
1752 : pub struct TopTenantShardItem {
1753 : pub id: TenantShardId,
1754 :
1755 : /// Total size of layers on local disk for all timelines in this shard.
1756 : pub resident_size: u64,
1757 :
1758 : /// Total size of layers in remote storage for all timelines in this shard.
1759 : pub physical_size: u64,
1760 :
1761 : /// The largest logical size of a timeline within this _tenant_ (not shard). This is only
1762 : /// tracked on shard 0, and contains the sum of the logical size across all shards.
1763 : pub max_logical_size: u64,
1764 :
1765 : /// The largest logical size of a timeline within this _tenant_ (not shard) divided by number of
1766 : /// shards. This is only tracked on shard 0, and is only an estimate as we divide it evenly by
1767 : /// shard count, rounded up.
1768 : pub max_logical_size_per_shard: u64,
1769 : }
1770 :
1771 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1772 : pub struct TopTenantShardsResponse {
1773 : pub shards: Vec<TopTenantShardItem>,
1774 : }
1775 :
1776 : pub mod virtual_file {
1777 : #[derive(
1778 : Copy,
1779 : Clone,
1780 : PartialEq,
1781 : Eq,
1782 : Hash,
1783 0 : strum_macros::EnumString,
1784 : strum_macros::Display,
1785 0 : serde_with::DeserializeFromStr,
1786 : serde_with::SerializeDisplay,
1787 : Debug,
1788 : )]
1789 : #[strum(serialize_all = "kebab-case")]
1790 : pub enum IoEngineKind {
1791 : StdFs,
1792 : #[cfg(target_os = "linux")]
1793 : TokioEpollUring,
1794 : }
1795 :
1796 : /// Direct IO modes for a pageserver.
1797 : #[derive(
1798 : Copy,
1799 : Clone,
1800 : PartialEq,
1801 : Eq,
1802 : Hash,
1803 0 : strum_macros::EnumString,
1804 : strum_macros::Display,
1805 0 : serde_with::DeserializeFromStr,
1806 : serde_with::SerializeDisplay,
1807 : Debug,
1808 : )]
1809 : #[strum(serialize_all = "kebab-case")]
1810 : #[repr(u8)]
1811 : pub enum IoMode {
1812 : /// Uses buffered IO.
1813 : Buffered,
1814 : /// Uses direct IO, error out if the operation fails.
1815 : #[cfg(target_os = "linux")]
1816 : Direct,
1817 : }
1818 :
1819 : impl IoMode {
1820 2808 : pub fn preferred() -> Self {
1821 2808 : // The default behavior when running Rust unit tests without any further
1822 2808 : // flags is to use the newest behavior if available on the platform (Direct).
1823 2808 : // The CI uses the following environment variable to unit tests for all
1824 2808 : // different modes.
1825 2808 : // NB: the Python regression & perf tests have their own defaults management
1826 2808 : // that writes pageserver.toml; they do not use this variable.
1827 2808 : if cfg!(test) {
1828 : use once_cell::sync::Lazy;
1829 0 : static CACHED: Lazy<IoMode> = Lazy::new(|| {
1830 0 : utils::env::var_serde_json_string(
1831 0 : "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
1832 0 : )
1833 0 : .unwrap_or({
1834 0 : #[cfg(target_os = "linux")]
1835 0 : {
1836 0 : IoMode::Direct
1837 0 : }
1838 0 : #[cfg(not(target_os = "linux"))]
1839 0 : {
1840 0 : IoMode::Buffered
1841 0 : }
1842 0 : })
1843 0 : });
1844 0 : *CACHED
1845 : } else {
1846 2808 : IoMode::Buffered
1847 : }
1848 2808 : }
1849 : }
1850 :
1851 : impl TryFrom<u8> for IoMode {
1852 : type Error = u8;
1853 :
1854 15468 : fn try_from(value: u8) -> Result<Self, Self::Error> {
1855 15468 : Ok(match value {
1856 15468 : v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
1857 : #[cfg(target_os = "linux")]
1858 0 : v if v == (IoMode::Direct as u8) => IoMode::Direct,
1859 0 : x => return Err(x),
1860 : })
1861 15468 : }
1862 : }
1863 : }
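
// Sketch (test-only): `virtual_file::IoMode` round-trips through its `u8` representation; the
// `TryFrom<u8>` impl above is the inverse of the `as u8` cast used when the mode is stored as a
// plain byte.
#[cfg(test)]
#[test]
fn io_mode_u8_roundtrip_sketch() {
    let mode = virtual_file::IoMode::Buffered;
    assert_eq!(virtual_file::IoMode::try_from(mode as u8), Ok(mode));
}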
1864 :
1865 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1866 : pub struct ScanDisposableKeysResponse {
1867 : pub disposable_count: usize,
1868 : pub not_disposable_count: usize,
1869 : }
1870 :
1871 : // Wrapped in libpq CopyData
1872 : #[derive(PartialEq, Eq, Debug)]
1873 : pub enum PagestreamFeMessage {
1874 : Exists(PagestreamExistsRequest),
1875 : Nblocks(PagestreamNblocksRequest),
1876 : GetPage(PagestreamGetPageRequest),
1877 : DbSize(PagestreamDbSizeRequest),
1878 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
1879 : #[cfg(feature = "testing")]
1880 : Test(PagestreamTestRequest),
1881 : }
1882 :
1883 : // Wrapped in libpq CopyData
1884 : #[derive(strum_macros::EnumProperty)]
1885 : pub enum PagestreamBeMessage {
1886 : Exists(PagestreamExistsResponse),
1887 : Nblocks(PagestreamNblocksResponse),
1888 : GetPage(PagestreamGetPageResponse),
1889 : Error(PagestreamErrorResponse),
1890 : DbSize(PagestreamDbSizeResponse),
1891 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
1892 : #[cfg(feature = "testing")]
1893 : Test(PagestreamTestResponse),
1894 : }
1895 :
1896 : // Keep in sync with `pagestore_client.h`
1897 : #[repr(u8)]
1898 : enum PagestreamFeMessageTag {
1899 : Exists = 0,
1900 : Nblocks = 1,
1901 : GetPage = 2,
1902 : DbSize = 3,
1903 : GetSlruSegment = 4,
1904 : /* future tags above this line */
1905 : /// For testing purposes, not available in production.
1906 : #[cfg(feature = "testing")]
1907 : Test = 99,
1908 : }
1909 :
1910 : // Keep in sync with `pagestore_client.h`
1911 : #[repr(u8)]
1912 : enum PagestreamBeMessageTag {
1913 : Exists = 100,
1914 : Nblocks = 101,
1915 : GetPage = 102,
1916 : Error = 103,
1917 : DbSize = 104,
1918 : GetSlruSegment = 105,
1919 : /* future tags above this line */
1920 : /// For testing purposes, not available in production.
1921 : #[cfg(feature = "testing")]
1922 : Test = 199,
1923 : }
1924 :
1925 : impl TryFrom<u8> for PagestreamFeMessageTag {
1926 : type Error = u8;
1927 4 : fn try_from(value: u8) -> Result<Self, u8> {
1928 4 : match value {
1929 1 : 0 => Ok(PagestreamFeMessageTag::Exists),
1930 1 : 1 => Ok(PagestreamFeMessageTag::Nblocks),
1931 1 : 2 => Ok(PagestreamFeMessageTag::GetPage),
1932 1 : 3 => Ok(PagestreamFeMessageTag::DbSize),
1933 0 : 4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
1934 : #[cfg(feature = "testing")]
1935 0 : 99 => Ok(PagestreamFeMessageTag::Test),
1936 0 : _ => Err(value),
1937 : }
1938 4 : }
1939 : }
1940 :
1941 : impl TryFrom<u8> for PagestreamBeMessageTag {
1942 : type Error = u8;
1943 0 : fn try_from(value: u8) -> Result<Self, u8> {
1944 0 : match value {
1945 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
1946 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
1947 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
1948 0 : 103 => Ok(PagestreamBeMessageTag::Error),
1949 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
1950 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
1951 : #[cfg(feature = "testing")]
1952 0 : 199 => Ok(PagestreamBeMessageTag::Test),
1953 0 : _ => Err(value),
1954 : }
1955 0 : }
1956 : }
1957 :
1958 : // A GetPage request contains two LSN values:
1959 : //
1960 : // request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
1961 : // "get the latest version present". It's used by the primary server, which knows that no one else
1962 : // is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
1963 : // Lsn::Max. Standby servers use the current replay LSN as the request LSN.
1964 : //
1965 : // not_modified_since: Hint to the pageserver that the client knows that the page has not been
1966 : // modified between 'not_modified_since' and the request LSN. It's always correct to set
1967 : // 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
1968 : // passing an earlier LSN can speed up the request, by allowing the pageserver to process the
1969 : // request without waiting for 'request_lsn' to arrive.
1970 : //
1971 : // The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
1972 : // sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
1973 : // 'latest' was set to true. The V2 interface was added because there was no correct way for a
1974 : // standby to request a page at a particular non-latest LSN, and also include the
1975 : // 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
1976 : // request, if the standby knows that the page hasn't been modified since, and risking an error
1977 : // if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
1978 : // unnecessarily require the pageserver to wait for the WAL to arrive up to that point. The new V2
1979 : // interface allows sending both LSNs and lets the pageserver do the right thing. There was no
1980 : // difference in the responses between V1 and V2.
1981 : //
1982 : // V3 version of protocol adds request ID to all requests. This request ID is also included in response
1983 : // as well as other fields from requests, which allows to verify that we receive response for our request.
1984 : // We copy fields from request to response to make checking more reliable: request ID is formed from process ID
1985 : // and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
1986 : //
1987 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1988 : pub enum PagestreamProtocolVersion {
1989 : V2,
1990 : V3,
1991 : }
1992 :
1993 : pub type RequestId = u64;
1994 :
1995 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1996 : pub struct PagestreamRequest {
1997 : pub reqid: RequestId,
1998 : pub request_lsn: Lsn,
1999 : pub not_modified_since: Lsn,
2000 : }
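
// Illustrative sketch of the two LSNs described in the comment above (the function and its
// values are hypothetical, not part of this API): a primary asks for the latest page version
// while hinting that the page has not changed since an earlier LSN, letting the pageserver
// answer without waiting for newer WAL.
#[allow(dead_code)]
fn example_primary_request_header(reqid: RequestId, last_modified_lsn: Lsn) -> PagestreamRequest {
    PagestreamRequest {
        reqid,
        // The primary knows that no one else is writing WAL, so it asks for the latest version.
        request_lsn: Lsn(u64::MAX),
        // Hint: the page has not been modified since this LSN.
        not_modified_since: last_modified_lsn,
    }
}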
2001 :
2002 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2003 : pub struct PagestreamExistsRequest {
2004 : pub hdr: PagestreamRequest,
2005 : pub rel: RelTag,
2006 : }
2007 :
2008 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2009 : pub struct PagestreamNblocksRequest {
2010 : pub hdr: PagestreamRequest,
2011 : pub rel: RelTag,
2012 : }
2013 :
2014 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2015 : pub struct PagestreamGetPageRequest {
2016 : pub hdr: PagestreamRequest,
2017 : pub rel: RelTag,
2018 : pub blkno: u32,
2019 : }
2020 :
2021 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2022 : pub struct PagestreamDbSizeRequest {
2023 : pub hdr: PagestreamRequest,
2024 : pub dbnode: u32,
2025 : }
2026 :
2027 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
2028 : pub struct PagestreamGetSlruSegmentRequest {
2029 : pub hdr: PagestreamRequest,
2030 : pub kind: u8,
2031 : pub segno: u32,
2032 : }
2033 :
2034 : #[derive(Debug)]
2035 : pub struct PagestreamExistsResponse {
2036 : pub req: PagestreamExistsRequest,
2037 : pub exists: bool,
2038 : }
2039 :
2040 : #[derive(Debug)]
2041 : pub struct PagestreamNblocksResponse {
2042 : pub req: PagestreamNblocksRequest,
2043 : pub n_blocks: u32,
2044 : }
2045 :
2046 : #[derive(Debug)]
2047 : pub struct PagestreamGetPageResponse {
2048 : pub req: PagestreamGetPageRequest,
2049 : pub page: Bytes,
2050 : }
2051 :
2052 : #[derive(Debug)]
2053 : pub struct PagestreamGetSlruSegmentResponse {
2054 : pub req: PagestreamGetSlruSegmentRequest,
2055 : pub segment: Bytes,
2056 : }
2057 :
2058 : #[derive(Debug)]
2059 : pub struct PagestreamErrorResponse {
2060 : pub req: PagestreamRequest,
2061 : pub message: String,
2062 : }
2063 :
2064 : #[derive(Debug)]
2065 : pub struct PagestreamDbSizeResponse {
2066 : pub req: PagestreamDbSizeRequest,
2067 : pub db_size: i64,
2068 : }
2069 :
2070 : #[cfg(feature = "testing")]
2071 : #[derive(Debug, PartialEq, Eq, Clone)]
2072 : pub struct PagestreamTestRequest {
2073 : pub hdr: PagestreamRequest,
2074 : pub batch_key: u64,
2075 : pub message: String,
2076 : }
2077 :
2078 : #[cfg(feature = "testing")]
2079 : #[derive(Debug)]
2080 : pub struct PagestreamTestResponse {
2081 : pub req: PagestreamTestRequest,
2082 : }
2083 :
2084 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
2085 : // that require pageserver-internal types. It is sufficient to get the total size.
2086 0 : #[derive(Serialize, Deserialize, Debug)]
2087 : pub struct TenantHistorySize {
2088 : pub id: TenantId,
2089 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
2090 : ///
2091 : /// Will be `None` if `?inputs_only=true` was given.
2092 : pub size: Option<u64>,
2093 : }
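
// Sketch (test-only) of the tenant-size response shape modeled above; the tenant id and the
// size value are made up for illustration.
#[cfg(test)]
#[test]
fn tenant_history_size_deserialize_sketch() {
    let json = r#"{"id":"17c6d121946a61e5ab0fe5a2fd4d8215","size":123456}"#;
    let parsed: TenantHistorySize = serde_json::from_str(json).unwrap();
    assert_eq!(parsed.size, Some(123456));
}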
2094 :
2095 : impl PagestreamFeMessage {
2096 : /// Serialize a compute -> pageserver message. This is currently only used in testing
2097 : /// tools. Always uses protocol version 3.
2098 4 : pub fn serialize(&self) -> Bytes {
2099 4 : let mut bytes = BytesMut::new();
2100 4 :
2101 4 : match self {
2102 1 : Self::Exists(req) => {
2103 1 : bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
2104 1 : bytes.put_u64(req.hdr.reqid);
2105 1 : bytes.put_u64(req.hdr.request_lsn.0);
2106 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2107 1 : bytes.put_u32(req.rel.spcnode);
2108 1 : bytes.put_u32(req.rel.dbnode);
2109 1 : bytes.put_u32(req.rel.relnode);
2110 1 : bytes.put_u8(req.rel.forknum);
2111 1 : }
2112 :
2113 1 : Self::Nblocks(req) => {
2114 1 : bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
2115 1 : bytes.put_u64(req.hdr.reqid);
2116 1 : bytes.put_u64(req.hdr.request_lsn.0);
2117 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2118 1 : bytes.put_u32(req.rel.spcnode);
2119 1 : bytes.put_u32(req.rel.dbnode);
2120 1 : bytes.put_u32(req.rel.relnode);
2121 1 : bytes.put_u8(req.rel.forknum);
2122 1 : }
2123 :
2124 1 : Self::GetPage(req) => {
2125 1 : bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
2126 1 : bytes.put_u64(req.hdr.reqid);
2127 1 : bytes.put_u64(req.hdr.request_lsn.0);
2128 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2129 1 : bytes.put_u32(req.rel.spcnode);
2130 1 : bytes.put_u32(req.rel.dbnode);
2131 1 : bytes.put_u32(req.rel.relnode);
2132 1 : bytes.put_u8(req.rel.forknum);
2133 1 : bytes.put_u32(req.blkno);
2134 1 : }
2135 :
2136 1 : Self::DbSize(req) => {
2137 1 : bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
2138 1 : bytes.put_u64(req.hdr.reqid);
2139 1 : bytes.put_u64(req.hdr.request_lsn.0);
2140 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2141 1 : bytes.put_u32(req.dbnode);
2142 1 : }
2143 :
2144 0 : Self::GetSlruSegment(req) => {
2145 0 : bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
2146 0 : bytes.put_u64(req.hdr.reqid);
2147 0 : bytes.put_u64(req.hdr.request_lsn.0);
2148 0 : bytes.put_u64(req.hdr.not_modified_since.0);
2149 0 : bytes.put_u8(req.kind);
2150 0 : bytes.put_u32(req.segno);
2151 0 : }
2152 : #[cfg(feature = "testing")]
2153 0 : Self::Test(req) => {
2154 0 : bytes.put_u8(PagestreamFeMessageTag::Test as u8);
2155 0 : bytes.put_u64(req.hdr.reqid);
2156 0 : bytes.put_u64(req.hdr.request_lsn.0);
2157 0 : bytes.put_u64(req.hdr.not_modified_since.0);
2158 0 : bytes.put_u64(req.batch_key);
2159 0 : let message = req.message.as_bytes();
2160 0 : bytes.put_u64(message.len() as u64);
2161 0 : bytes.put_slice(message);
2162 0 : }
2163 : }
2164 :
2165 4 : bytes.into()
2166 4 : }
2167 :
2168 4 : pub fn parse<R: std::io::Read>(
2169 4 : body: &mut R,
2170 4 : protocol_version: PagestreamProtocolVersion,
2171 4 : ) -> anyhow::Result<PagestreamFeMessage> {
2172 : // these correspond to the NeonMessageTag enum in pagestore_client.h
2173 : //
2174 : // TODO: consider using protobuf or serde bincode for less error prone
2175 : // serialization.
2176 4 : let msg_tag = body.read_u8()?;
2177 4 : let (reqid, request_lsn, not_modified_since) = match protocol_version {
2178 : PagestreamProtocolVersion::V2 => (
2179 : 0,
2180 0 : Lsn::from(body.read_u64::<BigEndian>()?),
2181 0 : Lsn::from(body.read_u64::<BigEndian>()?),
2182 : ),
2183 : PagestreamProtocolVersion::V3 => (
2184 4 : body.read_u64::<BigEndian>()?,
2185 4 : Lsn::from(body.read_u64::<BigEndian>()?),
2186 4 : Lsn::from(body.read_u64::<BigEndian>()?),
2187 : ),
2188 : };
2189 :
2190 4 : match PagestreamFeMessageTag::try_from(msg_tag)
2191 4 : .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
2192 : {
2193 : PagestreamFeMessageTag::Exists => {
2194 : Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
2195 1 : hdr: PagestreamRequest {
2196 1 : reqid,
2197 1 : request_lsn,
2198 1 : not_modified_since,
2199 1 : },
2200 1 : rel: RelTag {
2201 1 : spcnode: body.read_u32::<BigEndian>()?,
2202 1 : dbnode: body.read_u32::<BigEndian>()?,
2203 1 : relnode: body.read_u32::<BigEndian>()?,
2204 1 : forknum: body.read_u8()?,
2205 : },
2206 : }))
2207 : }
2208 : PagestreamFeMessageTag::Nblocks => {
2209 : Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
2210 1 : hdr: PagestreamRequest {
2211 1 : reqid,
2212 1 : request_lsn,
2213 1 : not_modified_since,
2214 1 : },
2215 1 : rel: RelTag {
2216 1 : spcnode: body.read_u32::<BigEndian>()?,
2217 1 : dbnode: body.read_u32::<BigEndian>()?,
2218 1 : relnode: body.read_u32::<BigEndian>()?,
2219 1 : forknum: body.read_u8()?,
2220 : },
2221 : }))
2222 : }
2223 : PagestreamFeMessageTag::GetPage => {
2224 : Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2225 1 : hdr: PagestreamRequest {
2226 1 : reqid,
2227 1 : request_lsn,
2228 1 : not_modified_since,
2229 1 : },
2230 1 : rel: RelTag {
2231 1 : spcnode: body.read_u32::<BigEndian>()?,
2232 1 : dbnode: body.read_u32::<BigEndian>()?,
2233 1 : relnode: body.read_u32::<BigEndian>()?,
2234 1 : forknum: body.read_u8()?,
2235 : },
2236 1 : blkno: body.read_u32::<BigEndian>()?,
2237 : }))
2238 : }
2239 : PagestreamFeMessageTag::DbSize => {
2240 : Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
2241 1 : hdr: PagestreamRequest {
2242 1 : reqid,
2243 1 : request_lsn,
2244 1 : not_modified_since,
2245 1 : },
2246 1 : dbnode: body.read_u32::<BigEndian>()?,
2247 : }))
2248 : }
2249 : PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
2250 : PagestreamGetSlruSegmentRequest {
2251 0 : hdr: PagestreamRequest {
2252 0 : reqid,
2253 0 : request_lsn,
2254 0 : not_modified_since,
2255 0 : },
2256 0 : kind: body.read_u8()?,
2257 0 : segno: body.read_u32::<BigEndian>()?,
2258 : },
2259 : )),
2260 : #[cfg(feature = "testing")]
2261 : PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
2262 0 : hdr: PagestreamRequest {
2263 0 : reqid,
2264 0 : request_lsn,
2265 0 : not_modified_since,
2266 0 : },
2267 0 : batch_key: body.read_u64::<BigEndian>()?,
2268 : message: {
2269 0 : let len = body.read_u64::<BigEndian>()?;
2270 0 : let mut buf = vec![0; len as usize];
2271 0 : body.read_exact(&mut buf)?;
2272 0 : String::from_utf8(buf)?
2273 : },
2274 : })),
2275 : }
2276 4 : }
2277 : }
2278 :
2279 : impl PagestreamBeMessage {
2280 0 : pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
2281 0 : let mut bytes = BytesMut::new();
2282 :
2283 : use PagestreamBeMessageTag as Tag;
2284 0 : match protocol_version {
2285 : PagestreamProtocolVersion::V2 => {
2286 0 : match self {
2287 0 : Self::Exists(resp) => {
2288 0 : bytes.put_u8(Tag::Exists as u8);
2289 0 : bytes.put_u8(resp.exists as u8);
2290 0 : }
2291 :
2292 0 : Self::Nblocks(resp) => {
2293 0 : bytes.put_u8(Tag::Nblocks as u8);
2294 0 : bytes.put_u32(resp.n_blocks);
2295 0 : }
2296 :
2297 0 : Self::GetPage(resp) => {
2298 0 : bytes.put_u8(Tag::GetPage as u8);
2299 0 : bytes.put(&resp.page[..])
2300 : }
2301 :
2302 0 : Self::Error(resp) => {
2303 0 : bytes.put_u8(Tag::Error as u8);
2304 0 : bytes.put(resp.message.as_bytes());
2305 0 : bytes.put_u8(0); // null terminator
2306 0 : }
2307 0 : Self::DbSize(resp) => {
2308 0 : bytes.put_u8(Tag::DbSize as u8);
2309 0 : bytes.put_i64(resp.db_size);
2310 0 : }
2311 :
2312 0 : Self::GetSlruSegment(resp) => {
2313 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2314 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2315 0 : bytes.put(&resp.segment[..]);
2316 0 : }
2317 :
2318 : #[cfg(feature = "testing")]
2319 0 : Self::Test(resp) => {
2320 0 : bytes.put_u8(Tag::Test as u8);
2321 0 : bytes.put_u64(resp.req.batch_key);
2322 0 : let message = resp.req.message.as_bytes();
2323 0 : bytes.put_u64(message.len() as u64);
2324 0 : bytes.put_slice(message);
2325 0 : }
2326 : }
2327 : }
2328 : PagestreamProtocolVersion::V3 => {
2329 0 : match self {
2330 0 : Self::Exists(resp) => {
2331 0 : bytes.put_u8(Tag::Exists as u8);
2332 0 : bytes.put_u64(resp.req.hdr.reqid);
2333 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2334 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2335 0 : bytes.put_u32(resp.req.rel.spcnode);
2336 0 : bytes.put_u32(resp.req.rel.dbnode);
2337 0 : bytes.put_u32(resp.req.rel.relnode);
2338 0 : bytes.put_u8(resp.req.rel.forknum);
2339 0 : bytes.put_u8(resp.exists as u8);
2340 0 : }
2341 :
2342 0 : Self::Nblocks(resp) => {
2343 0 : bytes.put_u8(Tag::Nblocks as u8);
2344 0 : bytes.put_u64(resp.req.hdr.reqid);
2345 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2346 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2347 0 : bytes.put_u32(resp.req.rel.spcnode);
2348 0 : bytes.put_u32(resp.req.rel.dbnode);
2349 0 : bytes.put_u32(resp.req.rel.relnode);
2350 0 : bytes.put_u8(resp.req.rel.forknum);
2351 0 : bytes.put_u32(resp.n_blocks);
2352 0 : }
2353 :
2354 0 : Self::GetPage(resp) => {
2355 0 : bytes.put_u8(Tag::GetPage as u8);
2356 0 : bytes.put_u64(resp.req.hdr.reqid);
2357 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2358 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2359 0 : bytes.put_u32(resp.req.rel.spcnode);
2360 0 : bytes.put_u32(resp.req.rel.dbnode);
2361 0 : bytes.put_u32(resp.req.rel.relnode);
2362 0 : bytes.put_u8(resp.req.rel.forknum);
2363 0 : bytes.put_u32(resp.req.blkno);
2364 0 : bytes.put(&resp.page[..])
2365 : }
2366 :
2367 0 : Self::Error(resp) => {
2368 0 : bytes.put_u8(Tag::Error as u8);
2369 0 : bytes.put_u64(resp.req.reqid);
2370 0 : bytes.put_u64(resp.req.request_lsn.0);
2371 0 : bytes.put_u64(resp.req.not_modified_since.0);
2372 0 : bytes.put(resp.message.as_bytes());
2373 0 : bytes.put_u8(0); // null terminator
2374 0 : }
2375 0 : Self::DbSize(resp) => {
2376 0 : bytes.put_u8(Tag::DbSize as u8);
2377 0 : bytes.put_u64(resp.req.hdr.reqid);
2378 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2379 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2380 0 : bytes.put_u32(resp.req.dbnode);
2381 0 : bytes.put_i64(resp.db_size);
2382 0 : }
2383 :
2384 0 : Self::GetSlruSegment(resp) => {
2385 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2386 0 : bytes.put_u64(resp.req.hdr.reqid);
2387 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2388 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2389 0 : bytes.put_u8(resp.req.kind);
2390 0 : bytes.put_u32(resp.req.segno);
2391 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2392 0 : bytes.put(&resp.segment[..]);
2393 0 : }
2394 :
2395 : #[cfg(feature = "testing")]
2396 0 : Self::Test(resp) => {
2397 0 : bytes.put_u8(Tag::Test as u8);
2398 0 : bytes.put_u64(resp.req.hdr.reqid);
2399 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2400 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2401 0 : bytes.put_u64(resp.req.batch_key);
2402 0 : let message = resp.req.message.as_bytes();
2403 0 : bytes.put_u64(message.len() as u64);
2404 0 : bytes.put_slice(message);
2405 0 : }
2406 : }
2407 : }
2408 : }
2409 0 : bytes.into()
2410 0 : }
2411 :
2412 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
2413 0 : let mut buf = buf.reader();
2414 0 : let msg_tag = buf.read_u8()?;
2415 :
2416 : use PagestreamBeMessageTag as Tag;
2417 0 : let ok =
2418 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
2419 : Tag::Exists => {
2420 0 : let reqid = buf.read_u64::<BigEndian>()?;
2421 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2422 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2423 0 : let rel = RelTag {
2424 0 : spcnode: buf.read_u32::<BigEndian>()?,
2425 0 : dbnode: buf.read_u32::<BigEndian>()?,
2426 0 : relnode: buf.read_u32::<BigEndian>()?,
2427 0 : forknum: buf.read_u8()?,
2428 : };
2429 0 : let exists = buf.read_u8()? != 0;
2430 0 : Self::Exists(PagestreamExistsResponse {
2431 0 : req: PagestreamExistsRequest {
2432 0 : hdr: PagestreamRequest {
2433 0 : reqid,
2434 0 : request_lsn,
2435 0 : not_modified_since,
2436 0 : },
2437 0 : rel,
2438 0 : },
2439 0 : exists,
2440 0 : })
2441 : }
2442 : Tag::Nblocks => {
2443 0 : let reqid = buf.read_u64::<BigEndian>()?;
2444 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2445 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2446 0 : let rel = RelTag {
2447 0 : spcnode: buf.read_u32::<BigEndian>()?,
2448 0 : dbnode: buf.read_u32::<BigEndian>()?,
2449 0 : relnode: buf.read_u32::<BigEndian>()?,
2450 0 : forknum: buf.read_u8()?,
2451 : };
2452 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2453 0 : Self::Nblocks(PagestreamNblocksResponse {
2454 0 : req: PagestreamNblocksRequest {
2455 0 : hdr: PagestreamRequest {
2456 0 : reqid,
2457 0 : request_lsn,
2458 0 : not_modified_since,
2459 0 : },
2460 0 : rel,
2461 0 : },
2462 0 : n_blocks,
2463 0 : })
2464 : }
2465 : Tag::GetPage => {
2466 0 : let reqid = buf.read_u64::<BigEndian>()?;
2467 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2468 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2469 0 : let rel = RelTag {
2470 0 : spcnode: buf.read_u32::<BigEndian>()?,
2471 0 : dbnode: buf.read_u32::<BigEndian>()?,
2472 0 : relnode: buf.read_u32::<BigEndian>()?,
2473 0 : forknum: buf.read_u8()?,
2474 : };
2475 0 : let blkno = buf.read_u32::<BigEndian>()?;
2476 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
2477 0 : buf.read_exact(&mut page)?;
2478 0 : Self::GetPage(PagestreamGetPageResponse {
2479 0 : req: PagestreamGetPageRequest {
2480 0 : hdr: PagestreamRequest {
2481 0 : reqid,
2482 0 : request_lsn,
2483 0 : not_modified_since,
2484 0 : },
2485 0 : rel,
2486 0 : blkno,
2487 0 : },
2488 0 : page: page.into(),
2489 0 : })
2490 : }
2491 : Tag::Error => {
2492 0 : let reqid = buf.read_u64::<BigEndian>()?;
2493 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2494 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2495 0 : let mut msg = Vec::new();
2496 0 : buf.read_until(0, &mut msg)?;
2497 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
2498 0 : let rust_str = cstring.to_str()?;
2499 0 : Self::Error(PagestreamErrorResponse {
2500 0 : req: PagestreamRequest {
2501 0 : reqid,
2502 0 : request_lsn,
2503 0 : not_modified_since,
2504 0 : },
2505 0 : message: rust_str.to_owned(),
2506 0 : })
2507 : }
2508 : Tag::DbSize => {
2509 0 : let reqid = buf.read_u64::<BigEndian>()?;
2510 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2511 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2512 0 : let dbnode = buf.read_u32::<BigEndian>()?;
2513 0 : let db_size = buf.read_i64::<BigEndian>()?;
2514 0 : Self::DbSize(PagestreamDbSizeResponse {
2515 0 : req: PagestreamDbSizeRequest {
2516 0 : hdr: PagestreamRequest {
2517 0 : reqid,
2518 0 : request_lsn,
2519 0 : not_modified_since,
2520 0 : },
2521 0 : dbnode,
2522 0 : },
2523 0 : db_size,
2524 0 : })
2525 : }
2526 : Tag::GetSlruSegment => {
2527 0 : let reqid = buf.read_u64::<BigEndian>()?;
2528 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2529 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2530 0 : let kind = buf.read_u8()?;
2531 0 : let segno = buf.read_u32::<BigEndian>()?;
2532 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2533 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
2534 0 : buf.read_exact(&mut segment)?;
2535 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
2536 0 : req: PagestreamGetSlruSegmentRequest {
2537 0 : hdr: PagestreamRequest {
2538 0 : reqid,
2539 0 : request_lsn,
2540 0 : not_modified_since,
2541 0 : },
2542 0 : kind,
2543 0 : segno,
2544 0 : },
2545 0 : segment: segment.into(),
2546 0 : })
2547 : }
2548 : #[cfg(feature = "testing")]
2549 : Tag::Test => {
2550 0 : let reqid = buf.read_u64::<BigEndian>()?;
2551 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2552 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2553 0 : let batch_key = buf.read_u64::<BigEndian>()?;
2554 0 : let len = buf.read_u64::<BigEndian>()?;
2555 0 : let mut msg = vec![0; len as usize];
2556 0 : buf.read_exact(&mut msg)?;
2557 0 : let message = String::from_utf8(msg)?;
2558 0 : Self::Test(PagestreamTestResponse {
2559 0 : req: PagestreamTestRequest {
2560 0 : hdr: PagestreamRequest {
2561 0 : reqid,
2562 0 : request_lsn,
2563 0 : not_modified_since,
2564 0 : },
2565 0 : batch_key,
2566 0 : message,
2567 0 : },
2568 0 : })
2569 : }
2570 : };
2571 0 : let remaining = buf.into_inner();
2572 0 : if !remaining.is_empty() {
2573 0 : anyhow::bail!(
2574 0 : "remaining bytes in msg with tag={msg_tag}: {}",
2575 0 : remaining.len()
2576 0 : );
2577 0 : }
2578 0 : Ok(ok)
2579 0 : }
2580 :
2581 0 : pub fn kind(&self) -> &'static str {
2582 0 : match self {
2583 0 : Self::Exists(_) => "Exists",
2584 0 : Self::Nblocks(_) => "Nblocks",
2585 0 : Self::GetPage(_) => "GetPage",
2586 0 : Self::Error(_) => "Error",
2587 0 : Self::DbSize(_) => "DbSize",
2588 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
2589 : #[cfg(feature = "testing")]
2590 0 : Self::Test(_) => "Test",
2591 : }
2592 0 : }
2593 : }
2594 :
2595 0 : #[derive(Debug, Serialize, Deserialize)]
2596 : pub struct PageTraceEvent {
2597 : pub key: CompactKey,
2598 : pub effective_lsn: Lsn,
2599 : pub time: SystemTime,
2600 : }
2601 :
2602 : impl Default for PageTraceEvent {
2603 0 : fn default() -> Self {
2604 0 : Self {
2605 0 : key: Default::default(),
2606 0 : effective_lsn: Default::default(),
2607 0 : time: std::time::UNIX_EPOCH,
2608 0 : }
2609 0 : }
2610 : }
2611 :
2612 : #[cfg(test)]
2613 : mod tests {
2614 : use std::str::FromStr;
2615 :
2616 : use serde_json::json;
2617 :
2618 : use super::*;
2619 :
2620 : #[test]
2621 1 : fn test_pagestream() {
2622 1 : // Test serialization/deserialization of PagestreamFeMessage
2623 1 : let messages = vec![
2624 1 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
2625 1 : hdr: PagestreamRequest {
2626 1 : reqid: 0,
2627 1 : request_lsn: Lsn(4),
2628 1 : not_modified_since: Lsn(3),
2629 1 : },
2630 1 : rel: RelTag {
2631 1 : forknum: 1,
2632 1 : spcnode: 2,
2633 1 : dbnode: 3,
2634 1 : relnode: 4,
2635 1 : },
2636 1 : }),
2637 1 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
2638 1 : hdr: PagestreamRequest {
2639 1 : reqid: 0,
2640 1 : request_lsn: Lsn(4),
2641 1 : not_modified_since: Lsn(4),
2642 1 : },
2643 1 : rel: RelTag {
2644 1 : forknum: 1,
2645 1 : spcnode: 2,
2646 1 : dbnode: 3,
2647 1 : relnode: 4,
2648 1 : },
2649 1 : }),
2650 1 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2651 1 : hdr: PagestreamRequest {
2652 1 : reqid: 0,
2653 1 : request_lsn: Lsn(4),
2654 1 : not_modified_since: Lsn(3),
2655 1 : },
2656 1 : rel: RelTag {
2657 1 : forknum: 1,
2658 1 : spcnode: 2,
2659 1 : dbnode: 3,
2660 1 : relnode: 4,
2661 1 : },
2662 1 : blkno: 7,
2663 1 : }),
2664 1 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
2665 1 : hdr: PagestreamRequest {
2666 1 : reqid: 0,
2667 1 : request_lsn: Lsn(4),
2668 1 : not_modified_since: Lsn(3),
2669 1 : },
2670 1 : dbnode: 7,
2671 1 : }),
2672 1 : ];
2673 5 : for msg in messages {
2674 4 : let bytes = msg.serialize();
2675 4 : let reconstructed =
2676 4 : PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
2677 4 : .unwrap();
2678 4 : assert!(msg == reconstructed);
2679 : }
2680 1 : }
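
// Sketch of a response round-trip, mirroring `test_pagestream` above; the Nblocks variant and
// the values are chosen arbitrarily.
#[test]
fn test_pagestream_be_roundtrip_sketch() {
    let msg = PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
        req: PagestreamNblocksRequest {
            hdr: PagestreamRequest {
                reqid: 7,
                request_lsn: Lsn(42),
                not_modified_since: Lsn(41),
            },
            rel: RelTag {
                forknum: 0,
                spcnode: 1,
                dbnode: 2,
                relnode: 3,
            },
        },
        n_blocks: 8,
    });
    let bytes = msg.serialize(PagestreamProtocolVersion::V3);
    match PagestreamBeMessage::deserialize(bytes).unwrap() {
        PagestreamBeMessage::Nblocks(resp) => assert_eq!(resp.n_blocks, 8),
        other => panic!("unexpected message kind: {}", other.kind()),
    }
}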
2681 :
2682 : #[test]
2683 1 : fn test_tenantinfo_serde() {
2684 1 : // Test serialization/deserialization of TenantInfo
2685 1 : let original_active = TenantInfo {
2686 1 : id: TenantShardId::unsharded(TenantId::generate()),
2687 1 : state: TenantState::Active,
2688 1 : current_physical_size: Some(42),
2689 1 : attachment_status: TenantAttachmentStatus::Attached,
2690 1 : generation: 1,
2691 1 : gc_blocking: None,
2692 1 : };
2693 1 : let expected_active = json!({
2694 1 : "id": original_active.id.to_string(),
2695 1 : "state": {
2696 1 : "slug": "Active",
2697 1 : },
2698 1 : "current_physical_size": 42,
2699 1 : "attachment_status": {
2700 1 : "slug":"attached",
2701 1 : },
2702 1 : "generation" : 1
2703 1 : });
2704 1 :
2705 1 : let original_broken = TenantInfo {
2706 1 : id: TenantShardId::unsharded(TenantId::generate()),
2707 1 : state: TenantState::Broken {
2708 1 : reason: "reason".into(),
2709 1 : backtrace: "backtrace info".into(),
2710 1 : },
2711 1 : current_physical_size: Some(42),
2712 1 : attachment_status: TenantAttachmentStatus::Attached,
2713 1 : generation: 1,
2714 1 : gc_blocking: None,
2715 1 : };
2716 1 : let expected_broken = json!({
2717 1 : "id": original_broken.id.to_string(),
2718 1 : "state": {
2719 1 : "slug": "Broken",
2720 1 : "data": {
2721 1 : "backtrace": "backtrace info",
2722 1 : "reason": "reason",
2723 1 : }
2724 1 : },
2725 1 : "current_physical_size": 42,
2726 1 : "attachment_status": {
2727 1 : "slug":"attached",
2728 1 : },
2729 1 : "generation" : 1
2730 1 : });
2731 1 :
2732 1 : assert_eq!(
2733 1 : serde_json::to_value(&original_active).unwrap(),
2734 1 : expected_active
2735 1 : );
2736 :
2737 1 : assert_eq!(
2738 1 : serde_json::to_value(&original_broken).unwrap(),
2739 1 : expected_broken
2740 1 : );
2741 1 : assert!(format!("{:?}", &original_broken.state).contains("reason"));
2742 1 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
2743 1 : }
2744 :
2745 : #[test]
2746 1 : fn test_reject_unknown_field() {
2747 1 : let id = TenantId::generate();
2748 1 : let config_request = json!({
2749 1 : "tenant_id": id.to_string(),
2750 1 : "unknown_field": "unknown_value".to_string(),
2751 1 : });
2752 1 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
2753 1 : assert!(
2754 1 : err.to_string().contains("unknown field `unknown_field`"),
2755 0 : "expect unknown field `unknown_field` error, got: {}",
2756 : err
2757 : );
2758 1 : }
2759 :
2760 : #[test]
2761 1 : fn tenantstatus_activating_serde() {
2762 1 : let states = [TenantState::Activating(ActivatingFrom::Attaching)];
2763 1 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
2764 1 :
2765 1 : let actual = serde_json::to_string(&states).unwrap();
2766 1 :
2767 1 : assert_eq!(actual, expected);
2768 :
2769 1 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
2770 1 :
2771 1 : assert_eq!(states.as_slice(), &parsed);
2772 1 : }
2773 :
2774 : #[test]
2775 1 : fn tenantstatus_activating_strum() {
2776 1 : // tests added because we use these for metrics
2777 1 : let examples = [
2778 1 : (line!(), TenantState::Attaching, "Attaching"),
2779 1 : (
2780 1 : line!(),
2781 1 : TenantState::Activating(ActivatingFrom::Attaching),
2782 1 : "Activating",
2783 1 : ),
2784 1 : (line!(), TenantState::Active, "Active"),
2785 1 : (
2786 1 : line!(),
2787 1 : TenantState::Stopping { progress: None },
2788 1 : "Stopping",
2789 1 : ),
2790 1 : (
2791 1 : line!(),
2792 1 : TenantState::Stopping {
2793 1 : progress: Some(completion::Barrier::default()),
2794 1 : },
2795 1 : "Stopping",
2796 1 : ),
2797 1 : (
2798 1 : line!(),
2799 1 : TenantState::Broken {
2800 1 : reason: "Example".into(),
2801 1 : backtrace: "Looooong backtrace".into(),
2802 1 : },
2803 1 : "Broken",
2804 1 : ),
2805 1 : ];
2806 :
2807 7 : for (line, rendered, expected) in examples {
2808 6 : let actual: &'static str = rendered.into();
2809 6 : assert_eq!(actual, expected, "example on {line}");
2810 : }
2811 1 : }
2812 :
2813 : #[test]
2814 1 : fn test_image_compression_algorithm_parsing() {
2815 : use ImageCompressionAlgorithm::*;
2816 1 : let cases = [
2817 1 : ("disabled", Disabled),
2818 1 : ("zstd", Zstd { level: None }),
2819 1 : ("zstd(18)", Zstd { level: Some(18) }),
2820 1 : ("zstd(-3)", Zstd { level: Some(-3) }),
2821 1 : ];
2822 :
2823 5 : for (display, expected) in cases {
2824 4 : assert_eq!(
2825 4 : ImageCompressionAlgorithm::from_str(display).unwrap(),
2826 : expected,
2827 0 : "parsing works"
2828 : );
2829 4 : assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");
2830 :
2831 4 : let ser = serde_json::to_string(&expected).expect("serialization");
2832 4 : assert_eq!(
2833 4 : serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap(),
2834 : expected,
2835 0 : "serde roundtrip"
2836 : );
2837 :
2838 4 : assert_eq!(
2839 4 : serde_json::Value::String(display.to_string()),
2840 4 : serde_json::to_value(expected).unwrap(),
2841 0 : "Display is the serde serialization"
2842 : );
2843 : }
2844 1 : }
2845 :
2846 : #[test]
2847 1 : fn test_tenant_config_patch_request_serde() {
2848 1 : let patch_request = TenantConfigPatchRequest {
2849 1 : tenant_id: TenantId::from_str("17c6d121946a61e5ab0fe5a2fd4d8215").unwrap(),
2850 1 : config: TenantConfigPatch {
2851 1 : checkpoint_distance: FieldPatch::Upsert(42),
2852 1 : gc_horizon: FieldPatch::Remove,
2853 1 : compaction_threshold: FieldPatch::Noop,
2854 1 : ..TenantConfigPatch::default()
2855 1 : },
2856 1 : };
2857 1 :
2858 1 : let json = serde_json::to_string(&patch_request).unwrap();
2859 1 :
2860 1 : let expected = r#"{"tenant_id":"17c6d121946a61e5ab0fe5a2fd4d8215","checkpoint_distance":42,"gc_horizon":null}"#;
2861 1 : assert_eq!(json, expected);
2862 :
2863 1 : let decoded: TenantConfigPatchRequest = serde_json::from_str(&json).unwrap();
2864 1 : assert_eq!(decoded.tenant_id, patch_request.tenant_id);
2865 1 : assert_eq!(decoded.config, patch_request.config);
2866 :
2867 : // Now apply the patch to a config to demonstrate semantics
2868 :
2869 1 : let base = TenantConfig {
2870 1 : checkpoint_distance: Some(28),
2871 1 : gc_horizon: Some(100),
2872 1 : compaction_target_size: Some(1024),
2873 1 : ..Default::default()
2874 1 : };
2875 1 :
2876 1 : let expected = TenantConfig {
2877 1 : checkpoint_distance: Some(42),
2878 1 : gc_horizon: None,
2879 1 : ..base.clone()
2880 1 : };
2881 1 :
2882 1 : let patched = base.apply_patch(decoded.config).unwrap();
2883 1 :
2884 1 : assert_eq!(patched, expected);
2885 1 : }
2886 : }
|