Line data Source code
1 : pub mod detach_ancestor;
2 : pub mod partitioning;
3 : pub mod utilization;
4 :
5 : use core::ops::Range;
6 : use std::collections::HashMap;
7 : use std::fmt::Display;
8 : use std::io::{BufRead, Read};
9 : use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
10 : use std::str::FromStr;
11 : use std::time::{Duration, SystemTime};
12 :
13 : use byteorder::{BigEndian, ReadBytesExt};
14 : use bytes::{Buf, BufMut, Bytes, BytesMut};
15 : #[cfg(feature = "testing")]
16 : use camino::Utf8PathBuf;
17 : use postgres_ffi::BLCKSZ;
18 : use serde::{Deserialize, Deserializer, Serialize, Serializer};
19 : use serde_with::serde_as;
20 : pub use utilization::PageserverUtilization;
21 : use utils::id::{NodeId, TenantId, TimelineId};
22 : use utils::lsn::Lsn;
23 : use utils::postgres_client::PostgresClientProtocol;
24 : use utils::{completion, serde_system_time};
25 :
26 : use crate::key::{CompactKey, Key};
27 : use crate::reltag::RelTag;
28 : use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
29 :
30 : /// The state of a tenant in this pageserver.
31 : ///
32 : /// ```mermaid
33 : /// stateDiagram-v2
34 : ///
35 : /// [*] --> Attaching: spawn_attach()
36 : ///
37 : /// Attaching --> Activating: activate()
38 : /// Activating --> Active: infallible
39 : ///
40 : /// Attaching --> Broken: attach() failure
41 : ///
42 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
43 : /// Stopping --> Broken: late error in remove_tenant_from_memory
44 : ///
45 : /// Broken --> [*]: ignore / detach / shutdown
46 : /// Stopping --> [*]: remove_from_memory complete
47 : ///
48 : /// Active --> Broken: cfg(testing)-only tenant break point
49 : /// ```
50 : #[derive(
51 : Clone,
52 : PartialEq,
53 : Eq,
54 0 : serde::Serialize,
55 1 : serde::Deserialize,
56 : strum_macros::Display,
57 : strum_macros::VariantNames,
58 : strum_macros::AsRefStr,
59 : strum_macros::IntoStaticStr,
60 : )]
61 : #[serde(tag = "slug", content = "data")]
62 : pub enum TenantState {
63 : /// This tenant is being attached to the pageserver.
64 : ///
65 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
66 : Attaching,
67 : /// The tenant is transitioning from Loading/Attaching to Active.
68 : ///
69 : /// While in this state, the individual timelines are being activated.
70 : ///
71 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
72 : Activating(ActivatingFrom),
73 : /// The tenant has finished activating and is open for business.
74 : ///
75 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
76 : Active,
77 : /// The tenant is recognized by pageserver, but it is being detached or the
78 : /// system is being shut down.
79 : ///
80 : /// Transitions out of this state are possible through `set_broken()`.
81 : Stopping {
82 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
83 : // otherwise it will not be skipped during deserialization
84 : #[serde(skip)]
85 : progress: completion::Barrier,
86 : },
87 : /// The tenant is recognized by the pageserver, but can no longer be used for
88 : /// any operations.
89 : ///
90 : /// If the tenant fails to load or attach, it will transition to this state
91 : /// and it is guaranteed that no background tasks are running in its name.
92 : ///
93 : /// The other way to transition into this state is from `Stopping` state
94 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
95 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
96 : Broken { reason: String, backtrace: String },
97 : }
98 :
99 : impl TenantState {
100 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
101 : use TenantAttachmentStatus::*;
102 :
103 : // Below TenantState::Activating is used as "transient" or "transparent" state for
104 : // attachment_status determining.
105 0 : match self {
106 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
107 : // So, technically, we can return Attached here.
108 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
109 : // But, our attach task might still be fetching the remote timelines, etc.
110 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
111 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
112 : // We only reach Active after successful load / attach.
113 : // So, call atttachment status Attached.
114 0 : Self::Active => Attached,
115 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
116 : // However, it also becomes Broken if the regular load fails.
117 : // From Console's perspective there's no practical difference
118 : // because attachment_status is polled by console only during attach operation execution.
119 0 : Self::Broken { reason, .. } => Failed {
120 0 : reason: reason.to_owned(),
121 0 : },
122 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
123 : // we set the Stopping state irrespective of whether the tenant
124 : // has finished attaching or not.
125 0 : Self::Stopping { .. } => Maybe,
126 : }
127 0 : }
128 :
129 0 : pub fn broken_from_reason(reason: String) -> Self {
130 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
131 0 : Self::Broken {
132 0 : reason,
133 0 : backtrace: backtrace_str,
134 0 : }
135 0 : }
136 : }
137 :
138 : impl std::fmt::Debug for TenantState {
139 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 2 : match self {
141 2 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
142 2 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
143 : }
144 0 : _ => write!(f, "{self}"),
145 : }
146 2 : }
147 : }
148 :
149 : /// A temporary lease to a specific lsn inside a timeline.
150 : /// Access to the lsn is guaranteed by the pageserver until the expiration indicated by `valid_until`.
151 : #[serde_as]
152 0 : #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
153 : pub struct LsnLease {
154 : #[serde_as(as = "SystemTimeAsRfc3339Millis")]
155 : pub valid_until: SystemTime,
156 : }
157 :
158 : serde_with::serde_conv!(
159 : SystemTimeAsRfc3339Millis,
160 : SystemTime,
161 0 : |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
162 0 : |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
163 : );
164 :
165 : impl LsnLease {
166 : /// The default length for an explicit LSN lease request (10 minutes).
167 : pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
168 :
169 : /// The default length for an implicit LSN lease granted during
170 : /// `get_lsn_by_timestamp` request (1 minutes).
171 : pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
172 :
173 : /// Checks whether the lease is expired.
174 12 : pub fn is_expired(&self, now: &SystemTime) -> bool {
175 12 : now > &self.valid_until
176 12 : }
177 : }
178 :
/// Selects how a detach-ancestor operation behaves.
///
/// - `NoAncestorAndReparent` (the default): only detach a branch if its ancestor is a root
///   branch. Any children of the ancestor before and at the branch point are reparented
///   automatically.
/// - `MultiLevelAndNoReparent`: detach a branch from multiple levels of ancestors, with no
///   reparenting at all.
#[derive(Debug, Clone, Copy, Default)]
pub enum DetachBehavior {
    #[default]
    NoAncestorAndReparent,
    MultiLevelAndNoReparent,
}

/// Parses either the long snake_case name or its short alias (`v1` / `v2`).
impl std::str::FromStr for DetachBehavior {
    type Err = &'static str;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "no_ancestor_and_reparent" | "v1" => Ok(Self::NoAncestorAndReparent),
            "multi_level_and_no_reparent" | "v2" => Ok(Self::MultiLevelAndNoReparent),
            _ => Err("cannot parse detach behavior"),
        }
    }
}

/// Renders the long snake_case name, which `FromStr` accepts back.
impl std::fmt::Display for DetachBehavior {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::NoAncestorAndReparent => "no_ancestor_and_reparent",
            Self::MultiLevelAndNoReparent => "multi_level_and_no_reparent",
        };
        f.write_str(name)
    }
}
211 :
212 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
213 : ///
214 : /// XXX: We used to have more variants here, but now it's just one, which makes this rather
215 : /// useless. Remove, once we've checked that there's no client code left that looks at this.
216 1 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
217 : pub enum ActivatingFrom {
218 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
219 : Attaching,
220 : }
221 :
222 : /// A state of a timeline in pageserver's memory.
223 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
224 : pub enum TimelineState {
225 : /// The timeline is recognized by the pageserver but is not yet operational.
226 : /// In particular, the walreceiver connection loop is not running for this timeline.
227 : /// It will eventually transition to state Active or Broken.
228 : Loading,
229 : /// The timeline is fully operational.
230 : /// It can be queried, and the walreceiver connection loop is running.
231 : Active,
232 : /// The timeline was previously Loading or Active but is shutting down.
233 : /// It cannot transition back into any other state.
234 : Stopping,
235 : /// The timeline is broken and not operational (previous states: Loading or Active).
236 : Broken { reason: String, backtrace: String },
237 : }
238 :
239 : #[serde_with::serde_as]
240 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
241 : pub struct CompactLsnRange {
242 : pub start: Lsn,
243 : pub end: Lsn,
244 : }
245 :
246 : #[serde_with::serde_as]
247 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
248 : pub struct CompactKeyRange {
249 : #[serde_as(as = "serde_with::DisplayFromStr")]
250 : pub start: Key,
251 : #[serde_as(as = "serde_with::DisplayFromStr")]
252 : pub end: Key,
253 : }
254 :
255 : impl From<Range<Lsn>> for CompactLsnRange {
256 12 : fn from(range: Range<Lsn>) -> Self {
257 12 : Self {
258 12 : start: range.start,
259 12 : end: range.end,
260 12 : }
261 12 : }
262 : }
263 :
264 : impl From<Range<Key>> for CompactKeyRange {
265 32 : fn from(range: Range<Key>) -> Self {
266 32 : Self {
267 32 : start: range.start,
268 32 : end: range.end,
269 32 : }
270 32 : }
271 : }
272 :
273 : impl From<CompactLsnRange> for Range<Lsn> {
274 20 : fn from(range: CompactLsnRange) -> Self {
275 20 : range.start..range.end
276 20 : }
277 : }
278 :
279 : impl From<CompactKeyRange> for Range<Key> {
280 32 : fn from(range: CompactKeyRange) -> Self {
281 32 : range.start..range.end
282 32 : }
283 : }
284 :
285 : impl CompactLsnRange {
286 8 : pub fn above(lsn: Lsn) -> Self {
287 8 : Self {
288 8 : start: lsn,
289 8 : end: Lsn::MAX,
290 8 : }
291 8 : }
292 : }
293 :
294 : #[derive(Debug, Clone, Serialize)]
295 : pub struct CompactInfoResponse {
296 : pub compact_key_range: Option<CompactKeyRange>,
297 : pub compact_lsn_range: Option<CompactLsnRange>,
298 : pub sub_compaction: bool,
299 : pub running: bool,
300 : pub job_id: usize,
301 : }
302 :
303 0 : #[derive(Serialize, Deserialize, Clone)]
304 : pub struct TimelineCreateRequest {
305 : pub new_timeline_id: TimelineId,
306 : #[serde(flatten)]
307 : pub mode: TimelineCreateRequestMode,
308 : }
309 :
310 : /// Storage controller specific extensions to [`TimelineInfo`].
311 0 : #[derive(Serialize, Deserialize, Clone)]
312 : pub struct TimelineCreateResponseStorcon {
313 : #[serde(flatten)]
314 : pub timeline_info: TimelineInfo,
315 :
316 : pub safekeepers: Option<SafekeepersInfo>,
317 : }
318 :
319 : /// Safekeepers as returned in timeline creation request to storcon or pushed to
320 : /// cplane in the post migration hook.
321 0 : #[derive(Serialize, Deserialize, Clone)]
322 : pub struct SafekeepersInfo {
323 : pub tenant_id: TenantId,
324 : pub timeline_id: TimelineId,
325 : pub generation: u32,
326 : pub safekeepers: Vec<SafekeeperInfo>,
327 : }
328 :
329 0 : #[derive(Serialize, Deserialize, Clone)]
330 : pub struct SafekeeperInfo {
331 : pub id: NodeId,
332 : pub hostname: String,
333 : }
334 :
335 0 : #[derive(Serialize, Deserialize, Clone)]
336 : #[serde(untagged)]
337 : pub enum TimelineCreateRequestMode {
338 : Branch {
339 : ancestor_timeline_id: TimelineId,
340 : #[serde(default)]
341 : ancestor_start_lsn: Option<Lsn>,
342 : // TODO: cplane sets this, but, the branching code always
343 : // inherits the ancestor's pg_version. Earlier code wasn't
344 : // using a flattened enum, so, it was an accepted field, and
345 : // we continue to accept it by having it here.
346 : pg_version: Option<u32>,
347 : },
348 : ImportPgdata {
349 : import_pgdata: TimelineCreateRequestModeImportPgdata,
350 : },
351 : // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
352 : // (serde picks the first matching enum variant, in declaration order).
353 : Bootstrap {
354 : #[serde(default)]
355 : existing_initdb_timeline_id: Option<TimelineId>,
356 : pg_version: Option<u32>,
357 : },
358 : }
359 :
360 0 : #[derive(Serialize, Deserialize, Clone)]
361 : pub struct TimelineCreateRequestModeImportPgdata {
362 : pub location: ImportPgdataLocation,
363 : pub idempotency_key: ImportPgdataIdempotencyKey,
364 : }
365 :
366 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
367 : pub enum ImportPgdataLocation {
368 : #[cfg(feature = "testing")]
369 : LocalFs { path: Utf8PathBuf },
370 : AwsS3 {
371 : region: String,
372 : bucket: String,
373 : /// A better name for this would be `prefix`; changing requires coordination with cplane.
374 : /// See <https://github.com/neondatabase/cloud/issues/20646>.
375 : key: String,
376 : },
377 : }
378 :
379 0 : #[derive(Serialize, Deserialize, Clone)]
380 : #[serde(transparent)]
381 : pub struct ImportPgdataIdempotencyKey(pub String);
382 :
383 : impl ImportPgdataIdempotencyKey {
384 0 : pub fn random() -> Self {
385 : use rand::Rng;
386 : use rand::distributions::Alphanumeric;
387 0 : Self(
388 0 : rand::thread_rng()
389 0 : .sample_iter(&Alphanumeric)
390 0 : .take(20)
391 0 : .map(char::from)
392 0 : .collect(),
393 0 : )
394 0 : }
395 : }
396 :
397 0 : #[derive(Serialize, Deserialize, Clone)]
398 : pub struct LsnLeaseRequest {
399 : pub lsn: Lsn,
400 : }
401 :
402 0 : #[derive(Serialize, Deserialize)]
403 : pub struct TenantShardSplitRequest {
404 : pub new_shard_count: u8,
405 :
406 : // A tenant's stripe size is only meaningful the first time their shard count goes
407 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
408 : //
409 : // If this is set while the stripe count is being increased from an already >1 value,
410 : // then the request will fail with 400.
411 : pub new_stripe_size: Option<ShardStripeSize>,
412 : }
413 :
414 0 : #[derive(Serialize, Deserialize)]
415 : pub struct TenantShardSplitResponse {
416 : pub new_shards: Vec<TenantShardId>,
417 : }
418 :
419 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
420 0 : #[derive(Serialize, Deserialize, Debug)]
421 : #[serde(deny_unknown_fields)]
422 : pub struct ShardParameters {
423 : pub count: ShardCount,
424 : pub stripe_size: ShardStripeSize,
425 : }
426 :
427 : impl ShardParameters {
428 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
429 :
430 0 : pub fn is_unsharded(&self) -> bool {
431 0 : self.count.is_unsharded()
432 0 : }
433 : }
434 :
435 : impl Default for ShardParameters {
436 453 : fn default() -> Self {
437 453 : Self {
438 453 : count: ShardCount::new(0),
439 453 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
440 453 : }
441 453 : }
442 : }
443 :
/// A three-way patch for one optional field.
///
/// - `Upsert(v)`: set the field to `v`.
/// - `Remove`: clear the field.
/// - `Noop` (the default): leave the field untouched.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub enum FieldPatch<T> {
    Upsert(T),
    Remove,
    #[default]
    Noop,
}

impl<T> FieldPatch<T> {
    /// Returns `true` only for `Noop`; used as a `skip_serializing_if` predicate.
    fn is_noop(&self) -> bool {
        match self {
            FieldPatch::Noop => true,
            FieldPatch::Upsert(_) | FieldPatch::Remove => false,
        }
    }

    /// Applies the patch to `target`: `Upsert` stores the value, `Remove` clears it,
    /// and `Noop` leaves `target` as it is.
    pub fn apply(self, target: &mut Option<T>) {
        let replacement = match self {
            Self::Noop => return,
            Self::Upsert(v) => Some(v),
            Self::Remove => None,
        };
        *target = replacement;
    }

    /// Converts the payload of an `Upsert` with the fallible function `map`.
    ///
    /// `Remove` and `Noop` pass through unchanged and never invoke `map`; an error
    /// returned by `map` is propagated to the caller.
    pub fn map<U, E, F: FnOnce(T) -> Result<U, E>>(self, map: F) -> Result<FieldPatch<U>, E> {
        Ok(match self {
            Self::Upsert(v) => FieldPatch::Upsert(map(v)?),
            Self::Remove => FieldPatch::Remove,
            Self::Noop => FieldPatch::Noop,
        })
    }
}
473 :
474 : impl<'de, T: Deserialize<'de>> Deserialize<'de> for FieldPatch<T> {
475 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
476 2 : where
477 2 : D: Deserializer<'de>,
478 2 : {
479 2 : Option::deserialize(deserializer).map(|opt| match opt {
480 1 : None => FieldPatch::Remove,
481 1 : Some(val) => FieldPatch::Upsert(val),
482 2 : })
483 2 : }
484 : }
485 :
486 : impl<T: Serialize> Serialize for FieldPatch<T> {
487 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
488 2 : where
489 2 : S: Serializer,
490 2 : {
491 2 : match self {
492 1 : FieldPatch::Upsert(val) => serializer.serialize_some(val),
493 1 : FieldPatch::Remove => serializer.serialize_none(),
494 0 : FieldPatch::Noop => unreachable!(),
495 : }
496 2 : }
497 : }
498 :
499 2 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
500 : #[serde(default)]
501 : pub struct TenantConfigPatch {
502 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
503 : pub checkpoint_distance: FieldPatch<u64>,
504 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
505 : pub checkpoint_timeout: FieldPatch<String>,
506 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
507 : pub compaction_target_size: FieldPatch<u64>,
508 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
509 : pub compaction_period: FieldPatch<String>,
510 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
511 : pub compaction_threshold: FieldPatch<usize>,
512 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
513 : pub compaction_upper_limit: FieldPatch<usize>,
514 : // defer parsing compaction_algorithm, like eviction_policy
515 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
516 : pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
517 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
518 : pub compaction_l0_first: FieldPatch<bool>,
519 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
520 : pub compaction_l0_semaphore: FieldPatch<bool>,
521 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
522 : pub l0_flush_delay_threshold: FieldPatch<usize>,
523 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
524 : pub l0_flush_stall_threshold: FieldPatch<usize>,
525 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
526 : pub l0_flush_wait_upload: FieldPatch<bool>,
527 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
528 : pub gc_horizon: FieldPatch<u64>,
529 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
530 : pub gc_period: FieldPatch<String>,
531 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
532 : pub image_creation_threshold: FieldPatch<usize>,
533 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
534 : pub pitr_interval: FieldPatch<String>,
535 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
536 : pub walreceiver_connect_timeout: FieldPatch<String>,
537 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
538 : pub lagging_wal_timeout: FieldPatch<String>,
539 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
540 : pub max_lsn_wal_lag: FieldPatch<NonZeroU64>,
541 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
542 : pub eviction_policy: FieldPatch<EvictionPolicy>,
543 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
544 : pub min_resident_size_override: FieldPatch<u64>,
545 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
546 : pub evictions_low_residence_duration_metric_threshold: FieldPatch<String>,
547 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
548 : pub heatmap_period: FieldPatch<String>,
549 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
550 : pub lazy_slru_download: FieldPatch<bool>,
551 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
552 : pub timeline_get_throttle: FieldPatch<ThrottleConfig>,
553 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
554 : pub image_layer_creation_check_threshold: FieldPatch<u8>,
555 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
556 : pub image_creation_preempt_threshold: FieldPatch<usize>,
557 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
558 : pub lsn_lease_length: FieldPatch<String>,
559 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
560 : pub lsn_lease_length_for_ts: FieldPatch<String>,
561 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
562 : pub timeline_offloading: FieldPatch<bool>,
563 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
564 : pub wal_receiver_protocol_override: FieldPatch<PostgresClientProtocol>,
565 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
566 : pub rel_size_v2_enabled: FieldPatch<bool>,
567 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
568 : pub gc_compaction_enabled: FieldPatch<bool>,
569 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
570 : pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
571 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
572 : pub gc_compaction_ratio_percent: FieldPatch<u64>,
573 : }
574 :
575 : /// Like [`crate::config::TenantConfigToml`], but preserves the information
576 : /// about which parameters are set and which are not.
577 : ///
578 : /// Used in many places, including durably stored ones.
579 8 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
580 : #[serde(default)] // this maps omitted fields in deserialization to None
581 : pub struct TenantConfig {
582 : #[serde(skip_serializing_if = "Option::is_none")]
583 : pub checkpoint_distance: Option<u64>,
584 :
585 : #[serde(skip_serializing_if = "Option::is_none")]
586 : #[serde(with = "humantime_serde")]
587 : pub checkpoint_timeout: Option<Duration>,
588 :
589 : #[serde(skip_serializing_if = "Option::is_none")]
590 : pub compaction_target_size: Option<u64>,
591 :
592 : #[serde(skip_serializing_if = "Option::is_none")]
593 : #[serde(with = "humantime_serde")]
594 : pub compaction_period: Option<Duration>,
595 :
596 : #[serde(skip_serializing_if = "Option::is_none")]
597 : pub compaction_threshold: Option<usize>,
598 :
599 : #[serde(skip_serializing_if = "Option::is_none")]
600 : pub compaction_upper_limit: Option<usize>,
601 :
602 : #[serde(skip_serializing_if = "Option::is_none")]
603 : pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
604 :
605 : #[serde(skip_serializing_if = "Option::is_none")]
606 : pub compaction_l0_first: Option<bool>,
607 :
608 : #[serde(skip_serializing_if = "Option::is_none")]
609 : pub compaction_l0_semaphore: Option<bool>,
610 :
611 : #[serde(skip_serializing_if = "Option::is_none")]
612 : pub l0_flush_delay_threshold: Option<usize>,
613 :
614 : #[serde(skip_serializing_if = "Option::is_none")]
615 : pub l0_flush_stall_threshold: Option<usize>,
616 :
617 : #[serde(skip_serializing_if = "Option::is_none")]
618 : pub l0_flush_wait_upload: Option<bool>,
619 :
620 : #[serde(skip_serializing_if = "Option::is_none")]
621 : pub gc_horizon: Option<u64>,
622 :
623 : #[serde(skip_serializing_if = "Option::is_none")]
624 : #[serde(with = "humantime_serde")]
625 : pub gc_period: Option<Duration>,
626 :
627 : #[serde(skip_serializing_if = "Option::is_none")]
628 : pub image_creation_threshold: Option<usize>,
629 :
630 : #[serde(skip_serializing_if = "Option::is_none")]
631 : #[serde(with = "humantime_serde")]
632 : pub pitr_interval: Option<Duration>,
633 :
634 : #[serde(skip_serializing_if = "Option::is_none")]
635 : #[serde(with = "humantime_serde")]
636 : pub walreceiver_connect_timeout: Option<Duration>,
637 :
638 : #[serde(skip_serializing_if = "Option::is_none")]
639 : #[serde(with = "humantime_serde")]
640 : pub lagging_wal_timeout: Option<Duration>,
641 :
642 : #[serde(skip_serializing_if = "Option::is_none")]
643 : pub max_lsn_wal_lag: Option<NonZeroU64>,
644 :
645 : #[serde(skip_serializing_if = "Option::is_none")]
646 : pub eviction_policy: Option<EvictionPolicy>,
647 :
648 : #[serde(skip_serializing_if = "Option::is_none")]
649 : pub min_resident_size_override: Option<u64>,
650 :
651 : #[serde(skip_serializing_if = "Option::is_none")]
652 : #[serde(with = "humantime_serde")]
653 : pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
654 :
655 : #[serde(skip_serializing_if = "Option::is_none")]
656 : #[serde(with = "humantime_serde")]
657 : pub heatmap_period: Option<Duration>,
658 :
659 : #[serde(skip_serializing_if = "Option::is_none")]
660 : pub lazy_slru_download: Option<bool>,
661 :
662 : #[serde(skip_serializing_if = "Option::is_none")]
663 : pub timeline_get_throttle: Option<ThrottleConfig>,
664 :
665 : #[serde(skip_serializing_if = "Option::is_none")]
666 : pub image_layer_creation_check_threshold: Option<u8>,
667 :
668 : #[serde(skip_serializing_if = "Option::is_none")]
669 : pub image_creation_preempt_threshold: Option<usize>,
670 :
671 : #[serde(skip_serializing_if = "Option::is_none")]
672 : #[serde(with = "humantime_serde")]
673 : pub lsn_lease_length: Option<Duration>,
674 :
675 : #[serde(skip_serializing_if = "Option::is_none")]
676 : #[serde(with = "humantime_serde")]
677 : pub lsn_lease_length_for_ts: Option<Duration>,
678 :
679 : #[serde(skip_serializing_if = "Option::is_none")]
680 : pub timeline_offloading: Option<bool>,
681 :
682 : #[serde(skip_serializing_if = "Option::is_none")]
683 : pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
684 :
685 : #[serde(skip_serializing_if = "Option::is_none")]
686 : pub rel_size_v2_enabled: Option<bool>,
687 :
688 : #[serde(skip_serializing_if = "Option::is_none")]
689 : pub gc_compaction_enabled: Option<bool>,
690 :
691 : #[serde(skip_serializing_if = "Option::is_none")]
692 : pub gc_compaction_initial_threshold_kb: Option<u64>,
693 :
694 : #[serde(skip_serializing_if = "Option::is_none")]
695 : pub gc_compaction_ratio_percent: Option<u64>,
696 : }
697 :
698 : impl TenantConfig {
699 1 : pub fn apply_patch(
700 1 : self,
701 1 : patch: TenantConfigPatch,
702 1 : ) -> Result<TenantConfig, humantime::DurationError> {
703 1 : let Self {
704 1 : mut checkpoint_distance,
705 1 : mut checkpoint_timeout,
706 1 : mut compaction_target_size,
707 1 : mut compaction_period,
708 1 : mut compaction_threshold,
709 1 : mut compaction_upper_limit,
710 1 : mut compaction_algorithm,
711 1 : mut compaction_l0_first,
712 1 : mut compaction_l0_semaphore,
713 1 : mut l0_flush_delay_threshold,
714 1 : mut l0_flush_stall_threshold,
715 1 : mut l0_flush_wait_upload,
716 1 : mut gc_horizon,
717 1 : mut gc_period,
718 1 : mut image_creation_threshold,
719 1 : mut pitr_interval,
720 1 : mut walreceiver_connect_timeout,
721 1 : mut lagging_wal_timeout,
722 1 : mut max_lsn_wal_lag,
723 1 : mut eviction_policy,
724 1 : mut min_resident_size_override,
725 1 : mut evictions_low_residence_duration_metric_threshold,
726 1 : mut heatmap_period,
727 1 : mut lazy_slru_download,
728 1 : mut timeline_get_throttle,
729 1 : mut image_layer_creation_check_threshold,
730 1 : mut image_creation_preempt_threshold,
731 1 : mut lsn_lease_length,
732 1 : mut lsn_lease_length_for_ts,
733 1 : mut timeline_offloading,
734 1 : mut wal_receiver_protocol_override,
735 1 : mut rel_size_v2_enabled,
736 1 : mut gc_compaction_enabled,
737 1 : mut gc_compaction_initial_threshold_kb,
738 1 : mut gc_compaction_ratio_percent,
739 1 : } = self;
740 1 :
741 1 : patch.checkpoint_distance.apply(&mut checkpoint_distance);
742 1 : patch
743 1 : .checkpoint_timeout
744 1 : .map(|v| humantime::parse_duration(&v))?
745 1 : .apply(&mut checkpoint_timeout);
746 1 : patch
747 1 : .compaction_target_size
748 1 : .apply(&mut compaction_target_size);
749 1 : patch
750 1 : .compaction_period
751 1 : .map(|v| humantime::parse_duration(&v))?
752 1 : .apply(&mut compaction_period);
753 1 : patch.compaction_threshold.apply(&mut compaction_threshold);
754 1 : patch
755 1 : .compaction_upper_limit
756 1 : .apply(&mut compaction_upper_limit);
757 1 : patch.compaction_algorithm.apply(&mut compaction_algorithm);
758 1 : patch.compaction_l0_first.apply(&mut compaction_l0_first);
759 1 : patch
760 1 : .compaction_l0_semaphore
761 1 : .apply(&mut compaction_l0_semaphore);
762 1 : patch
763 1 : .l0_flush_delay_threshold
764 1 : .apply(&mut l0_flush_delay_threshold);
765 1 : patch
766 1 : .l0_flush_stall_threshold
767 1 : .apply(&mut l0_flush_stall_threshold);
768 1 : patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
769 1 : patch.gc_horizon.apply(&mut gc_horizon);
770 1 : patch
771 1 : .gc_period
772 1 : .map(|v| humantime::parse_duration(&v))?
773 1 : .apply(&mut gc_period);
774 1 : patch
775 1 : .image_creation_threshold
776 1 : .apply(&mut image_creation_threshold);
777 1 : patch
778 1 : .pitr_interval
779 1 : .map(|v| humantime::parse_duration(&v))?
780 1 : .apply(&mut pitr_interval);
781 1 : patch
782 1 : .walreceiver_connect_timeout
783 1 : .map(|v| humantime::parse_duration(&v))?
784 1 : .apply(&mut walreceiver_connect_timeout);
785 1 : patch
786 1 : .lagging_wal_timeout
787 1 : .map(|v| humantime::parse_duration(&v))?
788 1 : .apply(&mut lagging_wal_timeout);
789 1 : patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
790 1 : patch.eviction_policy.apply(&mut eviction_policy);
791 1 : patch
792 1 : .min_resident_size_override
793 1 : .apply(&mut min_resident_size_override);
794 1 : patch
795 1 : .evictions_low_residence_duration_metric_threshold
796 1 : .map(|v| humantime::parse_duration(&v))?
797 1 : .apply(&mut evictions_low_residence_duration_metric_threshold);
798 1 : patch
799 1 : .heatmap_period
800 1 : .map(|v| humantime::parse_duration(&v))?
801 1 : .apply(&mut heatmap_period);
802 1 : patch.lazy_slru_download.apply(&mut lazy_slru_download);
803 1 : patch
804 1 : .timeline_get_throttle
805 1 : .apply(&mut timeline_get_throttle);
806 1 : patch
807 1 : .image_layer_creation_check_threshold
808 1 : .apply(&mut image_layer_creation_check_threshold);
809 1 : patch
810 1 : .image_creation_preempt_threshold
811 1 : .apply(&mut image_creation_preempt_threshold);
812 1 : patch
813 1 : .lsn_lease_length
814 1 : .map(|v| humantime::parse_duration(&v))?
815 1 : .apply(&mut lsn_lease_length);
816 1 : patch
817 1 : .lsn_lease_length_for_ts
818 1 : .map(|v| humantime::parse_duration(&v))?
819 1 : .apply(&mut lsn_lease_length_for_ts);
820 1 : patch.timeline_offloading.apply(&mut timeline_offloading);
821 1 : patch
822 1 : .wal_receiver_protocol_override
823 1 : .apply(&mut wal_receiver_protocol_override);
824 1 : patch.rel_size_v2_enabled.apply(&mut rel_size_v2_enabled);
825 1 : patch
826 1 : .gc_compaction_enabled
827 1 : .apply(&mut gc_compaction_enabled);
828 1 : patch
829 1 : .gc_compaction_initial_threshold_kb
830 1 : .apply(&mut gc_compaction_initial_threshold_kb);
831 1 : patch
832 1 : .gc_compaction_ratio_percent
833 1 : .apply(&mut gc_compaction_ratio_percent);
834 1 :
835 1 : Ok(Self {
836 1 : checkpoint_distance,
837 1 : checkpoint_timeout,
838 1 : compaction_target_size,
839 1 : compaction_period,
840 1 : compaction_threshold,
841 1 : compaction_upper_limit,
842 1 : compaction_algorithm,
843 1 : compaction_l0_first,
844 1 : compaction_l0_semaphore,
845 1 : l0_flush_delay_threshold,
846 1 : l0_flush_stall_threshold,
847 1 : l0_flush_wait_upload,
848 1 : gc_horizon,
849 1 : gc_period,
850 1 : image_creation_threshold,
851 1 : pitr_interval,
852 1 : walreceiver_connect_timeout,
853 1 : lagging_wal_timeout,
854 1 : max_lsn_wal_lag,
855 1 : eviction_policy,
856 1 : min_resident_size_override,
857 1 : evictions_low_residence_duration_metric_threshold,
858 1 : heatmap_period,
859 1 : lazy_slru_download,
860 1 : timeline_get_throttle,
861 1 : image_layer_creation_check_threshold,
862 1 : image_creation_preempt_threshold,
863 1 : lsn_lease_length,
864 1 : lsn_lease_length_for_ts,
865 1 : timeline_offloading,
866 1 : wal_receiver_protocol_override,
867 1 : rel_size_v2_enabled,
868 1 : gc_compaction_enabled,
869 1 : gc_compaction_initial_threshold_kb,
870 1 : gc_compaction_ratio_percent,
871 1 : })
872 1 : }
873 :
874 0 : pub fn merge(
875 0 : &self,
876 0 : global_conf: crate::config::TenantConfigToml,
877 0 : ) -> crate::config::TenantConfigToml {
878 0 : crate::config::TenantConfigToml {
879 0 : checkpoint_distance: self
880 0 : .checkpoint_distance
881 0 : .unwrap_or(global_conf.checkpoint_distance),
882 0 : checkpoint_timeout: self
883 0 : .checkpoint_timeout
884 0 : .unwrap_or(global_conf.checkpoint_timeout),
885 0 : compaction_target_size: self
886 0 : .compaction_target_size
887 0 : .unwrap_or(global_conf.compaction_target_size),
888 0 : compaction_period: self
889 0 : .compaction_period
890 0 : .unwrap_or(global_conf.compaction_period),
891 0 : compaction_threshold: self
892 0 : .compaction_threshold
893 0 : .unwrap_or(global_conf.compaction_threshold),
894 0 : compaction_upper_limit: self
895 0 : .compaction_upper_limit
896 0 : .unwrap_or(global_conf.compaction_upper_limit),
897 0 : compaction_algorithm: self
898 0 : .compaction_algorithm
899 0 : .as_ref()
900 0 : .unwrap_or(&global_conf.compaction_algorithm)
901 0 : .clone(),
902 0 : compaction_l0_first: self
903 0 : .compaction_l0_first
904 0 : .unwrap_or(global_conf.compaction_l0_first),
905 0 : compaction_l0_semaphore: self
906 0 : .compaction_l0_semaphore
907 0 : .unwrap_or(global_conf.compaction_l0_semaphore),
908 0 : l0_flush_delay_threshold: self
909 0 : .l0_flush_delay_threshold
910 0 : .or(global_conf.l0_flush_delay_threshold),
911 0 : l0_flush_stall_threshold: self
912 0 : .l0_flush_stall_threshold
913 0 : .or(global_conf.l0_flush_stall_threshold),
914 0 : l0_flush_wait_upload: self
915 0 : .l0_flush_wait_upload
916 0 : .unwrap_or(global_conf.l0_flush_wait_upload),
917 0 : gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
918 0 : gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
919 0 : image_creation_threshold: self
920 0 : .image_creation_threshold
921 0 : .unwrap_or(global_conf.image_creation_threshold),
922 0 : pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
923 0 : walreceiver_connect_timeout: self
924 0 : .walreceiver_connect_timeout
925 0 : .unwrap_or(global_conf.walreceiver_connect_timeout),
926 0 : lagging_wal_timeout: self
927 0 : .lagging_wal_timeout
928 0 : .unwrap_or(global_conf.lagging_wal_timeout),
929 0 : max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
930 0 : eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
931 0 : min_resident_size_override: self
932 0 : .min_resident_size_override
933 0 : .or(global_conf.min_resident_size_override),
934 0 : evictions_low_residence_duration_metric_threshold: self
935 0 : .evictions_low_residence_duration_metric_threshold
936 0 : .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
937 0 : heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
938 0 : lazy_slru_download: self
939 0 : .lazy_slru_download
940 0 : .unwrap_or(global_conf.lazy_slru_download),
941 0 : timeline_get_throttle: self
942 0 : .timeline_get_throttle
943 0 : .clone()
944 0 : .unwrap_or(global_conf.timeline_get_throttle),
945 0 : image_layer_creation_check_threshold: self
946 0 : .image_layer_creation_check_threshold
947 0 : .unwrap_or(global_conf.image_layer_creation_check_threshold),
948 0 : image_creation_preempt_threshold: self
949 0 : .image_creation_preempt_threshold
950 0 : .unwrap_or(global_conf.image_creation_preempt_threshold),
951 0 : lsn_lease_length: self
952 0 : .lsn_lease_length
953 0 : .unwrap_or(global_conf.lsn_lease_length),
954 0 : lsn_lease_length_for_ts: self
955 0 : .lsn_lease_length_for_ts
956 0 : .unwrap_or(global_conf.lsn_lease_length_for_ts),
957 0 : timeline_offloading: self
958 0 : .timeline_offloading
959 0 : .unwrap_or(global_conf.timeline_offloading),
960 0 : wal_receiver_protocol_override: self
961 0 : .wal_receiver_protocol_override
962 0 : .or(global_conf.wal_receiver_protocol_override),
963 0 : rel_size_v2_enabled: self
964 0 : .rel_size_v2_enabled
965 0 : .unwrap_or(global_conf.rel_size_v2_enabled),
966 0 : gc_compaction_enabled: self
967 0 : .gc_compaction_enabled
968 0 : .unwrap_or(global_conf.gc_compaction_enabled),
969 0 : gc_compaction_initial_threshold_kb: self
970 0 : .gc_compaction_initial_threshold_kb
971 0 : .unwrap_or(global_conf.gc_compaction_initial_threshold_kb),
972 0 : gc_compaction_ratio_percent: self
973 0 : .gc_compaction_ratio_percent
974 0 : .unwrap_or(global_conf.gc_compaction_ratio_percent),
975 0 : }
976 0 : }
977 : }
978 :
979 : /// The policy for the aux file storage.
980 : ///
981 : /// It can be switched through `switch_aux_file_policy` tenant config.
982 : /// When the first aux file written, the policy will be persisted in the
983 : /// `index_part.json` file and has a limited migration path.
984 : ///
985 : /// Currently, we only allow the following migration path:
986 : ///
987 : /// Unset -> V1
988 : /// -> V2
989 : /// -> CrossValidation -> V2
990 : #[derive(
991 : Eq,
992 : PartialEq,
993 : Debug,
994 : Copy,
995 : Clone,
996 0 : strum_macros::EnumString,
997 : strum_macros::Display,
998 4 : serde_with::DeserializeFromStr,
999 : serde_with::SerializeDisplay,
1000 : )]
1001 : #[strum(serialize_all = "kebab-case")]
1002 : pub enum AuxFilePolicy {
1003 : /// V1 aux file policy: store everything in AUX_FILE_KEY
1004 : #[strum(ascii_case_insensitive)]
1005 : V1,
1006 : /// V2 aux file policy: store in the AUX_FILE keyspace
1007 : #[strum(ascii_case_insensitive)]
1008 : V2,
1009 : /// Cross validation runs both formats on the write path and does validation
1010 : /// on the read path.
1011 : #[strum(ascii_case_insensitive)]
1012 : CrossValidation,
1013 : }
1014 :
1015 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1016 : #[serde(tag = "kind")]
1017 : pub enum EvictionPolicy {
1018 : NoEviction,
1019 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
1020 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
1021 : }
1022 :
1023 : impl EvictionPolicy {
1024 0 : pub fn discriminant_str(&self) -> &'static str {
1025 0 : match self {
1026 0 : EvictionPolicy::NoEviction => "NoEviction",
1027 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
1028 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
1029 : }
1030 0 : }
1031 : }
1032 :
1033 : #[derive(
1034 : Eq,
1035 : PartialEq,
1036 : Debug,
1037 : Copy,
1038 : Clone,
1039 0 : strum_macros::EnumString,
1040 : strum_macros::Display,
1041 0 : serde_with::DeserializeFromStr,
1042 : serde_with::SerializeDisplay,
1043 : )]
1044 : #[strum(serialize_all = "kebab-case")]
1045 : pub enum CompactionAlgorithm {
1046 : Legacy,
1047 : Tiered,
1048 : }
1049 :
1050 : #[derive(
1051 4 : Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
1052 : )]
1053 : pub enum ImageCompressionAlgorithm {
1054 : // Disabled for writes, support decompressing during read path
1055 : Disabled,
1056 : /// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
1057 : /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
1058 : Zstd {
1059 : level: Option<i8>,
1060 : },
1061 : }
1062 :
1063 : impl FromStr for ImageCompressionAlgorithm {
1064 : type Err = anyhow::Error;
1065 8 : fn from_str(s: &str) -> Result<Self, Self::Err> {
1066 8 : let mut components = s.split(['(', ')']);
1067 8 : let first = components
1068 8 : .next()
1069 8 : .ok_or_else(|| anyhow::anyhow!("empty string"))?;
1070 8 : match first {
1071 8 : "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
1072 6 : "zstd" => {
1073 6 : let level = if let Some(v) = components.next() {
1074 4 : let v: i8 = v.parse()?;
1075 4 : Some(v)
1076 : } else {
1077 2 : None
1078 : };
1079 :
1080 6 : Ok(ImageCompressionAlgorithm::Zstd { level })
1081 : }
1082 0 : _ => anyhow::bail!("invalid specifier '{first}'"),
1083 : }
1084 8 : }
1085 : }
1086 :
1087 : impl Display for ImageCompressionAlgorithm {
1088 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1089 12 : match self {
1090 3 : ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
1091 9 : ImageCompressionAlgorithm::Zstd { level } => {
1092 9 : if let Some(level) = level {
1093 6 : write!(f, "zstd({})", level)
1094 : } else {
1095 3 : write!(f, "zstd")
1096 : }
1097 : }
1098 : }
1099 12 : }
1100 : }
1101 :
1102 0 : #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
1103 : pub struct CompactionAlgorithmSettings {
1104 : pub kind: CompactionAlgorithm,
1105 : }
1106 :
1107 0 : #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
1108 : #[serde(tag = "mode", rename_all = "kebab-case")]
1109 : pub enum L0FlushConfig {
1110 : #[serde(rename_all = "snake_case")]
1111 : Direct { max_concurrency: NonZeroUsize },
1112 : }
1113 :
1114 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1115 : pub struct EvictionPolicyLayerAccessThreshold {
1116 : #[serde(with = "humantime_serde")]
1117 : pub period: Duration,
1118 : #[serde(with = "humantime_serde")]
1119 : pub threshold: Duration,
1120 : }
1121 :
1122 6 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
1123 : pub struct ThrottleConfig {
1124 : /// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
1125 : #[serde(rename = "task_kinds")]
1126 : pub enabled: ThrottleConfigTaskKinds,
1127 : pub initial: u32,
1128 : #[serde(with = "humantime_serde")]
1129 : pub refill_interval: Duration,
1130 : pub refill_amount: NonZeroU32,
1131 : pub max: u32,
1132 : }
1133 :
1134 : /// Before <https://github.com/neondatabase/neon/pull/9962>
1135 : /// the throttle was a per `Timeline::get`/`Timeline::get_vectored` call.
1136 : /// The `task_kinds` field controlled which Pageserver "Task Kind"s
1137 : /// were subject to the throttle.
1138 : ///
1139 : /// After that PR, the throttle is applied at pagestream request level
1140 : /// and the `task_kinds` field does not apply since the only task kind
1141 : /// that us subject to the throttle is that of the page service.
1142 : ///
1143 : /// However, we don't want to make a breaking config change right now
1144 : /// because it means we have to migrate all the tenant configs.
1145 : /// This will be done in a future PR.
1146 : ///
1147 : /// In the meantime, we use emptiness / non-emptsiness of the `task_kinds`
1148 : /// field to determine if the throttle is enabled or not.
1149 1 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1150 : #[serde(transparent)]
1151 : pub struct ThrottleConfigTaskKinds(Vec<String>);
1152 :
1153 : impl ThrottleConfigTaskKinds {
1154 489 : pub fn disabled() -> Self {
1155 489 : Self(vec![])
1156 489 : }
1157 454 : pub fn is_enabled(&self) -> bool {
1158 454 : !self.0.is_empty()
1159 454 : }
1160 : }
1161 :
1162 : impl ThrottleConfig {
1163 489 : pub fn disabled() -> Self {
1164 489 : Self {
1165 489 : enabled: ThrottleConfigTaskKinds::disabled(),
1166 489 : // other values don't matter with emtpy `task_kinds`.
1167 489 : initial: 0,
1168 489 : refill_interval: Duration::from_millis(1),
1169 489 : refill_amount: NonZeroU32::new(1).unwrap(),
1170 489 : max: 1,
1171 489 : }
1172 489 : }
1173 : /// The requests per second allowed by the given config.
1174 0 : pub fn steady_rps(&self) -> f64 {
1175 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
1176 0 : }
1177 : }
1178 :
#[cfg(test)]
mod throttle_config_tests {
    use super::*;

    /// The canonical disabled config must report the throttle as off.
    #[test]
    fn test_disabled_is_disabled() {
        assert!(!ThrottleConfig::disabled().enabled.is_enabled());
    }

    /// A payload with a non-empty `task_kinds` list (and an extra `fair` key that
    /// the struct does not declare) must still deserialize and count as enabled.
    #[test]
    fn test_enabled_backwards_compat() {
        let legacy_payload = serde_json::json!({
            "task_kinds": ["PageRequestHandler"],
            "initial": 40000,
            "refill_interval": "50ms",
            "refill_amount": 1000,
            "max": 40000,
            "fair": true
        });
        let parsed = serde_json::from_value::<ThrottleConfig>(legacy_payload).unwrap();
        assert!(parsed.enabled.is_enabled());
    }
}
1202 :
1203 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
1204 : /// lists out all possible states (and the virtual "Detached" state)
1205 : /// in a flat form rather than using rust-style enums.
1206 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
1207 : pub enum LocationConfigMode {
1208 : AttachedSingle,
1209 : AttachedMulti,
1210 : AttachedStale,
1211 : Secondary,
1212 : Detached,
1213 : }
1214 :
1215 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1216 : pub struct LocationConfigSecondary {
1217 : pub warm: bool,
1218 : }
1219 :
1220 : /// An alternative representation of `pageserver::tenant::LocationConf`,
1221 : /// for use in external-facing APIs.
1222 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1223 : pub struct LocationConfig {
1224 : pub mode: LocationConfigMode,
1225 : /// If attaching, in what generation?
1226 : #[serde(default)]
1227 : pub generation: Option<u32>,
1228 :
1229 : // If requesting mode `Secondary`, configuration for that.
1230 : #[serde(default)]
1231 : pub secondary_conf: Option<LocationConfigSecondary>,
1232 :
1233 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
1234 : // must be set accurately.
1235 : #[serde(default)]
1236 : pub shard_number: u8,
1237 : #[serde(default)]
1238 : pub shard_count: u8,
1239 : #[serde(default)]
1240 : pub shard_stripe_size: u32,
1241 :
1242 : // This configuration only affects attached mode, but should be provided irrespective
1243 : // of the mode, as a secondary location might transition on startup if the response
1244 : // to the `/re-attach` control plane API requests it.
1245 : pub tenant_conf: TenantConfig,
1246 : }
1247 :
1248 0 : #[derive(Serialize, Deserialize)]
1249 : pub struct LocationConfigListResponse {
1250 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
1251 : }
1252 :
1253 : #[derive(Serialize)]
1254 : pub struct StatusResponse {
1255 : pub id: NodeId,
1256 : }
1257 :
1258 0 : #[derive(Serialize, Deserialize, Debug)]
1259 : #[serde(deny_unknown_fields)]
1260 : pub struct TenantLocationConfigRequest {
1261 : #[serde(flatten)]
1262 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
1263 : }
1264 :
1265 0 : #[derive(Serialize, Deserialize, Debug)]
1266 : #[serde(deny_unknown_fields)]
1267 : pub struct TenantTimeTravelRequest {
1268 : pub shard_counts: Vec<ShardCount>,
1269 : }
1270 :
1271 0 : #[derive(Serialize, Deserialize, Debug)]
1272 : #[serde(deny_unknown_fields)]
1273 : pub struct TenantShardLocation {
1274 : pub shard_id: TenantShardId,
1275 : pub node_id: NodeId,
1276 : }
1277 :
1278 0 : #[derive(Serialize, Deserialize, Debug)]
1279 : #[serde(deny_unknown_fields)]
1280 : pub struct TenantLocationConfigResponse {
1281 : pub shards: Vec<TenantShardLocation>,
1282 : // If the shards' ShardCount count is >1, stripe_size will be set.
1283 : pub stripe_size: Option<ShardStripeSize>,
1284 : }
1285 :
1286 2 : #[derive(Serialize, Deserialize, Debug)]
1287 : #[serde(deny_unknown_fields)]
1288 : pub struct TenantConfigRequest {
1289 : pub tenant_id: TenantId,
1290 : #[serde(flatten)]
1291 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
1292 : }
1293 :
1294 : impl std::ops::Deref for TenantConfigRequest {
1295 : type Target = TenantConfig;
1296 :
1297 0 : fn deref(&self) -> &Self::Target {
1298 0 : &self.config
1299 0 : }
1300 : }
1301 :
1302 : impl TenantConfigRequest {
1303 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
1304 0 : let config = TenantConfig::default();
1305 0 : TenantConfigRequest { tenant_id, config }
1306 0 : }
1307 : }
1308 :
1309 3 : #[derive(Serialize, Deserialize, Debug)]
1310 : #[serde(deny_unknown_fields)]
1311 : pub struct TenantConfigPatchRequest {
1312 : pub tenant_id: TenantId,
1313 : #[serde(flatten)]
1314 : pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
1315 : }
1316 :
1317 0 : #[derive(Serialize, Deserialize, Debug)]
1318 : pub struct TenantWaitLsnRequest {
1319 : #[serde(flatten)]
1320 : pub timelines: HashMap<TimelineId, Lsn>,
1321 : pub timeout: Duration,
1322 : }
1323 :
1324 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
1325 0 : #[derive(Serialize, Deserialize, Clone)]
1326 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
1327 : pub enum TenantAttachmentStatus {
1328 : Maybe,
1329 : Attached,
1330 : Failed { reason: String },
1331 : }
1332 :
1333 0 : #[derive(Serialize, Deserialize, Clone)]
1334 : pub struct TenantInfo {
1335 : pub id: TenantShardId,
1336 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
1337 : pub state: TenantState,
1338 : /// Sum of the size of all layer files.
1339 : /// If a layer is present in both local FS and S3, it counts only once.
1340 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
1341 : pub attachment_status: TenantAttachmentStatus,
1342 : pub generation: u32,
1343 :
1344 : /// Opaque explanation if gc is being blocked.
1345 : ///
1346 : /// Only looked up for the individual tenant detail, not the listing.
1347 : #[serde(skip_serializing_if = "Option::is_none")]
1348 : pub gc_blocking: Option<String>,
1349 : }
1350 :
1351 0 : #[derive(Serialize, Deserialize, Clone)]
1352 : pub struct TenantDetails {
1353 : #[serde(flatten)]
1354 : pub tenant_info: TenantInfo,
1355 :
1356 : pub walredo: Option<WalRedoManagerStatus>,
1357 :
1358 : pub timelines: Vec<TimelineId>,
1359 : }
1360 :
1361 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
1362 : pub enum TimelineArchivalState {
1363 : Archived,
1364 : Unarchived,
1365 : }
1366 :
1367 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1368 : pub struct TimelineArchivalConfigRequest {
1369 : pub state: TimelineArchivalState,
1370 : }
1371 :
1372 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1373 : pub struct TimelinePatchIndexPartRequest {
1374 : pub rel_size_migration: Option<RelSizeMigration>,
1375 : pub gc_compaction_last_completed_lsn: Option<Lsn>,
1376 : pub applied_gc_cutoff_lsn: Option<Lsn>,
1377 : #[serde(default)]
1378 : pub force_index_update: bool,
1379 : }
1380 :
1381 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1382 : pub struct TimelinesInfoAndOffloaded {
1383 : pub timelines: Vec<TimelineInfo>,
1384 : pub offloaded: Vec<OffloadedTimelineInfo>,
1385 : }
1386 :
1387 : /// Analog of [`TimelineInfo`] for offloaded timelines.
1388 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1389 : pub struct OffloadedTimelineInfo {
1390 : pub tenant_id: TenantShardId,
1391 : pub timeline_id: TimelineId,
1392 : /// Whether the timeline has a parent it has been branched off from or not
1393 : pub ancestor_timeline_id: Option<TimelineId>,
1394 : /// Whether to retain the branch lsn at the ancestor or not
1395 : pub ancestor_retain_lsn: Option<Lsn>,
1396 : /// The time point when the timeline was archived
1397 : pub archived_at: chrono::DateTime<chrono::Utc>,
1398 : }
1399 :
1400 12 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1401 : #[serde(rename_all = "camelCase")]
1402 : pub enum RelSizeMigration {
1403 : /// The tenant is using the old rel_size format.
1404 : /// Note that this enum is persisted as `Option<RelSizeMigration>` in the index part, so
1405 : /// `None` is the same as `Some(RelSizeMigration::Legacy)`.
1406 : Legacy,
1407 : /// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
1408 : /// persisted in the index part. The read path will read both formats and merge them.
1409 : Migrating,
1410 : /// The tenant has migrated to the new rel_size format. Only the new rel_size format is persisted
1411 : /// in the index part, and the read path will not read the old format.
1412 : Migrated,
1413 : }
1414 :
1415 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
1416 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1417 : pub struct TimelineInfo {
1418 : pub tenant_id: TenantShardId,
1419 : pub timeline_id: TimelineId,
1420 :
1421 : pub ancestor_timeline_id: Option<TimelineId>,
1422 : pub ancestor_lsn: Option<Lsn>,
1423 : pub last_record_lsn: Lsn,
1424 : pub prev_record_lsn: Option<Lsn>,
1425 :
1426 : /// Legacy field, retained for one version to enable old storage controller to
1427 : /// decode (it was a mandatory field).
1428 : #[serde(default, rename = "latest_gc_cutoff_lsn")]
1429 : pub _unused: Lsn,
1430 :
1431 : /// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
1432 : /// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
1433 : /// as it is easier to reason about.
1434 : #[serde(default)]
1435 : pub applied_gc_cutoff_lsn: Lsn,
1436 :
1437 : /// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
1438 : /// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
1439 : /// LSN at which it is legal to create a branch or ephemeral endpoint.
1440 : ///
1441 : /// Note that holders of valid LSN leases may be able to create branches and read pages earlier
1442 : /// than this LSN, but new leases may not be taken out earlier than this LSN.
1443 : #[serde(default)]
1444 : pub min_readable_lsn: Lsn,
1445 :
1446 : pub disk_consistent_lsn: Lsn,
1447 :
1448 : /// The LSN that we have succesfully uploaded to remote storage
1449 : pub remote_consistent_lsn: Lsn,
1450 :
1451 : /// The LSN that we are advertizing to safekeepers
1452 : pub remote_consistent_lsn_visible: Lsn,
1453 :
1454 : /// The LSN from the start of the root timeline (never changes)
1455 : pub initdb_lsn: Lsn,
1456 :
1457 : pub current_logical_size: u64,
1458 : pub current_logical_size_is_accurate: bool,
1459 :
1460 : pub directory_entries_counts: Vec<u64>,
1461 :
1462 : /// Sum of the size of all layer files.
1463 : /// If a layer is present in both local FS and S3, it counts only once.
1464 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
1465 : pub current_logical_size_non_incremental: Option<u64>,
1466 :
1467 : /// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
1468 : /// beyond the branch's branch point, we only count up to the branch point.
1469 : pub pitr_history_size: u64,
1470 :
1471 : /// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
1472 : /// ancestor data used by this branch would have been retained anyway). If this is false, then
1473 : /// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
1474 : /// otherwise be able to GC.
1475 : pub within_ancestor_pitr: bool,
1476 :
1477 : pub timeline_dir_layer_file_size_sum: Option<u64>,
1478 :
1479 : pub wal_source_connstr: Option<String>,
1480 : pub last_received_msg_lsn: Option<Lsn>,
1481 : /// the timestamp (in microseconds) of the last received message
1482 : pub last_received_msg_ts: Option<u128>,
1483 : pub pg_version: u32,
1484 :
1485 : pub state: TimelineState,
1486 :
1487 : pub walreceiver_status: String,
1488 :
1489 : // ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
1490 : // Backward compatibility: you will get a JSON not containing the newly-added field.
1491 : // Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
1492 : // not deny unknown fields by default so it's safe to set the field to some value, though it won't be
1493 : // read.
1494 : /// Whether the timeline is archived.
1495 : pub is_archived: Option<bool>,
1496 :
1497 : /// The status of the rel_size migration.
1498 : pub rel_size_migration: Option<RelSizeMigration>,
1499 : }
1500 :
1501 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1502 : pub struct LayerMapInfo {
1503 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
1504 : pub historic_layers: Vec<HistoricLayerInfo>,
1505 : }
1506 :
1507 : /// The residence status of a layer
1508 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1509 : pub enum LayerResidenceStatus {
1510 : /// Residence status for a layer file that exists locally.
1511 : /// It may also exist on the remote, we don't care here.
1512 : Resident,
1513 : /// Residence status for a layer file that only exists on the remote.
1514 : Evicted,
1515 : }
1516 :
1517 : #[serde_as]
1518 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1519 : pub struct LayerAccessStats {
1520 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1521 : pub access_time: SystemTime,
1522 :
1523 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1524 : pub residence_time: SystemTime,
1525 :
1526 : pub visible: bool,
1527 : }
1528 :
1529 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1530 : #[serde(tag = "kind")]
1531 : pub enum InMemoryLayerInfo {
1532 : Open { lsn_start: Lsn },
1533 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
1534 : }
1535 :
1536 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1537 : #[serde(tag = "kind")]
1538 : pub enum HistoricLayerInfo {
1539 : Delta {
1540 : layer_file_name: String,
1541 : layer_file_size: u64,
1542 :
1543 : lsn_start: Lsn,
1544 : lsn_end: Lsn,
1545 : remote: bool,
1546 : access_stats: LayerAccessStats,
1547 :
1548 : l0: bool,
1549 : },
1550 : Image {
1551 : layer_file_name: String,
1552 : layer_file_size: u64,
1553 :
1554 : lsn_start: Lsn,
1555 : remote: bool,
1556 : access_stats: LayerAccessStats,
1557 : },
1558 : }
1559 :
1560 : impl HistoricLayerInfo {
1561 0 : pub fn layer_file_name(&self) -> &str {
1562 0 : match self {
1563 : HistoricLayerInfo::Delta {
1564 0 : layer_file_name, ..
1565 0 : } => layer_file_name,
1566 : HistoricLayerInfo::Image {
1567 0 : layer_file_name, ..
1568 0 : } => layer_file_name,
1569 : }
1570 0 : }
1571 0 : pub fn is_remote(&self) -> bool {
1572 0 : match self {
1573 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
1574 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
1575 : }
1576 0 : }
1577 0 : pub fn set_remote(&mut self, value: bool) {
1578 0 : let field = match self {
1579 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
1580 0 : HistoricLayerInfo::Image { remote, .. } => remote,
1581 : };
1582 0 : *field = value;
1583 0 : }
1584 0 : pub fn layer_file_size(&self) -> u64 {
1585 0 : match self {
1586 : HistoricLayerInfo::Delta {
1587 0 : layer_file_size, ..
1588 0 : } => *layer_file_size,
1589 : HistoricLayerInfo::Image {
1590 0 : layer_file_size, ..
1591 0 : } => *layer_file_size,
1592 : }
1593 0 : }
1594 : }
1595 :
1596 0 : #[derive(Debug, Serialize, Deserialize)]
1597 : pub struct DownloadRemoteLayersTaskSpawnRequest {
1598 : pub max_concurrent_downloads: NonZeroUsize,
1599 : }
1600 :
1601 0 : #[derive(Debug, Serialize, Deserialize)]
1602 : pub struct IngestAuxFilesRequest {
1603 : pub aux_files: HashMap<String, String>,
1604 : }
1605 :
1606 0 : #[derive(Debug, Serialize, Deserialize)]
1607 : pub struct ListAuxFilesRequest {
1608 : pub lsn: Lsn,
1609 : }
1610 :
1611 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1612 : pub struct DownloadRemoteLayersTaskInfo {
1613 : pub task_id: String,
1614 : pub state: DownloadRemoteLayersTaskState,
1615 : pub total_layer_count: u64, // stable once `completed`
1616 : pub successful_download_count: u64, // stable once `completed`
1617 : pub failed_download_count: u64, // stable once `completed`
1618 : }
1619 :
1620 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1621 : pub enum DownloadRemoteLayersTaskState {
1622 : Running,
1623 : Completed,
1624 : ShutDown,
1625 : }
1626 :
1627 0 : #[derive(Debug, Serialize, Deserialize)]
1628 : pub struct TimelineGcRequest {
1629 : pub gc_horizon: Option<u64>,
1630 : }
1631 :
1632 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1633 : pub struct WalRedoManagerProcessStatus {
1634 : pub pid: u32,
1635 : }
1636 :
1637 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1638 : pub struct WalRedoManagerStatus {
1639 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
1640 : pub process: Option<WalRedoManagerProcessStatus>,
1641 : }
1642 :
1643 : /// The progress of a secondary tenant.
1644 : ///
1645 : /// It is mostly useful when doing a long running download: e.g. initiating
1646 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
1647 : /// what's happening.
1648 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
1649 : pub struct SecondaryProgress {
1650 : /// The remote storage LastModified time of the heatmap object we last downloaded.
1651 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
1652 :
1653 : /// The number of layers currently on-disk
1654 : pub layers_downloaded: usize,
1655 : /// The number of layers in the most recently seen heatmap
1656 : pub layers_total: usize,
1657 :
1658 : /// The number of layer bytes currently on-disk
1659 : pub bytes_downloaded: u64,
1660 : /// The number of layer bytes in the most recently seen heatmap
1661 : pub bytes_total: u64,
1662 : }
1663 :
1664 0 : #[derive(Serialize, Deserialize, Debug)]
1665 : pub struct TenantScanRemoteStorageShard {
1666 : pub tenant_shard_id: TenantShardId,
1667 : pub generation: Option<u32>,
1668 : }
1669 :
1670 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1671 : pub struct TenantScanRemoteStorageResponse {
1672 : pub shards: Vec<TenantScanRemoteStorageShard>,
1673 : }
1674 :
1675 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1676 : #[serde(rename_all = "snake_case")]
1677 : pub enum TenantSorting {
1678 : /// Total size of layers on local disk for all timelines in a shard.
1679 : ResidentSize,
1680 : /// The logical size of the largest timeline within a _tenant_ (not shard). Only tracked on
1681 : /// shard 0, contains the sum across all shards.
1682 : MaxLogicalSize,
1683 : /// The logical size of the largest timeline within a _tenant_ (not shard), divided by number of
1684 : /// shards. Only tracked on shard 0, and estimates the per-shard logical size.
1685 : MaxLogicalSizePerShard,
1686 : }
1687 :
1688 : impl Default for TenantSorting {
1689 0 : fn default() -> Self {
1690 0 : Self::ResidentSize
1691 0 : }
1692 : }
1693 :
/// Query parameters for listing the "top" tenant shards according to some metric.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TopTenantShardsRequest {
    /// How would you like to sort the tenants?
    pub order_by: TenantSorting,

    /// How many results?
    pub limit: usize,

    /// Omit tenants with more than this many shards (e.g. if this is the max number of shards
    /// that the caller would ever split to)
    // NOTE(review): the `_lt` suffix suggests a strict `<` filter, which would also omit
    // tenants with exactly this many shards; confirm against the request handler.
    pub where_shards_lt: Option<ShardCount>,

    /// Omit tenants where the ordering metric is less than this (this is an optimization to
    /// let us quickly exclude numerous tiny shards)
    pub where_gt: Option<u64>,
}
1710 :
/// One entry of a [`TopTenantShardsResponse`]: a shard together with its size metrics.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct TopTenantShardItem {
    /// The shard these metrics belong to.
    pub id: TenantShardId,

    /// Total size of layers on local disk for all timelines in this shard.
    pub resident_size: u64,

    /// Total size of layers in remote storage for all timelines in this shard.
    pub physical_size: u64,

    /// The largest logical size of a timeline within this _tenant_ (not shard). This is only
    /// tracked on shard 0, and contains the sum of the logical size across all shards.
    pub max_logical_size: u64,

    /// The largest logical size of a timeline within this _tenant_ (not shard) divided by number of
    /// shards. This is only tracked on shard 0, and is only an estimate as we divide it evenly by
    /// shard count, rounded up.
    pub max_logical_size_per_shard: u64,
}
1730 :
/// A list of [`TopTenantShardItem`] entries; `Default` yields an empty list.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TopTenantShardsResponse {
    /// The shards that were reported.
    pub shards: Vec<TopTenantShardItem>,
}
1735 :
1736 : pub mod virtual_file {
1737 : #[derive(
1738 : Copy,
1739 : Clone,
1740 : PartialEq,
1741 : Eq,
1742 : Hash,
1743 0 : strum_macros::EnumString,
1744 : strum_macros::Display,
1745 0 : serde_with::DeserializeFromStr,
1746 : serde_with::SerializeDisplay,
1747 : Debug,
1748 : )]
1749 : #[strum(serialize_all = "kebab-case")]
1750 : pub enum IoEngineKind {
1751 : StdFs,
1752 : #[cfg(target_os = "linux")]
1753 : TokioEpollUring,
1754 : }
1755 :
1756 : /// Direct IO modes for a pageserver.
1757 : #[derive(
1758 : Copy,
1759 : Clone,
1760 : PartialEq,
1761 : Eq,
1762 : Hash,
1763 0 : strum_macros::EnumString,
1764 : strum_macros::Display,
1765 0 : serde_with::DeserializeFromStr,
1766 : serde_with::SerializeDisplay,
1767 : Debug,
1768 : )]
1769 : #[strum(serialize_all = "kebab-case")]
1770 : #[repr(u8)]
1771 : pub enum IoMode {
1772 : /// Uses buffered IO.
1773 : Buffered,
1774 : /// Uses direct IO, error out if the operation fails.
1775 : #[cfg(target_os = "linux")]
1776 : Direct,
1777 : }
1778 :
1779 : impl IoMode {
1780 488 : pub const fn preferred() -> Self {
1781 488 : Self::Buffered
1782 488 : }
1783 : }
1784 :
1785 : impl TryFrom<u8> for IoMode {
1786 : type Error = u8;
1787 :
1788 5096 : fn try_from(value: u8) -> Result<Self, Self::Error> {
1789 5096 : Ok(match value {
1790 5096 : v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
1791 : #[cfg(target_os = "linux")]
1792 0 : v if v == (IoMode::Direct as u8) => IoMode::Direct,
1793 0 : x => return Err(x),
1794 : })
1795 5096 : }
1796 : }
1797 : }
1798 :
/// Tally of keys split into "disposable" and "not disposable".
// NOTE(review): the criteria for what makes a key disposable live outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScanDisposableKeysResponse {
    /// Number of keys counted as disposable.
    pub disposable_count: usize,
    /// Number of keys counted as not disposable.
    pub not_disposable_count: usize,
}
1804 :
// Wrapped in libpq CopyData
/// A pagestream request message (compute -> pageserver), one variant per request type.
///
/// The wire tag of each variant is defined by `PagestreamFeMessageTag`.
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
    Exists(PagestreamExistsRequest),
    Nblocks(PagestreamNblocksRequest),
    GetPage(PagestreamGetPageRequest),
    DbSize(PagestreamDbSizeRequest),
    GetSlruSegment(PagestreamGetSlruSegmentRequest),
    /// Only compiled in with the `testing` feature.
    #[cfg(feature = "testing")]
    Test(PagestreamTestRequest),
}
1816 :
// Wrapped in libpq CopyData
/// A pagestream response message, one variant per response type plus `Error`.
///
/// The wire tag of each variant is defined by `PagestreamBeMessageTag`.
#[derive(strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
    Exists(PagestreamExistsResponse),
    Nblocks(PagestreamNblocksResponse),
    GetPage(PagestreamGetPageResponse),
    Error(PagestreamErrorResponse),
    DbSize(PagestreamDbSizeResponse),
    GetSlruSegment(PagestreamGetSlruSegmentResponse),
    /// Only compiled in with the `testing` feature.
    #[cfg(feature = "testing")]
    Test(PagestreamTestResponse),
}
1829 :
// Keep in sync with `pagestore_client.h`
/// First byte of a pagestream request message, identifying the request type.
#[repr(u8)]
enum PagestreamFeMessageTag {
    Exists = 0,
    Nblocks = 1,
    GetPage = 2,
    DbSize = 3,
    GetSlruSegment = 4,
    /* future tags above this line */
    /// For testing purposes, not available in production.
    #[cfg(feature = "testing")]
    Test = 99,
}

// Keep in sync with `pagestore_client.h`
/// First byte of a pagestream response message, identifying the response type.
#[repr(u8)]
enum PagestreamBeMessageTag {
    Exists = 100,
    Nblocks = 101,
    GetPage = 102,
    Error = 103,
    DbSize = 104,
    GetSlruSegment = 105,
    /* future tags above this line */
    /// For testing purposes, not available in production.
    #[cfg(feature = "testing")]
    Test = 199,
}

impl TryFrom<u8> for PagestreamFeMessageTag {
    type Error = u8;

    /// Decodes a request tag byte. An unknown byte is returned unchanged as the error.
    ///
    /// The comparison values are taken from the enum discriminants, so the decoder cannot
    /// drift from the enum definition.
    fn try_from(value: u8) -> Result<Self, u8> {
        use PagestreamFeMessageTag as Tag;
        Ok(match value {
            v if v == Tag::Exists as u8 => Tag::Exists,
            v if v == Tag::Nblocks as u8 => Tag::Nblocks,
            v if v == Tag::GetPage as u8 => Tag::GetPage,
            v if v == Tag::DbSize as u8 => Tag::DbSize,
            v if v == Tag::GetSlruSegment as u8 => Tag::GetSlruSegment,
            #[cfg(feature = "testing")]
            v if v == Tag::Test as u8 => Tag::Test,
            unknown => return Err(unknown),
        })
    }
}

impl TryFrom<u8> for PagestreamBeMessageTag {
    type Error = u8;

    /// Decodes a response tag byte. An unknown byte is returned unchanged as the error.
    ///
    /// The comparison values are taken from the enum discriminants, so the decoder cannot
    /// drift from the enum definition.
    // The return type spells out `u8` rather than `Self::Error` because this enum also has
    // a variant named `Error`.
    fn try_from(value: u8) -> Result<Self, u8> {
        use PagestreamBeMessageTag as Tag;
        Ok(match value {
            v if v == Tag::Exists as u8 => Tag::Exists,
            v if v == Tag::Nblocks as u8 => Tag::Nblocks,
            v if v == Tag::GetPage as u8 => Tag::GetPage,
            v if v == Tag::Error as u8 => Tag::Error,
            v if v == Tag::DbSize as u8 => Tag::DbSize,
            v if v == Tag::GetSlruSegment as u8 => Tag::GetSlruSegment,
            #[cfg(feature = "testing")]
            v if v == Tag::Test as u8 => Tag::Test,
            unknown => return Err(unknown),
        })
    }
}
1891 :
// A GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since' equal to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver to wait unnecessarily for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and lets the pageserver do the right thing. There was no
// difference in the responses between V1 and V2.
//
// The V3 version of the protocol adds a request ID to all requests. This request ID is also included
// in the response, along with the other fields of the request, which allows verifying that the
// response we received is for our request. We copy the request fields into the response to make the
// check more reliable: the request ID is formed from the process ID and a local counter, so in
// principle there can be duplicate request IDs if a process PID is reused.
//
/// Version of the pagestream wire protocol; see the comment above for what each version adds.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PagestreamProtocolVersion {
    /// Requests carry `request_lsn` and `not_modified_since`, but no request ID.
    V2,
    /// Like V2, plus a request ID; responses echo the request's fields.
    V3,
}
1926 :
/// Identifier carried by every V3 pagestream request and echoed in its response.
pub type RequestId = u64;
1928 :
/// Header shared by all pagestream requests.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamRequest {
    /// Request ID; `PagestreamFeMessage::parse` sets it to 0 for V2 messages, which do not
    /// carry one.
    pub reqid: RequestId,
    /// The point in time at which the data is requested.
    pub request_lsn: Lsn,
    /// Hint that the requested data has not been modified between this LSN and `request_lsn`.
    pub not_modified_since: Lsn,
}
1935 :
/// Payload of a [`PagestreamFeMessage::Exists`] request.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamExistsRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// The `RelTag` the request refers to.
    pub rel: RelTag,
}
1941 :
/// Payload of a [`PagestreamFeMessage::Nblocks`] request.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamNblocksRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// The `RelTag` the request refers to.
    pub rel: RelTag,
}
1947 :
/// Payload of a [`PagestreamFeMessage::GetPage`] request.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetPageRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// The `RelTag` the request refers to.
    pub rel: RelTag,
    /// Block number of the requested page.
    pub blkno: u32,
}
1954 :
/// Payload of a [`PagestreamFeMessage::DbSize`] request.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamDbSizeRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// Identifies the database whose size is requested.
    pub dbnode: u32,
}
1960 :
/// Payload of a [`PagestreamFeMessage::GetSlruSegment`] request.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetSlruSegmentRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// SLRU kind, sent as a raw byte.
    // NOTE(review): the mapping from byte value to SLRU kind is defined outside this file.
    pub kind: u8,
    /// Number of the requested segment.
    pub segno: u32,
}
1967 :
/// Payload of a [`PagestreamBeMessage::Exists`] response.
#[derive(Debug)]
pub struct PagestreamExistsResponse {
    /// The request being answered; its fields are echoed on the wire in protocol V3.
    pub req: PagestreamExistsRequest,
    /// The answer to the request; sent as a single byte.
    pub exists: bool,
}
1973 :
/// Payload of a [`PagestreamBeMessage::Nblocks`] response.
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
    /// The request being answered; its fields are echoed on the wire in protocol V3.
    pub req: PagestreamNblocksRequest,
    /// The block count being reported.
    pub n_blocks: u32,
}
1979 :
/// Payload of a [`PagestreamBeMessage::GetPage`] response.
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
    /// The request being answered; its fields are echoed on the wire in protocol V3.
    pub req: PagestreamGetPageRequest,
    /// The page image. It is written to the wire without a length prefix, and
    /// `PagestreamBeMessage::deserialize` reads back exactly 8192 bytes.
    pub page: Bytes,
}
1985 :
/// Payload of a [`PagestreamBeMessage::GetSlruSegment`] response.
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
    /// The request being answered; its fields are echoed on the wire in protocol V3.
    pub req: PagestreamGetSlruSegmentRequest,
    /// The segment contents. On the wire they are preceded by the number of `BLCKSZ`-sized
    /// blocks (`segment.len() / BLCKSZ`).
    pub segment: Bytes,
}
1991 :
/// Payload of a [`PagestreamBeMessage::Error`] response.
#[derive(Debug)]
pub struct PagestreamErrorResponse {
    /// Header of the request that failed; echoed on the wire in protocol V3.
    pub req: PagestreamRequest,
    /// Error text; sent as a NUL-terminated string.
    pub message: String,
}
1997 :
/// Payload of a [`PagestreamBeMessage::DbSize`] response.
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
    /// The request being answered; its fields are echoed on the wire in protocol V3.
    pub req: PagestreamDbSizeRequest,
    /// The database size being reported.
    pub db_size: i64,
}
2003 :
/// Payload of the testing-only `PagestreamFeMessage::Test` request.
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// Sent as a big-endian u64 right after the header.
    // NOTE(review): presumably groups test requests into batches; confirm against the handler.
    pub batch_key: u64,
    /// Free-form text; sent as a u64 byte length followed by the UTF-8 bytes.
    pub message: String,
}
2011 :
/// Payload of the testing-only `PagestreamBeMessage::Test` response.
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
    /// The request being answered. Its `batch_key` and `message` are always sent back; the
    /// header is additionally echoed in protocol V3.
    pub req: PagestreamTestRequest,
}
2017 :
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
// that require pageserver-internal types. It is sufficient to get the total size.
/// Total history size of a tenant.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantHistorySize {
    /// The tenant this size belongs to.
    pub id: TenantId,
    /// Size is a mixture of WAL and logical size, so the unit is bytes.
    ///
    /// Will be none if `?inputs_only=true` was given.
    pub size: Option<u64>,
}
2028 :
2029 : impl PagestreamFeMessage {
2030 : /// Serialize a compute -> pageserver message. This is currently only used in testing
2031 : /// tools. Always uses protocol version 3.
2032 4 : pub fn serialize(&self) -> Bytes {
2033 4 : let mut bytes = BytesMut::new();
2034 4 :
2035 4 : match self {
2036 1 : Self::Exists(req) => {
2037 1 : bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
2038 1 : bytes.put_u64(req.hdr.reqid);
2039 1 : bytes.put_u64(req.hdr.request_lsn.0);
2040 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2041 1 : bytes.put_u32(req.rel.spcnode);
2042 1 : bytes.put_u32(req.rel.dbnode);
2043 1 : bytes.put_u32(req.rel.relnode);
2044 1 : bytes.put_u8(req.rel.forknum);
2045 1 : }
2046 :
2047 1 : Self::Nblocks(req) => {
2048 1 : bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
2049 1 : bytes.put_u64(req.hdr.reqid);
2050 1 : bytes.put_u64(req.hdr.request_lsn.0);
2051 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2052 1 : bytes.put_u32(req.rel.spcnode);
2053 1 : bytes.put_u32(req.rel.dbnode);
2054 1 : bytes.put_u32(req.rel.relnode);
2055 1 : bytes.put_u8(req.rel.forknum);
2056 1 : }
2057 :
2058 1 : Self::GetPage(req) => {
2059 1 : bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
2060 1 : bytes.put_u64(req.hdr.reqid);
2061 1 : bytes.put_u64(req.hdr.request_lsn.0);
2062 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2063 1 : bytes.put_u32(req.rel.spcnode);
2064 1 : bytes.put_u32(req.rel.dbnode);
2065 1 : bytes.put_u32(req.rel.relnode);
2066 1 : bytes.put_u8(req.rel.forknum);
2067 1 : bytes.put_u32(req.blkno);
2068 1 : }
2069 :
2070 1 : Self::DbSize(req) => {
2071 1 : bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
2072 1 : bytes.put_u64(req.hdr.reqid);
2073 1 : bytes.put_u64(req.hdr.request_lsn.0);
2074 1 : bytes.put_u64(req.hdr.not_modified_since.0);
2075 1 : bytes.put_u32(req.dbnode);
2076 1 : }
2077 :
2078 0 : Self::GetSlruSegment(req) => {
2079 0 : bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
2080 0 : bytes.put_u64(req.hdr.reqid);
2081 0 : bytes.put_u64(req.hdr.request_lsn.0);
2082 0 : bytes.put_u64(req.hdr.not_modified_since.0);
2083 0 : bytes.put_u8(req.kind);
2084 0 : bytes.put_u32(req.segno);
2085 0 : }
2086 : #[cfg(feature = "testing")]
2087 0 : Self::Test(req) => {
2088 0 : bytes.put_u8(PagestreamFeMessageTag::Test as u8);
2089 0 : bytes.put_u64(req.hdr.reqid);
2090 0 : bytes.put_u64(req.hdr.request_lsn.0);
2091 0 : bytes.put_u64(req.hdr.not_modified_since.0);
2092 0 : bytes.put_u64(req.batch_key);
2093 0 : let message = req.message.as_bytes();
2094 0 : bytes.put_u64(message.len() as u64);
2095 0 : bytes.put_slice(message);
2096 0 : }
2097 : }
2098 :
2099 4 : bytes.into()
2100 4 : }
2101 :
2102 4 : pub fn parse<R: std::io::Read>(
2103 4 : body: &mut R,
2104 4 : protocol_version: PagestreamProtocolVersion,
2105 4 : ) -> anyhow::Result<PagestreamFeMessage> {
2106 : // these correspond to the NeonMessageTag enum in pagestore_client.h
2107 : //
2108 : // TODO: consider using protobuf or serde bincode for less error prone
2109 : // serialization.
2110 4 : let msg_tag = body.read_u8()?;
2111 4 : let (reqid, request_lsn, not_modified_since) = match protocol_version {
2112 : PagestreamProtocolVersion::V2 => (
2113 : 0,
2114 0 : Lsn::from(body.read_u64::<BigEndian>()?),
2115 0 : Lsn::from(body.read_u64::<BigEndian>()?),
2116 : ),
2117 : PagestreamProtocolVersion::V3 => (
2118 4 : body.read_u64::<BigEndian>()?,
2119 4 : Lsn::from(body.read_u64::<BigEndian>()?),
2120 4 : Lsn::from(body.read_u64::<BigEndian>()?),
2121 : ),
2122 : };
2123 :
2124 4 : match PagestreamFeMessageTag::try_from(msg_tag)
2125 4 : .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
2126 : {
2127 : PagestreamFeMessageTag::Exists => {
2128 : Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
2129 1 : hdr: PagestreamRequest {
2130 1 : reqid,
2131 1 : request_lsn,
2132 1 : not_modified_since,
2133 1 : },
2134 1 : rel: RelTag {
2135 1 : spcnode: body.read_u32::<BigEndian>()?,
2136 1 : dbnode: body.read_u32::<BigEndian>()?,
2137 1 : relnode: body.read_u32::<BigEndian>()?,
2138 1 : forknum: body.read_u8()?,
2139 : },
2140 : }))
2141 : }
2142 : PagestreamFeMessageTag::Nblocks => {
2143 : Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
2144 1 : hdr: PagestreamRequest {
2145 1 : reqid,
2146 1 : request_lsn,
2147 1 : not_modified_since,
2148 1 : },
2149 1 : rel: RelTag {
2150 1 : spcnode: body.read_u32::<BigEndian>()?,
2151 1 : dbnode: body.read_u32::<BigEndian>()?,
2152 1 : relnode: body.read_u32::<BigEndian>()?,
2153 1 : forknum: body.read_u8()?,
2154 : },
2155 : }))
2156 : }
2157 : PagestreamFeMessageTag::GetPage => {
2158 : Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2159 1 : hdr: PagestreamRequest {
2160 1 : reqid,
2161 1 : request_lsn,
2162 1 : not_modified_since,
2163 1 : },
2164 1 : rel: RelTag {
2165 1 : spcnode: body.read_u32::<BigEndian>()?,
2166 1 : dbnode: body.read_u32::<BigEndian>()?,
2167 1 : relnode: body.read_u32::<BigEndian>()?,
2168 1 : forknum: body.read_u8()?,
2169 : },
2170 1 : blkno: body.read_u32::<BigEndian>()?,
2171 : }))
2172 : }
2173 : PagestreamFeMessageTag::DbSize => {
2174 : Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
2175 1 : hdr: PagestreamRequest {
2176 1 : reqid,
2177 1 : request_lsn,
2178 1 : not_modified_since,
2179 1 : },
2180 1 : dbnode: body.read_u32::<BigEndian>()?,
2181 : }))
2182 : }
2183 : PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
2184 : PagestreamGetSlruSegmentRequest {
2185 0 : hdr: PagestreamRequest {
2186 0 : reqid,
2187 0 : request_lsn,
2188 0 : not_modified_since,
2189 0 : },
2190 0 : kind: body.read_u8()?,
2191 0 : segno: body.read_u32::<BigEndian>()?,
2192 : },
2193 : )),
2194 : #[cfg(feature = "testing")]
2195 : PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
2196 0 : hdr: PagestreamRequest {
2197 0 : reqid,
2198 0 : request_lsn,
2199 0 : not_modified_since,
2200 0 : },
2201 0 : batch_key: body.read_u64::<BigEndian>()?,
2202 : message: {
2203 0 : let len = body.read_u64::<BigEndian>()?;
2204 0 : let mut buf = vec![0; len as usize];
2205 0 : body.read_exact(&mut buf)?;
2206 0 : String::from_utf8(buf)?
2207 : },
2208 : })),
2209 : }
2210 4 : }
2211 : }
2212 :
2213 : impl PagestreamBeMessage {
2214 0 : pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
2215 0 : let mut bytes = BytesMut::new();
2216 :
2217 : use PagestreamBeMessageTag as Tag;
2218 0 : match protocol_version {
2219 : PagestreamProtocolVersion::V2 => {
2220 0 : match self {
2221 0 : Self::Exists(resp) => {
2222 0 : bytes.put_u8(Tag::Exists as u8);
2223 0 : bytes.put_u8(resp.exists as u8);
2224 0 : }
2225 :
2226 0 : Self::Nblocks(resp) => {
2227 0 : bytes.put_u8(Tag::Nblocks as u8);
2228 0 : bytes.put_u32(resp.n_blocks);
2229 0 : }
2230 :
2231 0 : Self::GetPage(resp) => {
2232 0 : bytes.put_u8(Tag::GetPage as u8);
2233 0 : bytes.put(&resp.page[..])
2234 : }
2235 :
2236 0 : Self::Error(resp) => {
2237 0 : bytes.put_u8(Tag::Error as u8);
2238 0 : bytes.put(resp.message.as_bytes());
2239 0 : bytes.put_u8(0); // null terminator
2240 0 : }
2241 0 : Self::DbSize(resp) => {
2242 0 : bytes.put_u8(Tag::DbSize as u8);
2243 0 : bytes.put_i64(resp.db_size);
2244 0 : }
2245 :
2246 0 : Self::GetSlruSegment(resp) => {
2247 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2248 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2249 0 : bytes.put(&resp.segment[..]);
2250 0 : }
2251 :
2252 : #[cfg(feature = "testing")]
2253 0 : Self::Test(resp) => {
2254 0 : bytes.put_u8(Tag::Test as u8);
2255 0 : bytes.put_u64(resp.req.batch_key);
2256 0 : let message = resp.req.message.as_bytes();
2257 0 : bytes.put_u64(message.len() as u64);
2258 0 : bytes.put_slice(message);
2259 0 : }
2260 : }
2261 : }
2262 : PagestreamProtocolVersion::V3 => {
2263 0 : match self {
2264 0 : Self::Exists(resp) => {
2265 0 : bytes.put_u8(Tag::Exists as u8);
2266 0 : bytes.put_u64(resp.req.hdr.reqid);
2267 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2268 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2269 0 : bytes.put_u32(resp.req.rel.spcnode);
2270 0 : bytes.put_u32(resp.req.rel.dbnode);
2271 0 : bytes.put_u32(resp.req.rel.relnode);
2272 0 : bytes.put_u8(resp.req.rel.forknum);
2273 0 : bytes.put_u8(resp.exists as u8);
2274 0 : }
2275 :
2276 0 : Self::Nblocks(resp) => {
2277 0 : bytes.put_u8(Tag::Nblocks as u8);
2278 0 : bytes.put_u64(resp.req.hdr.reqid);
2279 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2280 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2281 0 : bytes.put_u32(resp.req.rel.spcnode);
2282 0 : bytes.put_u32(resp.req.rel.dbnode);
2283 0 : bytes.put_u32(resp.req.rel.relnode);
2284 0 : bytes.put_u8(resp.req.rel.forknum);
2285 0 : bytes.put_u32(resp.n_blocks);
2286 0 : }
2287 :
2288 0 : Self::GetPage(resp) => {
2289 0 : bytes.put_u8(Tag::GetPage as u8);
2290 0 : bytes.put_u64(resp.req.hdr.reqid);
2291 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2292 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2293 0 : bytes.put_u32(resp.req.rel.spcnode);
2294 0 : bytes.put_u32(resp.req.rel.dbnode);
2295 0 : bytes.put_u32(resp.req.rel.relnode);
2296 0 : bytes.put_u8(resp.req.rel.forknum);
2297 0 : bytes.put_u32(resp.req.blkno);
2298 0 : bytes.put(&resp.page[..])
2299 : }
2300 :
2301 0 : Self::Error(resp) => {
2302 0 : bytes.put_u8(Tag::Error as u8);
2303 0 : bytes.put_u64(resp.req.reqid);
2304 0 : bytes.put_u64(resp.req.request_lsn.0);
2305 0 : bytes.put_u64(resp.req.not_modified_since.0);
2306 0 : bytes.put(resp.message.as_bytes());
2307 0 : bytes.put_u8(0); // null terminator
2308 0 : }
2309 0 : Self::DbSize(resp) => {
2310 0 : bytes.put_u8(Tag::DbSize as u8);
2311 0 : bytes.put_u64(resp.req.hdr.reqid);
2312 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2313 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2314 0 : bytes.put_u32(resp.req.dbnode);
2315 0 : bytes.put_i64(resp.db_size);
2316 0 : }
2317 :
2318 0 : Self::GetSlruSegment(resp) => {
2319 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2320 0 : bytes.put_u64(resp.req.hdr.reqid);
2321 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2322 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2323 0 : bytes.put_u8(resp.req.kind);
2324 0 : bytes.put_u32(resp.req.segno);
2325 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2326 0 : bytes.put(&resp.segment[..]);
2327 0 : }
2328 :
2329 : #[cfg(feature = "testing")]
2330 0 : Self::Test(resp) => {
2331 0 : bytes.put_u8(Tag::Test as u8);
2332 0 : bytes.put_u64(resp.req.hdr.reqid);
2333 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2334 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2335 0 : bytes.put_u64(resp.req.batch_key);
2336 0 : let message = resp.req.message.as_bytes();
2337 0 : bytes.put_u64(message.len() as u64);
2338 0 : bytes.put_slice(message);
2339 0 : }
2340 : }
2341 : }
2342 : }
2343 0 : bytes.into()
2344 0 : }
2345 :
2346 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
2347 0 : let mut buf = buf.reader();
2348 0 : let msg_tag = buf.read_u8()?;
2349 :
2350 : use PagestreamBeMessageTag as Tag;
2351 0 : let ok =
2352 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
2353 : Tag::Exists => {
2354 0 : let reqid = buf.read_u64::<BigEndian>()?;
2355 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2356 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2357 0 : let rel = RelTag {
2358 0 : spcnode: buf.read_u32::<BigEndian>()?,
2359 0 : dbnode: buf.read_u32::<BigEndian>()?,
2360 0 : relnode: buf.read_u32::<BigEndian>()?,
2361 0 : forknum: buf.read_u8()?,
2362 : };
2363 0 : let exists = buf.read_u8()? != 0;
2364 0 : Self::Exists(PagestreamExistsResponse {
2365 0 : req: PagestreamExistsRequest {
2366 0 : hdr: PagestreamRequest {
2367 0 : reqid,
2368 0 : request_lsn,
2369 0 : not_modified_since,
2370 0 : },
2371 0 : rel,
2372 0 : },
2373 0 : exists,
2374 0 : })
2375 : }
2376 : Tag::Nblocks => {
2377 0 : let reqid = buf.read_u64::<BigEndian>()?;
2378 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2379 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2380 0 : let rel = RelTag {
2381 0 : spcnode: buf.read_u32::<BigEndian>()?,
2382 0 : dbnode: buf.read_u32::<BigEndian>()?,
2383 0 : relnode: buf.read_u32::<BigEndian>()?,
2384 0 : forknum: buf.read_u8()?,
2385 : };
2386 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2387 0 : Self::Nblocks(PagestreamNblocksResponse {
2388 0 : req: PagestreamNblocksRequest {
2389 0 : hdr: PagestreamRequest {
2390 0 : reqid,
2391 0 : request_lsn,
2392 0 : not_modified_since,
2393 0 : },
2394 0 : rel,
2395 0 : },
2396 0 : n_blocks,
2397 0 : })
2398 : }
2399 : Tag::GetPage => {
2400 0 : let reqid = buf.read_u64::<BigEndian>()?;
2401 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2402 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2403 0 : let rel = RelTag {
2404 0 : spcnode: buf.read_u32::<BigEndian>()?,
2405 0 : dbnode: buf.read_u32::<BigEndian>()?,
2406 0 : relnode: buf.read_u32::<BigEndian>()?,
2407 0 : forknum: buf.read_u8()?,
2408 : };
2409 0 : let blkno = buf.read_u32::<BigEndian>()?;
2410 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
2411 0 : buf.read_exact(&mut page)?;
2412 0 : Self::GetPage(PagestreamGetPageResponse {
2413 0 : req: PagestreamGetPageRequest {
2414 0 : hdr: PagestreamRequest {
2415 0 : reqid,
2416 0 : request_lsn,
2417 0 : not_modified_since,
2418 0 : },
2419 0 : rel,
2420 0 : blkno,
2421 0 : },
2422 0 : page: page.into(),
2423 0 : })
2424 : }
2425 : Tag::Error => {
2426 0 : let reqid = buf.read_u64::<BigEndian>()?;
2427 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2428 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2429 0 : let mut msg = Vec::new();
2430 0 : buf.read_until(0, &mut msg)?;
2431 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
2432 0 : let rust_str = cstring.to_str()?;
2433 0 : Self::Error(PagestreamErrorResponse {
2434 0 : req: PagestreamRequest {
2435 0 : reqid,
2436 0 : request_lsn,
2437 0 : not_modified_since,
2438 0 : },
2439 0 : message: rust_str.to_owned(),
2440 0 : })
2441 : }
2442 : Tag::DbSize => {
2443 0 : let reqid = buf.read_u64::<BigEndian>()?;
2444 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2445 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2446 0 : let dbnode = buf.read_u32::<BigEndian>()?;
2447 0 : let db_size = buf.read_i64::<BigEndian>()?;
2448 0 : Self::DbSize(PagestreamDbSizeResponse {
2449 0 : req: PagestreamDbSizeRequest {
2450 0 : hdr: PagestreamRequest {
2451 0 : reqid,
2452 0 : request_lsn,
2453 0 : not_modified_since,
2454 0 : },
2455 0 : dbnode,
2456 0 : },
2457 0 : db_size,
2458 0 : })
2459 : }
2460 : Tag::GetSlruSegment => {
2461 0 : let reqid = buf.read_u64::<BigEndian>()?;
2462 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2463 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2464 0 : let kind = buf.read_u8()?;
2465 0 : let segno = buf.read_u32::<BigEndian>()?;
2466 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2467 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
2468 0 : buf.read_exact(&mut segment)?;
2469 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
2470 0 : req: PagestreamGetSlruSegmentRequest {
2471 0 : hdr: PagestreamRequest {
2472 0 : reqid,
2473 0 : request_lsn,
2474 0 : not_modified_since,
2475 0 : },
2476 0 : kind,
2477 0 : segno,
2478 0 : },
2479 0 : segment: segment.into(),
2480 0 : })
2481 : }
2482 : #[cfg(feature = "testing")]
2483 : Tag::Test => {
2484 0 : let reqid = buf.read_u64::<BigEndian>()?;
2485 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2486 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2487 0 : let batch_key = buf.read_u64::<BigEndian>()?;
2488 0 : let len = buf.read_u64::<BigEndian>()?;
2489 0 : let mut msg = vec![0; len as usize];
2490 0 : buf.read_exact(&mut msg)?;
2491 0 : let message = String::from_utf8(msg)?;
2492 0 : Self::Test(PagestreamTestResponse {
2493 0 : req: PagestreamTestRequest {
2494 0 : hdr: PagestreamRequest {
2495 0 : reqid,
2496 0 : request_lsn,
2497 0 : not_modified_since,
2498 0 : },
2499 0 : batch_key,
2500 0 : message,
2501 0 : },
2502 0 : })
2503 : }
2504 : };
2505 0 : let remaining = buf.into_inner();
2506 0 : if !remaining.is_empty() {
2507 0 : anyhow::bail!(
2508 0 : "remaining bytes in msg with tag={msg_tag}: {}",
2509 0 : remaining.len()
2510 0 : );
2511 0 : }
2512 0 : Ok(ok)
2513 0 : }
2514 :
2515 0 : pub fn kind(&self) -> &'static str {
2516 0 : match self {
2517 0 : Self::Exists(_) => "Exists",
2518 0 : Self::Nblocks(_) => "Nblocks",
2519 0 : Self::GetPage(_) => "GetPage",
2520 0 : Self::Error(_) => "Error",
2521 0 : Self::DbSize(_) => "DbSize",
2522 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
2523 : #[cfg(feature = "testing")]
2524 0 : Self::Test(_) => "Test",
2525 : }
2526 0 : }
2527 : }
2528 :
/// A single page trace record.
// NOTE(review): presumably one record per traced page access; the producer is not visible
// in this file.
#[derive(Debug, Serialize, Deserialize)]
pub struct PageTraceEvent {
    /// The key the event refers to, in compact form.
    pub key: CompactKey,
    /// The LSN recorded for the event.
    // NOTE(review): "effective" presumably means the LSN the request was actually served at;
    // confirm against the code that records the event.
    pub effective_lsn: Lsn,
    /// Wall-clock time recorded for the event.
    pub time: SystemTime,
}
2535 :
2536 : impl Default for PageTraceEvent {
2537 0 : fn default() -> Self {
2538 0 : Self {
2539 0 : key: Default::default(),
2540 0 : effective_lsn: Default::default(),
2541 0 : time: std::time::UNIX_EPOCH,
2542 0 : }
2543 0 : }
2544 : }
2545 :
2546 : #[cfg(test)]
2547 : mod tests {
2548 : use std::str::FromStr;
2549 :
2550 : use serde_json::json;
2551 :
2552 : use super::*;
2553 :
2554 : #[test]
2555 1 : fn test_pagestream() {
2556 1 : // Test serialization/deserialization of PagestreamFeMessage
2557 1 : let messages = vec![
2558 1 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
2559 1 : hdr: PagestreamRequest {
2560 1 : reqid: 0,
2561 1 : request_lsn: Lsn(4),
2562 1 : not_modified_since: Lsn(3),
2563 1 : },
2564 1 : rel: RelTag {
2565 1 : forknum: 1,
2566 1 : spcnode: 2,
2567 1 : dbnode: 3,
2568 1 : relnode: 4,
2569 1 : },
2570 1 : }),
2571 1 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
2572 1 : hdr: PagestreamRequest {
2573 1 : reqid: 0,
2574 1 : request_lsn: Lsn(4),
2575 1 : not_modified_since: Lsn(4),
2576 1 : },
2577 1 : rel: RelTag {
2578 1 : forknum: 1,
2579 1 : spcnode: 2,
2580 1 : dbnode: 3,
2581 1 : relnode: 4,
2582 1 : },
2583 1 : }),
2584 1 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2585 1 : hdr: PagestreamRequest {
2586 1 : reqid: 0,
2587 1 : request_lsn: Lsn(4),
2588 1 : not_modified_since: Lsn(3),
2589 1 : },
2590 1 : rel: RelTag {
2591 1 : forknum: 1,
2592 1 : spcnode: 2,
2593 1 : dbnode: 3,
2594 1 : relnode: 4,
2595 1 : },
2596 1 : blkno: 7,
2597 1 : }),
2598 1 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
2599 1 : hdr: PagestreamRequest {
2600 1 : reqid: 0,
2601 1 : request_lsn: Lsn(4),
2602 1 : not_modified_since: Lsn(3),
2603 1 : },
2604 1 : dbnode: 7,
2605 1 : }),
2606 1 : ];
2607 5 : for msg in messages {
2608 4 : let bytes = msg.serialize();
2609 4 : let reconstructed =
2610 4 : PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
2611 4 : .unwrap();
2612 4 : assert!(msg == reconstructed);
2613 : }
2614 1 : }
2615 :
#[test]
/// Checks the JSON shape produced when serializing [`TenantInfo`], once for a
/// state without payload (`Active`) and once for a state with payload (`Broken`).
///
/// Both expected documents omit any `gc_blocking` key even though the structs
/// set `gc_blocking: None`.
fn test_tenantinfo_serde() {
    // --- Case 1: `Active` carries no data, so only the "slug" is expected. ---
    let active_info = TenantInfo {
        id: TenantShardId::unsharded(TenantId::generate()),
        state: TenantState::Active,
        current_physical_size: Some(42),
        attachment_status: TenantAttachmentStatus::Attached,
        generation: 1,
        gc_blocking: None,
    };
    let active_want = json!({
        "id": active_info.id.to_string(),
        "state": {
            "slug": "Active",
        },
        "current_physical_size": 42,
        "attachment_status": {
            "slug":"attached",
        },
        "generation" : 1
    });
    let active_got = serde_json::to_value(&active_info).unwrap();
    assert_eq!(active_got, active_want);

    // --- Case 2: `Broken` carries `reason` and `backtrace` under "data". ---
    let broken_info = TenantInfo {
        id: TenantShardId::unsharded(TenantId::generate()),
        state: TenantState::Broken {
            reason: "reason".into(),
            backtrace: "backtrace info".into(),
        },
        current_physical_size: Some(42),
        attachment_status: TenantAttachmentStatus::Attached,
        generation: 1,
        gc_blocking: None,
    };
    let broken_want = json!({
        "id": broken_info.id.to_string(),
        "state": {
            "slug": "Broken",
            "data": {
                "backtrace": "backtrace info",
                "reason": "reason",
            }
        },
        "current_physical_size": 42,
        "attachment_status": {
            "slug":"attached",
        },
        "generation" : 1
    });
    let broken_got = serde_json::to_value(&broken_info).unwrap();
    assert_eq!(broken_got, broken_want);

    // The Debug rendering of the broken state must surface both the reason and
    // the backtrace text; render it once and check both substrings.
    let broken_debug = format!("{:?}", broken_info.state);
    for needle in ["reason", "backtrace info"] {
        assert!(broken_debug.contains(needle));
    }
}
2678 :
#[test]
/// Deserializing a [`TenantConfigRequest`] from JSON that contains a key the
/// type does not know about must fail, and the error text must name that key.
fn test_reject_unknown_field() {
    let tenant_id = TenantId::generate().to_string();
    let payload = json!({
        "tenant_id": tenant_id,
        "unknown_field": "unknown_value",
    });

    let outcome: Result<TenantConfigRequest, _> = serde_json::from_value(payload);
    let err = outcome.unwrap_err();

    let rendered = err.to_string();
    assert!(
        rendered.contains("unknown field `unknown_field`"),
        "expect unknown field `unknown_field` error, got: {}",
        err
    );
}
2693 :
#[test]
/// Pins the exact JSON text of `TenantState::Activating(..)` and verifies that
/// the text parses back into an equal value.
fn tenantstatus_activating_serde() {
    let states = [TenantState::Activating(ActivatingFrom::Attaching)];

    // Serialize and compare against the literal wire format.
    let encoded = serde_json::to_string(&states).unwrap();
    assert_eq!(
        encoded,
        "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]"
    );

    // Parse the same text back and compare element-wise with the input.
    let decoded: Vec<TenantState> = serde_json::from_str(&encoded).unwrap();
    assert_eq!(states.as_slice(), decoded.as_slice());
}
2707 :
#[test]
/// Pins the `&'static str` conversion of every [`TenantState`] variant listed
/// below; these names are relied upon for metrics.
fn tenantstatus_activating_strum() {
    // Each entry: (source line for diagnostics, state under test, expected name).
    let cases = [
        (line!(), TenantState::Attaching, "Attaching"),
        (
            line!(),
            TenantState::Activating(ActivatingFrom::Attaching),
            "Activating",
        ),
        (line!(), TenantState::Active, "Active"),
        (
            line!(),
            TenantState::Stopping {
                progress: utils::completion::Barrier::default(),
            },
            "Stopping",
        ),
        (
            line!(),
            TenantState::Broken {
                reason: "Example".into(),
                backtrace: "Looooong backtrace".into(),
            },
            "Broken",
        ),
    ];

    for (line, state, expected) in cases {
        // The payload of a variant must not leak into its static name.
        let actual = <&'static str>::from(state);
        assert_eq!(actual, expected, "example on {line}");
    }
}
2741 :
#[test]
/// For each textual form of [`ImageCompressionAlgorithm`], checks that:
/// parsing yields the expected value, `Display` reproduces the text, a serde
/// round trip is lossless, and the serde form is a JSON string equal to the
/// `Display` text.
fn test_image_compression_algorithm_parsing() {
    type Algo = ImageCompressionAlgorithm;

    let table = [
        ("disabled", Algo::Disabled),
        ("zstd", Algo::Zstd { level: None }),
        ("zstd(18)", Algo::Zstd { level: Some(18) }),
        ("zstd(-3)", Algo::Zstd { level: Some(-3) }),
    ];

    for (display, expected) in table {
        // Text -> value.
        let parsed: Algo = display.parse().unwrap();
        assert_eq!(parsed, expected, "parsing works");

        // Value -> text.
        assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");

        // Value -> JSON text -> value.
        let ser = serde_json::to_string(&expected).expect("serialization");
        let roundtripped: Algo = serde_json::from_str(&ser).unwrap();
        assert_eq!(roundtripped, expected, "serde roundtrip");

        // The JSON value itself is just the Display string.
        let want_value = serde_json::Value::String(display.to_string());
        let got_value = serde_json::to_value(expected).unwrap();
        assert_eq!(want_value, got_value, "Display is the serde serialization");
    }
}
2774 :
#[test]
/// Exercises [`TenantConfigPatchRequest`] end to end: exact JSON encoding,
/// decoding back to an equal value, and applying the decoded patch to a
/// [`TenantConfig`].
///
/// The pinned JSON shows how the three patch kinds used here are encoded:
/// `Upsert(42)` as the bare value, `Remove` as `null`, and `Noop` as an absent
/// key.
fn test_tenant_config_patch_request_serde() {
    let tenant_id: TenantId = "17c6d121946a61e5ab0fe5a2fd4d8215".parse().unwrap();
    let patch = TenantConfigPatch {
        checkpoint_distance: FieldPatch::Upsert(42),
        gc_horizon: FieldPatch::Remove,
        compaction_threshold: FieldPatch::Noop,
        ..TenantConfigPatch::default()
    };
    let patch_request = TenantConfigPatchRequest {
        tenant_id,
        config: patch,
    };

    // Encoding: compare against the exact wire text.
    let json = serde_json::to_string(&patch_request).unwrap();
    assert_eq!(
        json,
        r#"{"tenant_id":"17c6d121946a61e5ab0fe5a2fd4d8215","checkpoint_distance":42,"gc_horizon":null}"#
    );

    // Decoding: both parts of the request must survive the round trip.
    let TenantConfigPatchRequest {
        tenant_id: decoded_tenant_id,
        config: decoded_patch,
    } = serde_json::from_str(&json).unwrap();
    assert_eq!(decoded_tenant_id, patch_request.tenant_id);
    assert_eq!(decoded_patch, patch_request.config);

    // Now apply the patch to a config to demonstrate semantics

    let base = TenantConfig {
        checkpoint_distance: Some(28),
        gc_horizon: Some(100),
        compaction_target_size: Some(1024),
        ..Default::default()
    };

    // Expected outcome: the upserted field is overwritten, the removed field is
    // cleared, and everything else keeps the value it had in `base`.
    let mut want = base.clone();
    want.checkpoint_distance = Some(42);
    want.gc_horizon = None;

    let patched = base.apply_patch(decoded_patch).unwrap();

    assert_eq!(patched, want);
}
2815 : }
|