Line data Source code
1 : pub mod detach_ancestor;
2 : pub mod partitioning;
3 : pub mod utilization;
4 :
5 : #[cfg(feature = "testing")]
6 : use camino::Utf8PathBuf;
7 : pub use utilization::PageserverUtilization;
8 :
9 : use core::ops::Range;
10 : use std::{
11 : collections::HashMap,
12 : fmt::Display,
13 : io::{BufRead, Read},
14 : num::{NonZeroU32, NonZeroU64, NonZeroUsize},
15 : str::FromStr,
16 : time::{Duration, SystemTime},
17 : };
18 :
19 : use byteorder::{BigEndian, ReadBytesExt};
20 : use postgres_ffi::BLCKSZ;
21 : use serde::{Deserialize, Deserializer, Serialize, Serializer};
22 : use serde_with::serde_as;
23 : use utils::{
24 : completion,
25 : id::{NodeId, TenantId, TimelineId},
26 : lsn::Lsn,
27 : postgres_client::PostgresClientProtocol,
28 : serde_system_time,
29 : };
30 :
31 : use crate::{
32 : key::{CompactKey, Key},
33 : reltag::RelTag,
34 : shard::{ShardCount, ShardStripeSize, TenantShardId},
35 : };
36 : use bytes::{Buf, BufMut, Bytes, BytesMut};
37 :
38 : /// The state of a tenant in this pageserver.
39 : ///
40 : /// ```mermaid
41 : /// stateDiagram-v2
42 : ///
43 : /// [*] --> Attaching: spawn_attach()
44 : ///
45 : /// Attaching --> Activating: activate()
46 : /// Activating --> Active: infallible
47 : ///
48 : /// Attaching --> Broken: attach() failure
49 : ///
50 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
51 : /// Stopping --> Broken: late error in remove_tenant_from_memory
52 : ///
53 : /// Broken --> [*]: ignore / detach / shutdown
54 : /// Stopping --> [*]: remove_from_memory complete
55 : ///
56 : /// Active --> Broken: cfg(testing)-only tenant break point
57 : /// ```
58 : #[derive(
59 : Clone,
60 : PartialEq,
61 : Eq,
62 0 : serde::Serialize,
63 1 : serde::Deserialize,
64 : strum_macros::Display,
65 : strum_macros::VariantNames,
66 : strum_macros::AsRefStr,
67 : strum_macros::IntoStaticStr,
68 : )]
69 : #[serde(tag = "slug", content = "data")]
70 : pub enum TenantState {
71 : /// This tenant is being attached to the pageserver.
72 : ///
73 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
74 : Attaching,
75 : /// The tenant is transitioning from Loading/Attaching to Active.
76 : ///
77 : /// While in this state, the individual timelines are being activated.
78 : ///
79 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
80 : Activating(ActivatingFrom),
81 : /// The tenant has finished activating and is open for business.
82 : ///
83 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
84 : Active,
85 : /// The tenant is recognized by pageserver, but it is being detached or the
86 : /// system is being shut down.
87 : ///
88 : /// Transitions out of this state are possible through `set_broken()`.
89 : Stopping {
90 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
91 : // otherwise it will not be skipped during deserialization
92 : #[serde(skip)]
93 : progress: completion::Barrier,
94 : },
95 : /// The tenant is recognized by the pageserver, but can no longer be used for
96 : /// any operations.
97 : ///
98 : /// If the tenant fails to load or attach, it will transition to this state
99 : /// and it is guaranteed that no background tasks are running in its name.
100 : ///
101 : /// The other way to transition into this state is from `Stopping` state
102 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
103 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
104 : Broken { reason: String, backtrace: String },
105 : }
106 :
107 : impl TenantState {
108 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
109 : use TenantAttachmentStatus::*;
110 :
111 : // Below TenantState::Activating is used as "transient" or "transparent" state for
112 : // attachment_status determining.
113 0 : match self {
114 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
115 : // So, technically, we can return Attached here.
116 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
117 : // But, our attach task might still be fetching the remote timelines, etc.
118 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
119 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
120 : // We only reach Active after successful load / attach.
121 : // So, call atttachment status Attached.
122 0 : Self::Active => Attached,
123 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
124 : // However, it also becomes Broken if the regular load fails.
125 : // From Console's perspective there's no practical difference
126 : // because attachment_status is polled by console only during attach operation execution.
127 0 : Self::Broken { reason, .. } => Failed {
128 0 : reason: reason.to_owned(),
129 0 : },
130 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
131 : // we set the Stopping state irrespective of whether the tenant
132 : // has finished attaching or not.
133 0 : Self::Stopping { .. } => Maybe,
134 : }
135 0 : }
136 :
137 0 : pub fn broken_from_reason(reason: String) -> Self {
138 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
139 0 : Self::Broken {
140 0 : reason,
141 0 : backtrace: backtrace_str,
142 0 : }
143 0 : }
144 : }
145 :
146 : impl std::fmt::Debug for TenantState {
147 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 2 : match self {
149 2 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
150 2 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
151 : }
152 0 : _ => write!(f, "{self}"),
153 : }
154 2 : }
155 : }
156 :
157 : /// A temporary lease to a specific lsn inside a timeline.
158 : /// Access to the lsn is guaranteed by the pageserver until the expiration indicated by `valid_until`.
159 : #[serde_as]
160 0 : #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
161 : pub struct LsnLease {
162 : #[serde_as(as = "SystemTimeAsRfc3339Millis")]
163 : pub valid_until: SystemTime,
164 : }
165 :
166 : serde_with::serde_conv!(
167 : SystemTimeAsRfc3339Millis,
168 : SystemTime,
169 0 : |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
170 0 : |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
171 : );
172 :
173 : impl LsnLease {
174 : /// The default length for an explicit LSN lease request (10 minutes).
175 : pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
176 :
177 : /// The default length for an implicit LSN lease granted during
178 : /// `get_lsn_by_timestamp` request (1 minutes).
179 : pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
180 :
181 : /// Checks whether the lease is expired.
182 12 : pub fn is_expired(&self, now: &SystemTime) -> bool {
183 12 : now > &self.valid_until
184 12 : }
185 : }
186 :
187 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
188 : ///
189 : /// XXX: We used to have more variants here, but now it's just one, which makes this rather
190 : /// useless. Remove, once we've checked that there's no client code left that looks at this.
191 1 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
192 : pub enum ActivatingFrom {
193 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
194 : Attaching,
195 : }
196 :
197 : /// A state of a timeline in pageserver's memory.
198 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
199 : pub enum TimelineState {
200 : /// The timeline is recognized by the pageserver but is not yet operational.
201 : /// In particular, the walreceiver connection loop is not running for this timeline.
202 : /// It will eventually transition to state Active or Broken.
203 : Loading,
204 : /// The timeline is fully operational.
205 : /// It can be queried, and the walreceiver connection loop is running.
206 : Active,
207 : /// The timeline was previously Loading or Active but is shutting down.
208 : /// It cannot transition back into any other state.
209 : Stopping,
210 : /// The timeline is broken and not operational (previous states: Loading or Active).
211 : Broken { reason: String, backtrace: String },
212 : }
213 :
214 : #[serde_with::serde_as]
215 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
216 : pub struct CompactLsnRange {
217 : pub start: Lsn,
218 : pub end: Lsn,
219 : }
220 :
221 : #[serde_with::serde_as]
222 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
223 : pub struct CompactKeyRange {
224 : #[serde_as(as = "serde_with::DisplayFromStr")]
225 : pub start: Key,
226 : #[serde_as(as = "serde_with::DisplayFromStr")]
227 : pub end: Key,
228 : }
229 :
230 : impl From<Range<Lsn>> for CompactLsnRange {
231 12 : fn from(range: Range<Lsn>) -> Self {
232 12 : Self {
233 12 : start: range.start,
234 12 : end: range.end,
235 12 : }
236 12 : }
237 : }
238 :
239 : impl From<Range<Key>> for CompactKeyRange {
240 32 : fn from(range: Range<Key>) -> Self {
241 32 : Self {
242 32 : start: range.start,
243 32 : end: range.end,
244 32 : }
245 32 : }
246 : }
247 :
248 : impl From<CompactLsnRange> for Range<Lsn> {
249 20 : fn from(range: CompactLsnRange) -> Self {
250 20 : range.start..range.end
251 20 : }
252 : }
253 :
254 : impl From<CompactKeyRange> for Range<Key> {
255 32 : fn from(range: CompactKeyRange) -> Self {
256 32 : range.start..range.end
257 32 : }
258 : }
259 :
260 : impl CompactLsnRange {
261 8 : pub fn above(lsn: Lsn) -> Self {
262 8 : Self {
263 8 : start: lsn,
264 8 : end: Lsn::MAX,
265 8 : }
266 8 : }
267 : }
268 :
269 : #[derive(Debug, Clone, Serialize)]
270 : pub struct CompactInfoResponse {
271 : pub compact_key_range: Option<CompactKeyRange>,
272 : pub compact_lsn_range: Option<CompactLsnRange>,
273 : pub sub_compaction: bool,
274 : pub running: bool,
275 : pub job_id: usize,
276 : }
277 :
278 0 : #[derive(Serialize, Deserialize, Clone)]
279 : pub struct TimelineCreateRequest {
280 : pub new_timeline_id: TimelineId,
281 : #[serde(flatten)]
282 : pub mode: TimelineCreateRequestMode,
283 : }
284 :
285 0 : #[derive(Serialize, Deserialize, Clone)]
286 : #[serde(untagged)]
287 : pub enum TimelineCreateRequestMode {
288 : Branch {
289 : ancestor_timeline_id: TimelineId,
290 : #[serde(default)]
291 : ancestor_start_lsn: Option<Lsn>,
292 : // TODO: cplane sets this, but, the branching code always
293 : // inherits the ancestor's pg_version. Earlier code wasn't
294 : // using a flattened enum, so, it was an accepted field, and
295 : // we continue to accept it by having it here.
296 : pg_version: Option<u32>,
297 : },
298 : ImportPgdata {
299 : import_pgdata: TimelineCreateRequestModeImportPgdata,
300 : },
301 : // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
302 : // (serde picks the first matching enum variant, in declaration order).
303 : Bootstrap {
304 : #[serde(default)]
305 : existing_initdb_timeline_id: Option<TimelineId>,
306 : pg_version: Option<u32>,
307 : },
308 : }
309 :
310 0 : #[derive(Serialize, Deserialize, Clone)]
311 : pub struct TimelineCreateRequestModeImportPgdata {
312 : pub location: ImportPgdataLocation,
313 : pub idempotency_key: ImportPgdataIdempotencyKey,
314 : }
315 :
316 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
317 : pub enum ImportPgdataLocation {
318 : #[cfg(feature = "testing")]
319 : LocalFs { path: Utf8PathBuf },
320 : AwsS3 {
321 : region: String,
322 : bucket: String,
323 : /// A better name for this would be `prefix`; changing requires coordination with cplane.
324 : /// See <https://github.com/neondatabase/cloud/issues/20646>.
325 : key: String,
326 : },
327 : }
328 :
329 0 : #[derive(Serialize, Deserialize, Clone)]
330 : #[serde(transparent)]
331 : pub struct ImportPgdataIdempotencyKey(pub String);
332 :
333 : impl ImportPgdataIdempotencyKey {
334 0 : pub fn random() -> Self {
335 : use rand::{distributions::Alphanumeric, Rng};
336 0 : Self(
337 0 : rand::thread_rng()
338 0 : .sample_iter(&Alphanumeric)
339 0 : .take(20)
340 0 : .map(char::from)
341 0 : .collect(),
342 0 : )
343 0 : }
344 : }
345 :
346 0 : #[derive(Serialize, Deserialize, Clone)]
347 : pub struct LsnLeaseRequest {
348 : pub lsn: Lsn,
349 : }
350 :
351 0 : #[derive(Serialize, Deserialize)]
352 : pub struct TenantShardSplitRequest {
353 : pub new_shard_count: u8,
354 :
355 : // A tenant's stripe size is only meaningful the first time their shard count goes
356 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
357 : //
358 : // If this is set while the stripe count is being increased from an already >1 value,
359 : // then the request will fail with 400.
360 : pub new_stripe_size: Option<ShardStripeSize>,
361 : }
362 :
363 0 : #[derive(Serialize, Deserialize)]
364 : pub struct TenantShardSplitResponse {
365 : pub new_shards: Vec<TenantShardId>,
366 : }
367 :
368 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
369 0 : #[derive(Serialize, Deserialize, Debug)]
370 : #[serde(deny_unknown_fields)]
371 : pub struct ShardParameters {
372 : pub count: ShardCount,
373 : pub stripe_size: ShardStripeSize,
374 : }
375 :
376 : impl ShardParameters {
377 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
378 :
379 0 : pub fn is_unsharded(&self) -> bool {
380 0 : self.count.is_unsharded()
381 0 : }
382 : }
383 :
384 : impl Default for ShardParameters {
385 445 : fn default() -> Self {
386 445 : Self {
387 445 : count: ShardCount::new(0),
388 445 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
389 445 : }
390 445 : }
391 : }
392 :
/// A three-way patch for a single optional field.
///
/// * `Upsert(v)` sets the field to `Some(v)`,
/// * `Remove` clears the field to `None`,
/// * `Noop` (the default) leaves the field untouched.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub enum FieldPatch<T> {
    Upsert(T),
    Remove,
    #[default]
    Noop,
}

impl<T> FieldPatch<T> {
    /// Returns `true` only for [`FieldPatch::Noop`].
    fn is_noop(&self) -> bool {
        match self {
            Self::Noop => true,
            Self::Upsert(_) | Self::Remove => false,
        }
    }

    /// Applies the patch to `target` in place; `Noop` does not touch it.
    pub fn apply(self, target: &mut Option<T>) {
        let replacement = match self {
            Self::Noop => return,
            Self::Remove => None,
            Self::Upsert(value) => Some(value),
        };
        *target = replacement;
    }

    /// Converts the payload of an `Upsert` with the fallible function `map`.
    ///
    /// `Remove` and `Noop` are passed through without calling `map`; an error
    /// returned by `map` is propagated to the caller.
    pub fn map<U, E, F: FnOnce(T) -> Result<U, E>>(self, map: F) -> Result<FieldPatch<U>, E> {
        let mapped = match self {
            Self::Upsert(value) => FieldPatch::Upsert(map(value)?),
            Self::Remove => FieldPatch::Remove,
            Self::Noop => FieldPatch::Noop,
        };
        Ok(mapped)
    }
}
422 :
423 : impl<'de, T: Deserialize<'de>> Deserialize<'de> for FieldPatch<T> {
424 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
425 2 : where
426 2 : D: Deserializer<'de>,
427 2 : {
428 2 : Option::deserialize(deserializer).map(|opt| match opt {
429 1 : None => FieldPatch::Remove,
430 1 : Some(val) => FieldPatch::Upsert(val),
431 2 : })
432 2 : }
433 : }
434 :
435 : impl<T: Serialize> Serialize for FieldPatch<T> {
436 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
437 2 : where
438 2 : S: Serializer,
439 2 : {
440 2 : match self {
441 1 : FieldPatch::Upsert(val) => serializer.serialize_some(val),
442 1 : FieldPatch::Remove => serializer.serialize_none(),
443 0 : FieldPatch::Noop => unreachable!(),
444 : }
445 2 : }
446 : }
447 :
448 2 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
449 : #[serde(default)]
450 : pub struct TenantConfigPatch {
451 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
452 : pub checkpoint_distance: FieldPatch<u64>,
453 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
454 : pub checkpoint_timeout: FieldPatch<String>,
455 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
456 : pub compaction_target_size: FieldPatch<u64>,
457 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
458 : pub compaction_period: FieldPatch<String>,
459 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
460 : pub compaction_threshold: FieldPatch<usize>,
461 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
462 : pub compaction_upper_limit: FieldPatch<usize>,
463 : // defer parsing compaction_algorithm, like eviction_policy
464 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
465 : pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
466 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
467 : pub compaction_l0_first: FieldPatch<bool>,
468 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
469 : pub compaction_l0_semaphore: FieldPatch<bool>,
470 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
471 : pub l0_flush_delay_threshold: FieldPatch<usize>,
472 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
473 : pub l0_flush_stall_threshold: FieldPatch<usize>,
474 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
475 : pub l0_flush_wait_upload: FieldPatch<bool>,
476 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
477 : pub gc_horizon: FieldPatch<u64>,
478 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
479 : pub gc_period: FieldPatch<String>,
480 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
481 : pub image_creation_threshold: FieldPatch<usize>,
482 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
483 : pub pitr_interval: FieldPatch<String>,
484 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
485 : pub walreceiver_connect_timeout: FieldPatch<String>,
486 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
487 : pub lagging_wal_timeout: FieldPatch<String>,
488 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
489 : pub max_lsn_wal_lag: FieldPatch<NonZeroU64>,
490 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
491 : pub eviction_policy: FieldPatch<EvictionPolicy>,
492 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
493 : pub min_resident_size_override: FieldPatch<u64>,
494 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
495 : pub evictions_low_residence_duration_metric_threshold: FieldPatch<String>,
496 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
497 : pub heatmap_period: FieldPatch<String>,
498 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
499 : pub lazy_slru_download: FieldPatch<bool>,
500 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
501 : pub timeline_get_throttle: FieldPatch<ThrottleConfig>,
502 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
503 : pub image_layer_creation_check_threshold: FieldPatch<u8>,
504 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
505 : pub image_creation_preempt_threshold: FieldPatch<usize>,
506 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
507 : pub lsn_lease_length: FieldPatch<String>,
508 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
509 : pub lsn_lease_length_for_ts: FieldPatch<String>,
510 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
511 : pub timeline_offloading: FieldPatch<bool>,
512 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
513 : pub wal_receiver_protocol_override: FieldPatch<PostgresClientProtocol>,
514 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
515 : pub rel_size_v2_enabled: FieldPatch<bool>,
516 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
517 : pub gc_compaction_enabled: FieldPatch<bool>,
518 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
519 : pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
520 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
521 : pub gc_compaction_ratio_percent: FieldPatch<u64>,
522 : }
523 :
524 : /// An alternative representation of `pageserver::tenant::TenantConf` with
525 : /// simpler types.
526 0 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
527 : pub struct TenantConfig {
528 : pub checkpoint_distance: Option<u64>,
529 : pub checkpoint_timeout: Option<String>,
530 : pub compaction_target_size: Option<u64>,
531 : pub compaction_period: Option<String>,
532 : pub compaction_threshold: Option<usize>,
533 : pub compaction_upper_limit: Option<usize>,
534 : // defer parsing compaction_algorithm, like eviction_policy
535 : pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
536 : pub compaction_l0_first: Option<bool>,
537 : pub compaction_l0_semaphore: Option<bool>,
538 : pub l0_flush_delay_threshold: Option<usize>,
539 : pub l0_flush_stall_threshold: Option<usize>,
540 : pub l0_flush_wait_upload: Option<bool>,
541 : pub gc_horizon: Option<u64>,
542 : pub gc_period: Option<String>,
543 : pub image_creation_threshold: Option<usize>,
544 : pub pitr_interval: Option<String>,
545 : pub walreceiver_connect_timeout: Option<String>,
546 : pub lagging_wal_timeout: Option<String>,
547 : pub max_lsn_wal_lag: Option<NonZeroU64>,
548 : pub eviction_policy: Option<EvictionPolicy>,
549 : pub min_resident_size_override: Option<u64>,
550 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
551 : pub heatmap_period: Option<String>,
552 : pub lazy_slru_download: Option<bool>,
553 : pub timeline_get_throttle: Option<ThrottleConfig>,
554 : pub image_layer_creation_check_threshold: Option<u8>,
555 : pub image_creation_preempt_threshold: Option<usize>,
556 : pub lsn_lease_length: Option<String>,
557 : pub lsn_lease_length_for_ts: Option<String>,
558 : pub timeline_offloading: Option<bool>,
559 : pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
560 : pub rel_size_v2_enabled: Option<bool>,
561 : pub gc_compaction_enabled: Option<bool>,
562 : pub gc_compaction_initial_threshold_kb: Option<u64>,
563 : pub gc_compaction_ratio_percent: Option<u64>,
564 : }
565 :
566 : impl TenantConfig {
567 1 : pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
568 1 : let Self {
569 1 : mut checkpoint_distance,
570 1 : mut checkpoint_timeout,
571 1 : mut compaction_target_size,
572 1 : mut compaction_period,
573 1 : mut compaction_threshold,
574 1 : mut compaction_upper_limit,
575 1 : mut compaction_algorithm,
576 1 : mut compaction_l0_first,
577 1 : mut compaction_l0_semaphore,
578 1 : mut l0_flush_delay_threshold,
579 1 : mut l0_flush_stall_threshold,
580 1 : mut l0_flush_wait_upload,
581 1 : mut gc_horizon,
582 1 : mut gc_period,
583 1 : mut image_creation_threshold,
584 1 : mut pitr_interval,
585 1 : mut walreceiver_connect_timeout,
586 1 : mut lagging_wal_timeout,
587 1 : mut max_lsn_wal_lag,
588 1 : mut eviction_policy,
589 1 : mut min_resident_size_override,
590 1 : mut evictions_low_residence_duration_metric_threshold,
591 1 : mut heatmap_period,
592 1 : mut lazy_slru_download,
593 1 : mut timeline_get_throttle,
594 1 : mut image_layer_creation_check_threshold,
595 1 : mut image_creation_preempt_threshold,
596 1 : mut lsn_lease_length,
597 1 : mut lsn_lease_length_for_ts,
598 1 : mut timeline_offloading,
599 1 : mut wal_receiver_protocol_override,
600 1 : mut rel_size_v2_enabled,
601 1 : mut gc_compaction_enabled,
602 1 : mut gc_compaction_initial_threshold_kb,
603 1 : mut gc_compaction_ratio_percent,
604 1 : } = self;
605 1 :
606 1 : patch.checkpoint_distance.apply(&mut checkpoint_distance);
607 1 : patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
608 1 : patch
609 1 : .compaction_target_size
610 1 : .apply(&mut compaction_target_size);
611 1 : patch.compaction_period.apply(&mut compaction_period);
612 1 : patch.compaction_threshold.apply(&mut compaction_threshold);
613 1 : patch
614 1 : .compaction_upper_limit
615 1 : .apply(&mut compaction_upper_limit);
616 1 : patch.compaction_algorithm.apply(&mut compaction_algorithm);
617 1 : patch.compaction_l0_first.apply(&mut compaction_l0_first);
618 1 : patch
619 1 : .compaction_l0_semaphore
620 1 : .apply(&mut compaction_l0_semaphore);
621 1 : patch
622 1 : .l0_flush_delay_threshold
623 1 : .apply(&mut l0_flush_delay_threshold);
624 1 : patch
625 1 : .l0_flush_stall_threshold
626 1 : .apply(&mut l0_flush_stall_threshold);
627 1 : patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
628 1 : patch.gc_horizon.apply(&mut gc_horizon);
629 1 : patch.gc_period.apply(&mut gc_period);
630 1 : patch
631 1 : .image_creation_threshold
632 1 : .apply(&mut image_creation_threshold);
633 1 : patch.pitr_interval.apply(&mut pitr_interval);
634 1 : patch
635 1 : .walreceiver_connect_timeout
636 1 : .apply(&mut walreceiver_connect_timeout);
637 1 : patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
638 1 : patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
639 1 : patch.eviction_policy.apply(&mut eviction_policy);
640 1 : patch
641 1 : .min_resident_size_override
642 1 : .apply(&mut min_resident_size_override);
643 1 : patch
644 1 : .evictions_low_residence_duration_metric_threshold
645 1 : .apply(&mut evictions_low_residence_duration_metric_threshold);
646 1 : patch.heatmap_period.apply(&mut heatmap_period);
647 1 : patch.lazy_slru_download.apply(&mut lazy_slru_download);
648 1 : patch
649 1 : .timeline_get_throttle
650 1 : .apply(&mut timeline_get_throttle);
651 1 : patch
652 1 : .image_layer_creation_check_threshold
653 1 : .apply(&mut image_layer_creation_check_threshold);
654 1 : patch
655 1 : .image_creation_preempt_threshold
656 1 : .apply(&mut image_creation_preempt_threshold);
657 1 : patch.lsn_lease_length.apply(&mut lsn_lease_length);
658 1 : patch
659 1 : .lsn_lease_length_for_ts
660 1 : .apply(&mut lsn_lease_length_for_ts);
661 1 : patch.timeline_offloading.apply(&mut timeline_offloading);
662 1 : patch
663 1 : .wal_receiver_protocol_override
664 1 : .apply(&mut wal_receiver_protocol_override);
665 1 : patch.rel_size_v2_enabled.apply(&mut rel_size_v2_enabled);
666 1 : patch
667 1 : .gc_compaction_enabled
668 1 : .apply(&mut gc_compaction_enabled);
669 1 : patch
670 1 : .gc_compaction_initial_threshold_kb
671 1 : .apply(&mut gc_compaction_initial_threshold_kb);
672 1 : patch
673 1 : .gc_compaction_ratio_percent
674 1 : .apply(&mut gc_compaction_ratio_percent);
675 1 :
676 1 : Self {
677 1 : checkpoint_distance,
678 1 : checkpoint_timeout,
679 1 : compaction_target_size,
680 1 : compaction_period,
681 1 : compaction_threshold,
682 1 : compaction_upper_limit,
683 1 : compaction_algorithm,
684 1 : compaction_l0_first,
685 1 : compaction_l0_semaphore,
686 1 : l0_flush_delay_threshold,
687 1 : l0_flush_stall_threshold,
688 1 : l0_flush_wait_upload,
689 1 : gc_horizon,
690 1 : gc_period,
691 1 : image_creation_threshold,
692 1 : pitr_interval,
693 1 : walreceiver_connect_timeout,
694 1 : lagging_wal_timeout,
695 1 : max_lsn_wal_lag,
696 1 : eviction_policy,
697 1 : min_resident_size_override,
698 1 : evictions_low_residence_duration_metric_threshold,
699 1 : heatmap_period,
700 1 : lazy_slru_download,
701 1 : timeline_get_throttle,
702 1 : image_layer_creation_check_threshold,
703 1 : image_creation_preempt_threshold,
704 1 : lsn_lease_length,
705 1 : lsn_lease_length_for_ts,
706 1 : timeline_offloading,
707 1 : wal_receiver_protocol_override,
708 1 : rel_size_v2_enabled,
709 1 : gc_compaction_enabled,
710 1 : gc_compaction_initial_threshold_kb,
711 1 : gc_compaction_ratio_percent,
712 1 : }
713 1 : }
714 : }
715 :
716 : /// The policy for the aux file storage.
717 : ///
718 : /// It can be switched through `switch_aux_file_policy` tenant config.
719 : /// When the first aux file written, the policy will be persisted in the
720 : /// `index_part.json` file and has a limited migration path.
721 : ///
722 : /// Currently, we only allow the following migration path:
723 : ///
724 : /// Unset -> V1
725 : /// -> V2
726 : /// -> CrossValidation -> V2
727 : #[derive(
728 : Eq,
729 : PartialEq,
730 : Debug,
731 : Copy,
732 : Clone,
733 0 : strum_macros::EnumString,
734 : strum_macros::Display,
735 4 : serde_with::DeserializeFromStr,
736 : serde_with::SerializeDisplay,
737 : )]
738 : #[strum(serialize_all = "kebab-case")]
739 : pub enum AuxFilePolicy {
740 : /// V1 aux file policy: store everything in AUX_FILE_KEY
741 : #[strum(ascii_case_insensitive)]
742 : V1,
743 : /// V2 aux file policy: store in the AUX_FILE keyspace
744 : #[strum(ascii_case_insensitive)]
745 : V2,
746 : /// Cross validation runs both formats on the write path and does validation
747 : /// on the read path.
748 : #[strum(ascii_case_insensitive)]
749 : CrossValidation,
750 : }
751 :
752 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
753 : #[serde(tag = "kind")]
754 : pub enum EvictionPolicy {
755 : NoEviction,
756 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
757 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
758 : }
759 :
760 : impl EvictionPolicy {
761 0 : pub fn discriminant_str(&self) -> &'static str {
762 0 : match self {
763 0 : EvictionPolicy::NoEviction => "NoEviction",
764 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
765 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
766 : }
767 0 : }
768 : }
769 :
770 : #[derive(
771 : Eq,
772 : PartialEq,
773 : Debug,
774 : Copy,
775 : Clone,
776 0 : strum_macros::EnumString,
777 : strum_macros::Display,
778 0 : serde_with::DeserializeFromStr,
779 : serde_with::SerializeDisplay,
780 : )]
781 : #[strum(serialize_all = "kebab-case")]
782 : pub enum CompactionAlgorithm {
783 : Legacy,
784 : Tiered,
785 : }
786 :
787 : #[derive(
788 4 : Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
789 : )]
790 : pub enum ImageCompressionAlgorithm {
791 : // Disabled for writes, support decompressing during read path
792 : Disabled,
793 : /// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
794 : /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
795 : Zstd {
796 : level: Option<i8>,
797 : },
798 : }
799 :
800 : impl FromStr for ImageCompressionAlgorithm {
801 : type Err = anyhow::Error;
802 8 : fn from_str(s: &str) -> Result<Self, Self::Err> {
803 8 : let mut components = s.split(['(', ')']);
804 8 : let first = components
805 8 : .next()
806 8 : .ok_or_else(|| anyhow::anyhow!("empty string"))?;
807 8 : match first {
808 8 : "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
809 6 : "zstd" => {
810 6 : let level = if let Some(v) = components.next() {
811 4 : let v: i8 = v.parse()?;
812 4 : Some(v)
813 : } else {
814 2 : None
815 : };
816 :
817 6 : Ok(ImageCompressionAlgorithm::Zstd { level })
818 : }
819 0 : _ => anyhow::bail!("invalid specifier '{first}'"),
820 : }
821 8 : }
822 : }
823 :
824 : impl Display for ImageCompressionAlgorithm {
825 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 12 : match self {
827 3 : ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
828 9 : ImageCompressionAlgorithm::Zstd { level } => {
829 9 : if let Some(level) = level {
830 6 : write!(f, "zstd({})", level)
831 : } else {
832 3 : write!(f, "zstd")
833 : }
834 : }
835 : }
836 12 : }
837 : }
838 :
839 0 : #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
840 : pub struct CompactionAlgorithmSettings {
841 : pub kind: CompactionAlgorithm,
842 : }
843 :
844 8 : #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
845 : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
846 : pub enum L0FlushConfig {
847 : #[serde(rename_all = "snake_case")]
848 : Direct { max_concurrency: NonZeroUsize },
849 : }
850 :
851 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
852 : pub struct EvictionPolicyLayerAccessThreshold {
853 : #[serde(with = "humantime_serde")]
854 : pub period: Duration,
855 : #[serde(with = "humantime_serde")]
856 : pub threshold: Duration,
857 : }
858 :
/// Throttle configuration.
///
/// `refill_amount` divided by `refill_interval` is the steady-state request rate
/// (this is what [`ThrottleConfig::steady_rps`] computes).
/// NOTE(review): `initial` and `max` are presumably the starting and maximum budget of
/// the throttle; their semantics are defined by the consumer -- confirm.
859 6 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
860 : pub struct ThrottleConfig {
861 : /// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
862 : #[serde(rename = "task_kinds")]
863 : pub enabled: ThrottleConfigTaskKinds,
864 : pub initial: u32,
865 : #[serde(with = "humantime_serde")]
866 : pub refill_interval: Duration,
867 : pub refill_amount: NonZeroU32,
868 : pub max: u32,
869 : }
870 :
871 : /// Before <https://github.com/neondatabase/neon/pull/9962>
872 : /// the throttle was a per `Timeline::get`/`Timeline::get_vectored` call.
873 : /// The `task_kinds` field controlled which Pageserver "Task Kind"s
874 : /// were subject to the throttle.
875 : ///
876 : /// After that PR, the throttle is applied at pagestream request level
877 : /// and the `task_kinds` field does not apply since the only task kind
878 : /// that is subject to the throttle is that of the page service.
879 : ///
880 : /// However, we don't want to make a breaking config change right now
881 : /// because it means we have to migrate all the tenant configs.
882 : /// This will be done in a future PR.
883 : ///
884 : /// In the meantime, we use emptiness / non-emptiness of the `task_kinds`
885 : /// field to determine if the throttle is enabled or not.
886 1 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
887 : #[serde(transparent)]
888 : pub struct ThrottleConfigTaskKinds(Vec<String>);
889 :
890 : impl ThrottleConfigTaskKinds {
891 909 : pub fn disabled() -> Self {
892 909 : Self(vec![])
893 909 : }
894 446 : pub fn is_enabled(&self) -> bool {
895 446 : !self.0.is_empty()
896 446 : }
897 : }
898 :
899 : impl ThrottleConfig {
900 909 : pub fn disabled() -> Self {
901 909 : Self {
902 909 : enabled: ThrottleConfigTaskKinds::disabled(),
903 909 : // other values don't matter with emtpy `task_kinds`.
904 909 : initial: 0,
905 909 : refill_interval: Duration::from_millis(1),
906 909 : refill_amount: NonZeroU32::new(1).unwrap(),
907 909 : max: 1,
908 909 : }
909 909 : }
910 : /// The requests per second allowed by the given config.
911 0 : pub fn steady_rps(&self) -> f64 {
912 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
913 0 : }
914 : }
915 :
#[cfg(test)]
mod throttle_config_tests {
    use super::*;

    /// `ThrottleConfig::disabled()` must report the throttle as off.
    #[test]
    fn test_disabled_is_disabled() {
        let throttle_on = ThrottleConfig::disabled().enabled.is_enabled();
        assert!(!throttle_on);
    }

    /// A config with a non-empty `task_kinds` list (plus a `fair` key that
    /// `ThrottleConfig` does not declare) still parses and counts as enabled.
    #[test]
    fn test_enabled_backwards_compat() {
        let legacy = serde_json::json!({
            "task_kinds": ["PageRequestHandler"],
            "initial": 40000,
            "refill_interval": "50ms",
            "refill_amount": 1000,
            "max": 40000,
            "fair": true
        });
        let parsed = serde_json::from_value::<ThrottleConfig>(legacy).unwrap();
        assert!(parsed.enabled.is_enabled());
    }
}
939 :
940 : /// A flattened analog of a `pageserver::tenant::LocationMode`, which
941 : /// lists out all possible states (and the virtual "Detached" state)
942 : /// in a flat form rather than using rust-style enums.
943 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
944 : pub enum LocationConfigMode {
945 : AttachedSingle,
946 : AttachedMulti,
947 : AttachedStale,
948 : Secondary,
949 : Detached,
950 : }
951 :
/// Configuration for a location in `Secondary` mode; carried in
/// [`LocationConfig::secondary_conf`].
/// NOTE(review): the effect of `warm` is decided by the consumer and is not visible here.
952 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
953 : pub struct LocationConfigSecondary {
954 : pub warm: bool,
955 : }
956 :
957 : /// An alternative representation of `pageserver::tenant::LocationConf`,
958 : /// for use in external-facing APIs.
///
/// Every field except `mode` and `tenant_conf` is marked `#[serde(default)]`, so it may be
/// omitted from the input and then takes its type's default value.
959 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
960 : pub struct LocationConfig {
961 : pub mode: LocationConfigMode,
962 : /// If attaching, in what generation?
963 : #[serde(default)]
964 : pub generation: Option<u32>,
965 :
966 : // If requesting mode `Secondary`, configuration for that.
967 : #[serde(default)]
968 : pub secondary_conf: Option<LocationConfigSecondary>,
969 :
970 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
971 : // must be set accurately.
972 : #[serde(default)]
973 : pub shard_number: u8,
974 : #[serde(default)]
975 : pub shard_count: u8,
976 : #[serde(default)]
977 : pub shard_stripe_size: u32,
978 :
979 : // This configuration only affects attached mode, but should be provided irrespective
980 : // of the mode, as a secondary location might transition on startup if the response
981 : // to the `/re-attach` control plane API requests it.
982 : pub tenant_conf: TenantConfig,
983 : }
984 :
/// Response listing tenant shards, each paired with an optional [`LocationConfig`].
/// NOTE(review): what a `None` config denotes is not visible here -- confirm with the producer.
985 0 : #[derive(Serialize, Deserialize)]
986 : pub struct LocationConfigListResponse {
987 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
988 : }
989 :
/// Serialize-only status response carrying a node id.
990 : #[derive(Serialize)]
991 : pub struct StatusResponse {
992 : pub id: NodeId,
993 : }
994 :
/// Request body whose keys are the flattened fields of a [`LocationConfig`].
///
/// NOTE(review): the serde documentation states that `deny_unknown_fields` is not
/// supported in combination with `flatten`; verify that unknown keys are really rejected.
995 0 : #[derive(Serialize, Deserialize, Debug)]
996 : #[serde(deny_unknown_fields)]
997 : pub struct TenantLocationConfigRequest {
998 : #[serde(flatten)]
999 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
1000 : }
1001 :
/// Time-travel request carrying a list of shard counts; unknown fields are rejected.
/// NOTE(review): how the listed shard counts are used is decided by the handler, not here.
1002 0 : #[derive(Serialize, Deserialize, Debug)]
1003 : #[serde(deny_unknown_fields)]
1004 : pub struct TenantTimeTravelRequest {
1005 : pub shard_counts: Vec<ShardCount>,
1006 : }
1007 :
/// Pairs a tenant shard with a node id; unknown fields are rejected on input.
1008 0 : #[derive(Serialize, Deserialize, Debug)]
1009 : #[serde(deny_unknown_fields)]
1010 : pub struct TenantShardLocation {
1011 : pub shard_id: TenantShardId,
1012 : pub node_id: NodeId,
1013 : }
1014 :
/// Response to a location-config request: one [`TenantShardLocation`] per shard, plus the
/// stripe size when there is one.
1015 0 : #[derive(Serialize, Deserialize, Debug)]
1016 : #[serde(deny_unknown_fields)]
1017 : pub struct TenantLocationConfigResponse {
1018 : pub shards: Vec<TenantShardLocation>,
1019 : // If the shards' ShardCount count is >1, stripe_size will be set.
1020 : pub stripe_size: Option<ShardStripeSize>,
1021 : }
1022 :
/// Request body made of a `tenant_id` plus the flattened fields of a [`TenantConfig`].
///
/// NOTE(review): the serde documentation states that `deny_unknown_fields` is not
/// supported in combination with `flatten`; verify that unknown keys are really rejected.
1023 2 : #[derive(Serialize, Deserialize, Debug)]
1024 : #[serde(deny_unknown_fields)]
1025 : pub struct TenantConfigRequest {
1026 : pub tenant_id: TenantId,
1027 : #[serde(flatten)]
1028 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
1029 : }
1030 :
1031 : impl std::ops::Deref for TenantConfigRequest {
1032 : type Target = TenantConfig;
1033 :
1034 0 : fn deref(&self) -> &Self::Target {
1035 0 : &self.config
1036 0 : }
1037 : }
1038 :
1039 : impl TenantConfigRequest {
1040 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
1041 0 : let config = TenantConfig::default();
1042 0 : TenantConfigRequest { tenant_id, config }
1043 0 : }
1044 : }
1045 :
/// Request body made of a `tenant_id` plus the flattened fields of a [`TenantConfigPatch`].
///
/// NOTE(review): the serde documentation states that `deny_unknown_fields` is not
/// supported in combination with `flatten`; verify that unknown keys are really rejected.
1046 3 : #[derive(Serialize, Deserialize, Debug)]
1047 : #[serde(deny_unknown_fields)]
1048 : pub struct TenantConfigPatchRequest {
1049 : pub tenant_id: TenantId,
1050 : #[serde(flatten)]
1051 : pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
1052 : }
1053 :
/// Request mapping timelines to LSNs, plus a timeout.
///
/// Because `timelines` is a flattened map, its entries appear as top-level keys next to
/// `timeout` rather than under a nested `timelines` object.
/// `timeout` has no `with` attribute, so it uses serde's built-in `Duration` representation.
1054 0 : #[derive(Serialize, Deserialize, Debug)]
1055 : pub struct TenantWaitLsnRequest {
1056 : #[serde(flatten)]
1057 : pub timelines: HashMap<TimelineId, Lsn>,
1058 : pub timeout: Duration,
1059 : }
1060 :
1061 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
///
/// Serialized adjacently tagged: the snake_case variant name goes under `slug` and any
/// variant payload under `data`.
1062 0 : #[derive(Serialize, Deserialize, Clone)]
1063 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
1064 : pub enum TenantAttachmentStatus {
1065 : Maybe,
1066 : Attached,
1067 : Failed { reason: String },
1068 : }
1069 :
/// Per-tenant-shard information: id, state, attachment status, generation and optional
/// size / gc-blocking details.
1070 0 : #[derive(Serialize, Deserialize, Clone)]
1071 : pub struct TenantInfo {
1072 : pub id: TenantShardId,
1073 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
1074 : pub state: TenantState,
1075 : /// Sum of the size of all layer files.
1076 : /// If a layer is present in both local FS and S3, it counts only once.
1077 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
1078 : pub attachment_status: TenantAttachmentStatus,
1079 : pub generation: u32,
1080 :
1081 : /// Opaque explanation if gc is being blocked.
1082 : ///
1083 : /// Only looked up for the individual tenant detail, not the listing.
// Omitted from the serialized output entirely when `None`.
1084 : #[serde(skip_serializing_if = "Option::is_none")]
1085 : pub gc_blocking: Option<String>,
1086 : }
1087 :
/// Detailed view of a tenant: the flattened [`TenantInfo`] fields plus walredo status and
/// the tenant's timeline ids.
1088 0 : #[derive(Serialize, Deserialize, Clone)]
1089 : pub struct TenantDetails {
// The `TenantInfo` fields are inlined at the top level of the serialized object.
1090 : #[serde(flatten)]
1091 : pub tenant_info: TenantInfo,
1092 :
1093 : pub walredo: Option<WalRedoManagerStatus>,
1094 :
1095 : pub timelines: Vec<TimelineId>,
1096 : }
1097 :
/// Archival state of a timeline, as carried by [`TimelineArchivalConfigRequest`].
1098 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
1099 : pub enum TimelineArchivalState {
1100 : Archived,
1101 : Unarchived,
1102 : }
1103 :
/// Request body carrying a [`TimelineArchivalState`] for a timeline.
1104 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1105 : pub struct TimelineArchivalConfigRequest {
1106 : pub state: TimelineArchivalState,
1107 : }
1108 :
/// Combined listing: [`TimelineInfo`] entries in `timelines` and
/// [`OffloadedTimelineInfo`] entries in `offloaded`.
1109 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1110 : pub struct TimelinesInfoAndOffloaded {
1111 : pub timelines: Vec<TimelineInfo>,
1112 : pub offloaded: Vec<OffloadedTimelineInfo>,
1113 : }
1114 :
1115 : /// Analog of [`TimelineInfo`] for offloaded timelines.
1116 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1117 : pub struct OffloadedTimelineInfo {
// NB: despite the field name, the type is a `TenantShardId`.
1118 : pub tenant_id: TenantShardId,
1119 : pub timeline_id: TimelineId,
1120 : /// Whether the timeline has a parent it has been branched off from or not
1121 : pub ancestor_timeline_id: Option<TimelineId>,
1122 : /// Whether to retain the branch lsn at the ancestor or not
1123 : pub ancestor_retain_lsn: Option<Lsn>,
1124 : /// The time point when the timeline was archived
1125 : pub archived_at: chrono::DateTime<chrono::Utc>,
1126 : }
1127 :
1128 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
1129 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1130 : pub struct TimelineInfo {
// NB: despite the field name, the type is a `TenantShardId`.
1131 : pub tenant_id: TenantShardId,
1132 : pub timeline_id: TimelineId,
1133 :
1134 : pub ancestor_timeline_id: Option<TimelineId>,
1135 : pub ancestor_lsn: Option<Lsn>,
1136 : pub last_record_lsn: Lsn,
1137 : pub prev_record_lsn: Option<Lsn>,
1138 :
1139 : /// Legacy field for compat with control plane. Synonym of `min_readable_lsn`.
1140 : /// TODO: remove once control plane no longer reads it.
1141 : pub latest_gc_cutoff_lsn: Lsn,
1142 :
1143 : /// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
1144 : /// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
1145 : /// as it is easier to reason about.
1146 : #[serde(default)]
1147 : pub applied_gc_cutoff_lsn: Lsn,
1148 :
1149 : /// The upper bound of data which is either already GC'ed, or eligible to be GC'ed at any time based on PITR interval.
1150 : /// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
1151 : /// LSN at which it is legal to create a branch or ephemeral endpoint.
1152 : ///
1153 : /// Note that holders of valid LSN leases may be able to create branches and read pages earlier
1154 : /// than this LSN, but new leases may not be taken out earlier than this LSN.
1155 : #[serde(default)]
1156 : pub min_readable_lsn: Lsn,
1157 :
1158 : pub disk_consistent_lsn: Lsn,
1159 :
1160 : /// The LSN that we have successfully uploaded to remote storage
1161 : pub remote_consistent_lsn: Lsn,
1162 :
1163 : /// The LSN that we are advertising to safekeepers
1164 : pub remote_consistent_lsn_visible: Lsn,
1165 :
1166 : /// The LSN from the start of the root timeline (never changes)
1167 : pub initdb_lsn: Lsn,
1168 :
1169 : pub current_logical_size: u64,
1170 : pub current_logical_size_is_accurate: bool,
1171 :
1172 : pub directory_entries_counts: Vec<u64>,
1173 :
1174 : /// Sum of the size of all layer files.
1175 : /// If a layer is present in both local FS and S3, it counts only once.
1176 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
1177 : pub current_logical_size_non_incremental: Option<u64>,
1178 :
1179 : /// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
1180 : /// beyond the branch's branch point, we only count up to the branch point.
1181 : pub pitr_history_size: u64,
1182 :
1183 : /// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
1184 : /// ancestor data used by this branch would have been retained anyway). If this is false, then
1185 : /// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
1186 : /// otherwise be able to GC.
1187 : pub within_ancestor_pitr: bool,
1188 :
1189 : pub timeline_dir_layer_file_size_sum: Option<u64>,
1190 :
1191 : pub wal_source_connstr: Option<String>,
1192 : pub last_received_msg_lsn: Option<Lsn>,
1193 : /// the timestamp (in microseconds) of the last received message
1194 : pub last_received_msg_ts: Option<u128>,
1195 : pub pg_version: u32,
1196 :
1197 : pub state: TimelineState,
1198 :
1199 : pub walreceiver_status: String,
1200 :
1201 : // ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
1202 : // Backward compatibility: you will get a JSON not containing the newly-added field.
1203 : // Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
1204 : // not deny unknown fields by default so it's safe to set the field to some value, though it won't be
1205 : // read.
1206 : pub is_archived: Option<bool>,
1207 : }
1208 :
/// Snapshot of a layer map, split into in-memory and historic layers.
1209 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1210 : pub struct LayerMapInfo {
1211 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
1212 : pub historic_layers: Vec<HistoricLayerInfo>,
1213 : }
1214 :
1215 : /// The residence status of a layer
///
/// Either [`Resident`](Self::Resident) (file exists locally) or
/// [`Evicted`](Self::Evicted) (file exists only on the remote).
1216 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1217 : pub enum LayerResidenceStatus {
1218 : /// Residence status for a layer file that exists locally.
1219 : /// It may also exist on the remote, we don't care here.
1220 : Resident,
1221 : /// Residence status for a layer file that only exists on the remote.
1222 : Evicted,
1223 : }
1224 :
/// Access statistics for a layer.
///
/// Both timestamps are (de)serialized through `serde_with::TimestampMilliSeconds`.
/// NOTE(review): the exact events that update `access_time` / `residence_time`, and what
/// `visible` means, are defined by the producer and are not visible here.
1225 : #[serde_as]
1226 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1227 : pub struct LayerAccessStats {
1228 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1229 : pub access_time: SystemTime,
1230 :
1231 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1232 : pub residence_time: SystemTime,
1233 :
1234 : pub visible: bool,
1235 : }
1236 :
/// Description of an in-memory layer, internally tagged by `kind` (`Open` or `Frozen`).
///
/// An `Open` layer only has a start LSN; a `Frozen` layer has both a start and an end LSN.
1237 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1238 : #[serde(tag = "kind")]
1239 : pub enum InMemoryLayerInfo {
1240 : Open { lsn_start: Lsn },
1241 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
1242 : }
1243 :
/// Description of a historic (on-disk or remote) layer, internally tagged by `kind`.
///
/// Both variants carry file name, file size, start LSN, `remote` flag and access stats;
/// `Delta` additionally carries an end LSN and an `l0` flag.
1244 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1245 : #[serde(tag = "kind")]
1246 : pub enum HistoricLayerInfo {
1247 : Delta {
1248 : layer_file_name: String,
1249 : layer_file_size: u64,
1250 :
1251 : lsn_start: Lsn,
1252 : lsn_end: Lsn,
1253 : remote: bool,
1254 : access_stats: LayerAccessStats,
1255 :
1256 : l0: bool,
1257 : },
1258 : Image {
1259 : layer_file_name: String,
1260 : layer_file_size: u64,
1261 :
1262 : lsn_start: Lsn,
1263 : remote: bool,
1264 : access_stats: LayerAccessStats,
1265 : },
1266 : }
1267 :
1268 : impl HistoricLayerInfo {
1269 0 : pub fn layer_file_name(&self) -> &str {
1270 0 : match self {
1271 : HistoricLayerInfo::Delta {
1272 0 : layer_file_name, ..
1273 0 : } => layer_file_name,
1274 : HistoricLayerInfo::Image {
1275 0 : layer_file_name, ..
1276 0 : } => layer_file_name,
1277 : }
1278 0 : }
1279 0 : pub fn is_remote(&self) -> bool {
1280 0 : match self {
1281 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
1282 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
1283 : }
1284 0 : }
1285 0 : pub fn set_remote(&mut self, value: bool) {
1286 0 : let field = match self {
1287 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
1288 0 : HistoricLayerInfo::Image { remote, .. } => remote,
1289 : };
1290 0 : *field = value;
1291 0 : }
1292 0 : pub fn layer_file_size(&self) -> u64 {
1293 0 : match self {
1294 : HistoricLayerInfo::Delta {
1295 0 : layer_file_size, ..
1296 0 : } => *layer_file_size,
1297 : HistoricLayerInfo::Image {
1298 0 : layer_file_size, ..
1299 0 : } => *layer_file_size,
1300 : }
1301 0 : }
1302 : }
1303 :
/// Request to spawn a download-remote-layers task with a non-zero concurrency limit.
1304 0 : #[derive(Debug, Serialize, Deserialize)]
1305 : pub struct DownloadRemoteLayersTaskSpawnRequest {
1306 : pub max_concurrent_downloads: NonZeroUsize,
1307 : }
1308 :
/// Request carrying aux files as a string-to-string map.
/// NOTE(review): presumably keyed by file path with the content as value -- confirm with the handler.
1309 0 : #[derive(Debug, Serialize, Deserialize)]
1310 : pub struct IngestAuxFilesRequest {
1311 : pub aux_files: HashMap<String, String>,
1312 : }
1313 :
/// Request to list aux files, parameterized by an LSN.
1314 0 : #[derive(Debug, Serialize, Deserialize)]
1315 : pub struct ListAuxFilesRequest {
1316 : pub lsn: Lsn,
1317 : }
1318 :
/// Status report of a download-remote-layers task: its id, state and layer counters.
1319 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1320 : pub struct DownloadRemoteLayersTaskInfo {
1321 : pub task_id: String,
1322 : pub state: DownloadRemoteLayersTaskState,
1323 : pub total_layer_count: u64, // stable once `completed`
1324 : pub successful_download_count: u64, // stable once `completed`
1325 : pub failed_download_count: u64, // stable once `completed`
1326 : }
1327 :
/// State of a download-remote-layers task, as reported in [`DownloadRemoteLayersTaskInfo`].
1328 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1329 : pub enum DownloadRemoteLayersTaskState {
1330 : Running,
1331 : Completed,
1332 : ShutDown,
1333 : }
1334 :
/// GC request for a timeline with an optional `gc_horizon`.
/// NOTE(review): the behaviour when `gc_horizon` is `None` is decided by the handler -- confirm.
1335 0 : #[derive(Debug, Serialize, Deserialize)]
1336 : pub struct TimelineGcRequest {
1337 : pub gc_horizon: Option<u64>,
1338 : }
1339 :
/// Status of a walredo process: currently just its pid.
1340 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1341 : pub struct WalRedoManagerProcessStatus {
1342 : pub pid: u32,
1343 : }
1344 :
/// Status of a walredo manager: the optional time of the last redo and the optional
/// process status.
1345 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1346 : pub struct WalRedoManagerStatus {
1347 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
1348 : pub process: Option<WalRedoManagerProcessStatus>,
1349 : }
1350 :
1351 : /// The progress of a secondary tenant.
1352 : ///
1353 : /// It is mostly useful when doing a long running download: e.g. initiating
1354 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
1355 : /// what's happening.
///
/// The derived `Default` yields no heatmap mtime and all counters at zero.
1356 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
1357 : pub struct SecondaryProgress {
1358 : /// The remote storage LastModified time of the heatmap object we last downloaded.
1359 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
1360 :
1361 : /// The number of layers currently on-disk
1362 : pub layers_downloaded: usize,
1363 : /// The number of layers in the most recently seen heatmap
1364 : pub layers_total: usize,
1365 :
1366 : /// The number of layer bytes currently on-disk
1367 : pub bytes_downloaded: u64,
1368 : /// The number of layer bytes in the most recently seen heatmap
1369 : pub bytes_total: u64,
1370 : }
1371 :
/// One shard entry of a [`TenantScanRemoteStorageResponse`]: shard id plus optional generation.
/// NOTE(review): when `generation` is `None` is decided by the producer -- confirm.
1372 0 : #[derive(Serialize, Deserialize, Debug)]
1373 : pub struct TenantScanRemoteStorageShard {
1374 : pub tenant_shard_id: TenantShardId,
1375 : pub generation: Option<u32>,
1376 : }
1377 :
/// Result of a remote-storage scan for a tenant: the list of shards found.
/// The derived `Default` is an empty list.
1378 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1379 : pub struct TenantScanRemoteStorageResponse {
1380 : pub shards: Vec<TenantScanRemoteStorageShard>,
1381 : }
1382 :
/// Sort key for [`TopTenantShardsRequest::order_by`]; variant names are snake_case on the wire.
1383 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1384 : #[serde(rename_all = "snake_case")]
1385 : pub enum TenantSorting {
1386 : ResidentSize,
1387 : MaxLogicalSize,
1388 : }
1389 :
1390 : impl Default for TenantSorting {
1391 0 : fn default() -> Self {
1392 0 : Self::ResidentSize
1393 0 : }
1394 : }
1395 :
/// Query for the top tenant shards: a sort key, a result limit and two optional filters.
1396 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1397 : pub struct TopTenantShardsRequest {
1398 : // How would you like to sort the tenants?
1399 : pub order_by: TenantSorting,
1400 :
1401 : // How many results?
1402 : pub limit: usize,
1403 :
1404 : // Omit tenants with more than this many shards (e.g. if this is the max number of shards
1405 : // that the caller would ever split to)
1406 : pub where_shards_lt: Option<ShardCount>,
1407 :
1408 : // Omit tenants where the ordering metric is less than this (this is an optimization to
1409 : // let us quickly exclude numerous tiny shards)
1410 : pub where_gt: Option<u64>,
1411 : }
1412 :
/// One entry of a [`TopTenantShardsResponse`]: a tenant shard and its size metrics.
1413 0 : #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
1414 : pub struct TopTenantShardItem {
1415 : pub id: TenantShardId,
1416 :
1417 : /// Total size of layers on local disk for all timelines in this tenant
1418 : pub resident_size: u64,
1419 :
1420 : /// Total size of layers in remote storage for all timelines in this tenant
1421 : pub physical_size: u64,
1422 :
1423 : /// The largest logical size of a timeline within this tenant
1424 : pub max_logical_size: u64,
1425 : }
1426 :
/// Response to a [`TopTenantShardsRequest`]; the derived `Default` is an empty list.
1427 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1428 : pub struct TopTenantShardsResponse {
1429 : pub shards: Vec<TopTenantShardItem>,
1430 : }
1431 :
1432 : pub mod virtual_file {
1433 : #[derive(
1434 : Copy,
1435 : Clone,
1436 : PartialEq,
1437 : Eq,
1438 : Hash,
1439 0 : strum_macros::EnumString,
1440 : strum_macros::Display,
1441 0 : serde_with::DeserializeFromStr,
1442 : serde_with::SerializeDisplay,
1443 : Debug,
1444 : )]
1445 : #[strum(serialize_all = "kebab-case")]
1446 : pub enum IoEngineKind {
1447 : StdFs,
1448 : #[cfg(target_os = "linux")]
1449 : TokioEpollUring,
1450 : }
1451 :
1452 : /// Direct IO modes for a pageserver.
1453 : #[derive(
1454 : Copy,
1455 : Clone,
1456 : PartialEq,
1457 : Eq,
1458 : Hash,
1459 0 : strum_macros::EnumString,
1460 : strum_macros::Display,
1461 0 : serde_with::DeserializeFromStr,
1462 : serde_with::SerializeDisplay,
1463 : Debug,
1464 : )]
1465 : #[strum(serialize_all = "kebab-case")]
1466 : #[repr(u8)]
1467 : pub enum IoMode {
1468 : /// Uses buffered IO.
1469 : Buffered,
1470 : /// Uses direct IO, error out if the operation fails.
1471 : #[cfg(target_os = "linux")]
1472 : Direct,
1473 : }
1474 :
1475 : impl IoMode {
1476 480 : pub const fn preferred() -> Self {
1477 480 : Self::Buffered
1478 480 : }
1479 : }
1480 :
1481 : impl TryFrom<u8> for IoMode {
1482 : type Error = u8;
1483 :
1484 5068 : fn try_from(value: u8) -> Result<Self, Self::Error> {
1485 5068 : Ok(match value {
1486 5068 : v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
1487 : #[cfg(target_os = "linux")]
1488 0 : v if v == (IoMode::Direct as u8) => IoMode::Direct,
1489 0 : x => return Err(x),
1490 : })
1491 5068 : }
1492 : }
1493 : }
1494 :
/// Result of a disposable-keys scan: how many keys were counted as disposable and how
/// many as not disposable.
1495 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1496 : pub struct ScanDisposableKeysResponse {
1497 : pub disposable_count: usize,
1498 : pub not_disposable_count: usize,
1499 : }
1500 :
/// A pagestream request message (compute -> pageserver), one variant per request kind.
/// The `Test` variant only exists when the `testing` feature is enabled.
1501 : // Wrapped in libpq CopyData
1502 : #[derive(PartialEq, Eq, Debug)]
1503 : pub enum PagestreamFeMessage {
1504 : Exists(PagestreamExistsRequest),
1505 : Nblocks(PagestreamNblocksRequest),
1506 : GetPage(PagestreamGetPageRequest),
1507 : DbSize(PagestreamDbSizeRequest),
1508 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
1509 : #[cfg(feature = "testing")]
1510 : Test(PagestreamTestRequest),
1511 : }
1512 :
/// A pagestream response message, one variant per response kind plus `Error`.
/// The `Test` variant only exists when the `testing` feature is enabled.
1513 : // Wrapped in libpq CopyData
1514 : #[derive(strum_macros::EnumProperty)]
1515 : pub enum PagestreamBeMessage {
1516 : Exists(PagestreamExistsResponse),
1517 : Nblocks(PagestreamNblocksResponse),
1518 : GetPage(PagestreamGetPageResponse),
1519 : Error(PagestreamErrorResponse),
1520 : DbSize(PagestreamDbSizeResponse),
1521 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
1522 : #[cfg(feature = "testing")]
1523 : Test(PagestreamTestResponse),
1524 : }
1525 :
/// One-byte wire tag identifying the kind of a [`PagestreamFeMessage`].
1526 : // Keep in sync with `pagestore_client.h`
1527 : #[repr(u8)]
1528 : enum PagestreamFeMessageTag {
1529 : Exists = 0,
1530 : Nblocks = 1,
1531 : GetPage = 2,
1532 : DbSize = 3,
1533 : GetSlruSegment = 4,
1534 : /* future tags above this line */
1535 : /// For testing purposes, not available in production.
1536 : #[cfg(feature = "testing")]
1537 : Test = 99,
1538 : }
1539 :
/// One-byte wire tag identifying the kind of a [`PagestreamBeMessage`].
/// Each value here is the corresponding [`PagestreamFeMessageTag`] value plus 100, with
/// `Error = 103` inserted, so `DbSize` and `GetSlruSegment` are shifted by one.
1540 : // Keep in sync with `pagestore_client.h`
1541 : #[repr(u8)]
1542 : enum PagestreamBeMessageTag {
1543 : Exists = 100,
1544 : Nblocks = 101,
1545 : GetPage = 102,
1546 : Error = 103,
1547 : DbSize = 104,
1548 : GetSlruSegment = 105,
1549 : /* future tags above this line */
1550 : /// For testing purposes, not available in production.
1551 : #[cfg(feature = "testing")]
1552 : Test = 199,
1553 : }
1554 :
1555 : impl TryFrom<u8> for PagestreamFeMessageTag {
1556 : type Error = u8;
1557 4 : fn try_from(value: u8) -> Result<Self, u8> {
1558 4 : match value {
1559 1 : 0 => Ok(PagestreamFeMessageTag::Exists),
1560 1 : 1 => Ok(PagestreamFeMessageTag::Nblocks),
1561 1 : 2 => Ok(PagestreamFeMessageTag::GetPage),
1562 1 : 3 => Ok(PagestreamFeMessageTag::DbSize),
1563 0 : 4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
1564 : #[cfg(feature = "testing")]
1565 0 : 99 => Ok(PagestreamFeMessageTag::Test),
1566 0 : _ => Err(value),
1567 : }
1568 4 : }
1569 : }
1570 :
1571 : impl TryFrom<u8> for PagestreamBeMessageTag {
1572 : type Error = u8;
1573 0 : fn try_from(value: u8) -> Result<Self, u8> {
1574 0 : match value {
1575 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
1576 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
1577 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
1578 0 : 103 => Ok(PagestreamBeMessageTag::Error),
1579 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
1580 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
1581 : #[cfg(feature = "testing")]
1582 0 : 199 => Ok(PagestreamBeMessageTag::Test),
1583 0 : _ => Err(value),
1584 : }
1585 0 : }
1586 : }
1587 :
1588 : // A GetPage request contains two LSN values:
1589 : //
1590 : // request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
1591 : // "get the latest version present". It's used by the primary server, which knows that no one else
1592 : // is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
1593 : // Lsn::Max. Standby servers use the current replay LSN as the request LSN.
1594 : //
1595 : // not_modified_since: Hint to the pageserver that the client knows that the page has not been
1596 : // modified between 'not_modified_since' and the request LSN. It's always correct to set
1597 : // 'not_modified_since' equal to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
1598 : // passing an earlier LSN can speed up the request, by allowing the pageserver to process the
1599 : // request without waiting for 'request_lsn' to arrive.
1600 : //
1601 : // The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
1602 : // sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
1603 : // 'latest' was set to true. The V2 interface was added because there was no correct way for a
1604 : // standby to request a page at a particular non-latest LSN, and also include the
1605 : // 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
1606 : // request, if the standby knows that the page hasn't been modified since, and risk getting an error
1607 : // if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
1608 : // require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
1609 : // interface allows sending both LSNs, and lets the pageserver do the right thing. There was no
1610 : // difference in the responses between V1 and V2.
1611 : //
1612 : // The V3 version of the protocol adds a request ID to all requests. This request ID is also included in the
1613 : // response, together with the other request fields, which allows verifying that a response belongs to our request.
1614 : // We copy fields from request to response to make checking more reliable: request ID is formed from process ID
1615 : // and local counter, so in principle there can be duplicate request IDs if a process PID is reused.
1616 : //
1617 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1618 : pub enum PagestreamProtocolVersion {
1619 : V2,
1620 : V3,
1621 : }
1622 :
/// Request identifier, carried in [`PagestreamRequest::reqid`].
1623 : pub type RequestId = u64;
1624 :
/// Header shared by all pagestream requests: the request id and the two LSNs.
1625 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1626 : pub struct PagestreamRequest {
1627 : pub reqid: RequestId,
1628 : pub request_lsn: Lsn,
1629 : pub not_modified_since: Lsn,
1630 : }
1631 :
/// `Exists` request: the common header plus the relation being asked about.
1632 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1633 : pub struct PagestreamExistsRequest {
1634 : pub hdr: PagestreamRequest,
1635 : pub rel: RelTag,
1636 : }
1637 :
/// `Nblocks` request: the common header plus the relation being asked about.
1638 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1639 : pub struct PagestreamNblocksRequest {
1640 : pub hdr: PagestreamRequest,
1641 : pub rel: RelTag,
1642 : }
1643 :
/// `GetPage` request: the common header, the relation and the block number within it.
1644 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1645 : pub struct PagestreamGetPageRequest {
1646 : pub hdr: PagestreamRequest,
1647 : pub rel: RelTag,
1648 : pub blkno: u32,
1649 : }
1650 :
/// `DbSize` request: the common header plus the database node id.
1651 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1652 : pub struct PagestreamDbSizeRequest {
1653 : pub hdr: PagestreamRequest,
1654 : pub dbnode: u32,
1655 : }
1656 :
/// `GetSlruSegment` request: the common header, an SLRU kind byte and a segment number.
/// NOTE(review): the valid values of `kind` are defined elsewhere -- confirm before use.
1657 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1658 : pub struct PagestreamGetSlruSegmentRequest {
1659 : pub hdr: PagestreamRequest,
1660 : pub kind: u8,
1661 : pub segno: u32,
1662 : }
1663 :
/// Response to an `Exists` request; `req` carries the request being answered.
1664 : #[derive(Debug)]
1665 : pub struct PagestreamExistsResponse {
1666 : pub req: PagestreamExistsRequest,
1667 : pub exists: bool,
1668 : }
1669 :
/// Response to an `Nblocks` request; `req` carries the request being answered.
1670 : #[derive(Debug)]
1671 : pub struct PagestreamNblocksResponse {
1672 : pub req: PagestreamNblocksRequest,
1673 : pub n_blocks: u32,
1674 : }
1675 :
/// Response to a `GetPage` request: the request being answered plus the page bytes.
1676 : #[derive(Debug)]
1677 : pub struct PagestreamGetPageResponse {
1678 : pub req: PagestreamGetPageRequest,
1679 : pub page: Bytes,
1680 : }
1681 :
/// Response to a `GetSlruSegment` request: the request being answered plus the segment bytes.
1682 : #[derive(Debug)]
1683 : pub struct PagestreamGetSlruSegmentResponse {
1684 : pub req: PagestreamGetSlruSegmentRequest,
1685 : pub segment: Bytes,
1686 : }
1687 :
/// Error response: the common header of the failed request plus an error message.
1688 : #[derive(Debug)]
1689 : pub struct PagestreamErrorResponse {
1690 : pub req: PagestreamRequest,
1691 : pub message: String,
1692 : }
1693 :
/// Response to a `DbSize` request; `req` carries the request being answered.
/// Note that `db_size` is a signed 64-bit value.
1694 : #[derive(Debug)]
1695 : pub struct PagestreamDbSizeResponse {
1696 : pub req: PagestreamDbSizeRequest,
1697 : pub db_size: i64,
1698 : }
1699 :
/// Test-only request (compiled only with the `testing` feature): the common header, a
/// batch key and a free-form message.
/// NOTE(review): how `batch_key` is interpreted is decided by the handler -- confirm.
1700 : #[cfg(feature = "testing")]
1701 : #[derive(Debug, PartialEq, Eq, Clone)]
1702 : pub struct PagestreamTestRequest {
1703 : pub hdr: PagestreamRequest,
1704 : pub batch_key: u64,
1705 : pub message: String,
1706 : }
1707 :
/// Test-only response (compiled only with the `testing` feature) carrying the request
/// being answered.
1708 : #[cfg(feature = "testing")]
1709 : #[derive(Debug)]
1710 : pub struct PagestreamTestResponse {
1711 : pub req: PagestreamTestRequest,
1712 : }
1713 :
1714 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
1715 : // that require pageserver-internal types. It is sufficient to get the total size.
1716 0 : #[derive(Serialize, Deserialize, Debug)]
1717 : pub struct TenantHistorySize {
/// The tenant this size belongs to.
1718 : pub id: TenantId,
1719 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
1720 : ///
1721 : /// Will be none if `?inputs_only=true` was given.
1722 : pub size: Option<u64>,
1723 : }
1724 :
1725 : impl PagestreamFeMessage {
1726 : /// Serialize a compute -> pageserver message. This is currently only used in testing
1727 : /// tools. Always uses protocol version 3.
1728 4 : pub fn serialize(&self) -> Bytes {
1729 4 : let mut bytes = BytesMut::new();
1730 4 :
1731 4 : match self {
1732 1 : Self::Exists(req) => {
1733 1 : bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
1734 1 : bytes.put_u64(req.hdr.reqid);
1735 1 : bytes.put_u64(req.hdr.request_lsn.0);
1736 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1737 1 : bytes.put_u32(req.rel.spcnode);
1738 1 : bytes.put_u32(req.rel.dbnode);
1739 1 : bytes.put_u32(req.rel.relnode);
1740 1 : bytes.put_u8(req.rel.forknum);
1741 1 : }
1742 :
1743 1 : Self::Nblocks(req) => {
1744 1 : bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
1745 1 : bytes.put_u64(req.hdr.reqid);
1746 1 : bytes.put_u64(req.hdr.request_lsn.0);
1747 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1748 1 : bytes.put_u32(req.rel.spcnode);
1749 1 : bytes.put_u32(req.rel.dbnode);
1750 1 : bytes.put_u32(req.rel.relnode);
1751 1 : bytes.put_u8(req.rel.forknum);
1752 1 : }
1753 :
1754 1 : Self::GetPage(req) => {
1755 1 : bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
1756 1 : bytes.put_u64(req.hdr.reqid);
1757 1 : bytes.put_u64(req.hdr.request_lsn.0);
1758 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1759 1 : bytes.put_u32(req.rel.spcnode);
1760 1 : bytes.put_u32(req.rel.dbnode);
1761 1 : bytes.put_u32(req.rel.relnode);
1762 1 : bytes.put_u8(req.rel.forknum);
1763 1 : bytes.put_u32(req.blkno);
1764 1 : }
1765 :
1766 1 : Self::DbSize(req) => {
1767 1 : bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
1768 1 : bytes.put_u64(req.hdr.reqid);
1769 1 : bytes.put_u64(req.hdr.request_lsn.0);
1770 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1771 1 : bytes.put_u32(req.dbnode);
1772 1 : }
1773 :
1774 0 : Self::GetSlruSegment(req) => {
1775 0 : bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
1776 0 : bytes.put_u64(req.hdr.reqid);
1777 0 : bytes.put_u64(req.hdr.request_lsn.0);
1778 0 : bytes.put_u64(req.hdr.not_modified_since.0);
1779 0 : bytes.put_u8(req.kind);
1780 0 : bytes.put_u32(req.segno);
1781 0 : }
1782 : #[cfg(feature = "testing")]
1783 0 : Self::Test(req) => {
1784 0 : bytes.put_u8(PagestreamFeMessageTag::Test as u8);
1785 0 : bytes.put_u64(req.hdr.reqid);
1786 0 : bytes.put_u64(req.hdr.request_lsn.0);
1787 0 : bytes.put_u64(req.hdr.not_modified_since.0);
1788 0 : bytes.put_u64(req.batch_key);
1789 0 : let message = req.message.as_bytes();
1790 0 : bytes.put_u64(message.len() as u64);
1791 0 : bytes.put_slice(message);
1792 0 : }
1793 : }
1794 :
1795 4 : bytes.into()
1796 4 : }
1797 :
1798 4 : pub fn parse<R: std::io::Read>(
1799 4 : body: &mut R,
1800 4 : protocol_version: PagestreamProtocolVersion,
1801 4 : ) -> anyhow::Result<PagestreamFeMessage> {
1802 : // these correspond to the NeonMessageTag enum in pagestore_client.h
1803 : //
1804 : // TODO: consider using protobuf or serde bincode for less error prone
1805 : // serialization.
1806 4 : let msg_tag = body.read_u8()?;
1807 4 : let (reqid, request_lsn, not_modified_since) = match protocol_version {
1808 : PagestreamProtocolVersion::V2 => (
1809 : 0,
1810 0 : Lsn::from(body.read_u64::<BigEndian>()?),
1811 0 : Lsn::from(body.read_u64::<BigEndian>()?),
1812 : ),
1813 : PagestreamProtocolVersion::V3 => (
1814 4 : body.read_u64::<BigEndian>()?,
1815 4 : Lsn::from(body.read_u64::<BigEndian>()?),
1816 4 : Lsn::from(body.read_u64::<BigEndian>()?),
1817 : ),
1818 : };
1819 :
1820 4 : match PagestreamFeMessageTag::try_from(msg_tag)
1821 4 : .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
1822 : {
1823 : PagestreamFeMessageTag::Exists => {
1824 : Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
1825 1 : hdr: PagestreamRequest {
1826 1 : reqid,
1827 1 : request_lsn,
1828 1 : not_modified_since,
1829 1 : },
1830 1 : rel: RelTag {
1831 1 : spcnode: body.read_u32::<BigEndian>()?,
1832 1 : dbnode: body.read_u32::<BigEndian>()?,
1833 1 : relnode: body.read_u32::<BigEndian>()?,
1834 1 : forknum: body.read_u8()?,
1835 : },
1836 : }))
1837 : }
1838 : PagestreamFeMessageTag::Nblocks => {
1839 : Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1840 1 : hdr: PagestreamRequest {
1841 1 : reqid,
1842 1 : request_lsn,
1843 1 : not_modified_since,
1844 1 : },
1845 1 : rel: RelTag {
1846 1 : spcnode: body.read_u32::<BigEndian>()?,
1847 1 : dbnode: body.read_u32::<BigEndian>()?,
1848 1 : relnode: body.read_u32::<BigEndian>()?,
1849 1 : forknum: body.read_u8()?,
1850 : },
1851 : }))
1852 : }
1853 : PagestreamFeMessageTag::GetPage => {
1854 : Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1855 1 : hdr: PagestreamRequest {
1856 1 : reqid,
1857 1 : request_lsn,
1858 1 : not_modified_since,
1859 1 : },
1860 1 : rel: RelTag {
1861 1 : spcnode: body.read_u32::<BigEndian>()?,
1862 1 : dbnode: body.read_u32::<BigEndian>()?,
1863 1 : relnode: body.read_u32::<BigEndian>()?,
1864 1 : forknum: body.read_u8()?,
1865 : },
1866 1 : blkno: body.read_u32::<BigEndian>()?,
1867 : }))
1868 : }
1869 : PagestreamFeMessageTag::DbSize => {
1870 : Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1871 1 : hdr: PagestreamRequest {
1872 1 : reqid,
1873 1 : request_lsn,
1874 1 : not_modified_since,
1875 1 : },
1876 1 : dbnode: body.read_u32::<BigEndian>()?,
1877 : }))
1878 : }
1879 : PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
1880 : PagestreamGetSlruSegmentRequest {
1881 0 : hdr: PagestreamRequest {
1882 0 : reqid,
1883 0 : request_lsn,
1884 0 : not_modified_since,
1885 0 : },
1886 0 : kind: body.read_u8()?,
1887 0 : segno: body.read_u32::<BigEndian>()?,
1888 : },
1889 : )),
1890 : #[cfg(feature = "testing")]
1891 : PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
1892 0 : hdr: PagestreamRequest {
1893 0 : reqid,
1894 0 : request_lsn,
1895 0 : not_modified_since,
1896 0 : },
1897 0 : batch_key: body.read_u64::<BigEndian>()?,
1898 : message: {
1899 0 : let len = body.read_u64::<BigEndian>()?;
1900 0 : let mut buf = vec![0; len as usize];
1901 0 : body.read_exact(&mut buf)?;
1902 0 : String::from_utf8(buf)?
1903 : },
1904 : })),
1905 : }
1906 4 : }
1907 : }
1908 :
1909 : impl PagestreamBeMessage {
1910 0 : pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
1911 0 : let mut bytes = BytesMut::new();
1912 :
1913 : use PagestreamBeMessageTag as Tag;
1914 0 : match protocol_version {
1915 : PagestreamProtocolVersion::V2 => {
1916 0 : match self {
1917 0 : Self::Exists(resp) => {
1918 0 : bytes.put_u8(Tag::Exists as u8);
1919 0 : bytes.put_u8(resp.exists as u8);
1920 0 : }
1921 :
1922 0 : Self::Nblocks(resp) => {
1923 0 : bytes.put_u8(Tag::Nblocks as u8);
1924 0 : bytes.put_u32(resp.n_blocks);
1925 0 : }
1926 :
1927 0 : Self::GetPage(resp) => {
1928 0 : bytes.put_u8(Tag::GetPage as u8);
1929 0 : bytes.put(&resp.page[..])
1930 : }
1931 :
1932 0 : Self::Error(resp) => {
1933 0 : bytes.put_u8(Tag::Error as u8);
1934 0 : bytes.put(resp.message.as_bytes());
1935 0 : bytes.put_u8(0); // null terminator
1936 0 : }
1937 0 : Self::DbSize(resp) => {
1938 0 : bytes.put_u8(Tag::DbSize as u8);
1939 0 : bytes.put_i64(resp.db_size);
1940 0 : }
1941 :
1942 0 : Self::GetSlruSegment(resp) => {
1943 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1944 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1945 0 : bytes.put(&resp.segment[..]);
1946 0 : }
1947 :
1948 : #[cfg(feature = "testing")]
1949 0 : Self::Test(resp) => {
1950 0 : bytes.put_u8(Tag::Test as u8);
1951 0 : bytes.put_u64(resp.req.batch_key);
1952 0 : let message = resp.req.message.as_bytes();
1953 0 : bytes.put_u64(message.len() as u64);
1954 0 : bytes.put_slice(message);
1955 0 : }
1956 : }
1957 : }
1958 : PagestreamProtocolVersion::V3 => {
1959 0 : match self {
1960 0 : Self::Exists(resp) => {
1961 0 : bytes.put_u8(Tag::Exists as u8);
1962 0 : bytes.put_u64(resp.req.hdr.reqid);
1963 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1964 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1965 0 : bytes.put_u32(resp.req.rel.spcnode);
1966 0 : bytes.put_u32(resp.req.rel.dbnode);
1967 0 : bytes.put_u32(resp.req.rel.relnode);
1968 0 : bytes.put_u8(resp.req.rel.forknum);
1969 0 : bytes.put_u8(resp.exists as u8);
1970 0 : }
1971 :
1972 0 : Self::Nblocks(resp) => {
1973 0 : bytes.put_u8(Tag::Nblocks as u8);
1974 0 : bytes.put_u64(resp.req.hdr.reqid);
1975 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1976 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1977 0 : bytes.put_u32(resp.req.rel.spcnode);
1978 0 : bytes.put_u32(resp.req.rel.dbnode);
1979 0 : bytes.put_u32(resp.req.rel.relnode);
1980 0 : bytes.put_u8(resp.req.rel.forknum);
1981 0 : bytes.put_u32(resp.n_blocks);
1982 0 : }
1983 :
1984 0 : Self::GetPage(resp) => {
1985 0 : bytes.put_u8(Tag::GetPage as u8);
1986 0 : bytes.put_u64(resp.req.hdr.reqid);
1987 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1988 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1989 0 : bytes.put_u32(resp.req.rel.spcnode);
1990 0 : bytes.put_u32(resp.req.rel.dbnode);
1991 0 : bytes.put_u32(resp.req.rel.relnode);
1992 0 : bytes.put_u8(resp.req.rel.forknum);
1993 0 : bytes.put_u32(resp.req.blkno);
1994 0 : bytes.put(&resp.page[..])
1995 : }
1996 :
1997 0 : Self::Error(resp) => {
1998 0 : bytes.put_u8(Tag::Error as u8);
1999 0 : bytes.put_u64(resp.req.reqid);
2000 0 : bytes.put_u64(resp.req.request_lsn.0);
2001 0 : bytes.put_u64(resp.req.not_modified_since.0);
2002 0 : bytes.put(resp.message.as_bytes());
2003 0 : bytes.put_u8(0); // null terminator
2004 0 : }
2005 0 : Self::DbSize(resp) => {
2006 0 : bytes.put_u8(Tag::DbSize as u8);
2007 0 : bytes.put_u64(resp.req.hdr.reqid);
2008 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2009 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2010 0 : bytes.put_u32(resp.req.dbnode);
2011 0 : bytes.put_i64(resp.db_size);
2012 0 : }
2013 :
2014 0 : Self::GetSlruSegment(resp) => {
2015 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2016 0 : bytes.put_u64(resp.req.hdr.reqid);
2017 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2018 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2019 0 : bytes.put_u8(resp.req.kind);
2020 0 : bytes.put_u32(resp.req.segno);
2021 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2022 0 : bytes.put(&resp.segment[..]);
2023 0 : }
2024 :
2025 : #[cfg(feature = "testing")]
2026 0 : Self::Test(resp) => {
2027 0 : bytes.put_u8(Tag::Test as u8);
2028 0 : bytes.put_u64(resp.req.hdr.reqid);
2029 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2030 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2031 0 : bytes.put_u64(resp.req.batch_key);
2032 0 : let message = resp.req.message.as_bytes();
2033 0 : bytes.put_u64(message.len() as u64);
2034 0 : bytes.put_slice(message);
2035 0 : }
2036 : }
2037 : }
2038 : }
2039 0 : bytes.into()
2040 0 : }
2041 :
2042 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
2043 0 : let mut buf = buf.reader();
2044 0 : let msg_tag = buf.read_u8()?;
2045 :
2046 : use PagestreamBeMessageTag as Tag;
2047 0 : let ok =
2048 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
2049 : Tag::Exists => {
2050 0 : let reqid = buf.read_u64::<BigEndian>()?;
2051 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2052 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2053 0 : let rel = RelTag {
2054 0 : spcnode: buf.read_u32::<BigEndian>()?,
2055 0 : dbnode: buf.read_u32::<BigEndian>()?,
2056 0 : relnode: buf.read_u32::<BigEndian>()?,
2057 0 : forknum: buf.read_u8()?,
2058 : };
2059 0 : let exists = buf.read_u8()? != 0;
2060 0 : Self::Exists(PagestreamExistsResponse {
2061 0 : req: PagestreamExistsRequest {
2062 0 : hdr: PagestreamRequest {
2063 0 : reqid,
2064 0 : request_lsn,
2065 0 : not_modified_since,
2066 0 : },
2067 0 : rel,
2068 0 : },
2069 0 : exists,
2070 0 : })
2071 : }
2072 : Tag::Nblocks => {
2073 0 : let reqid = buf.read_u64::<BigEndian>()?;
2074 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2075 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2076 0 : let rel = RelTag {
2077 0 : spcnode: buf.read_u32::<BigEndian>()?,
2078 0 : dbnode: buf.read_u32::<BigEndian>()?,
2079 0 : relnode: buf.read_u32::<BigEndian>()?,
2080 0 : forknum: buf.read_u8()?,
2081 : };
2082 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2083 0 : Self::Nblocks(PagestreamNblocksResponse {
2084 0 : req: PagestreamNblocksRequest {
2085 0 : hdr: PagestreamRequest {
2086 0 : reqid,
2087 0 : request_lsn,
2088 0 : not_modified_since,
2089 0 : },
2090 0 : rel,
2091 0 : },
2092 0 : n_blocks,
2093 0 : })
2094 : }
2095 : Tag::GetPage => {
2096 0 : let reqid = buf.read_u64::<BigEndian>()?;
2097 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2098 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2099 0 : let rel = RelTag {
2100 0 : spcnode: buf.read_u32::<BigEndian>()?,
2101 0 : dbnode: buf.read_u32::<BigEndian>()?,
2102 0 : relnode: buf.read_u32::<BigEndian>()?,
2103 0 : forknum: buf.read_u8()?,
2104 : };
2105 0 : let blkno = buf.read_u32::<BigEndian>()?;
2106 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
2107 0 : buf.read_exact(&mut page)?;
2108 0 : Self::GetPage(PagestreamGetPageResponse {
2109 0 : req: PagestreamGetPageRequest {
2110 0 : hdr: PagestreamRequest {
2111 0 : reqid,
2112 0 : request_lsn,
2113 0 : not_modified_since,
2114 0 : },
2115 0 : rel,
2116 0 : blkno,
2117 0 : },
2118 0 : page: page.into(),
2119 0 : })
2120 : }
2121 : Tag::Error => {
2122 0 : let reqid = buf.read_u64::<BigEndian>()?;
2123 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2124 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2125 0 : let mut msg = Vec::new();
2126 0 : buf.read_until(0, &mut msg)?;
2127 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
2128 0 : let rust_str = cstring.to_str()?;
2129 0 : Self::Error(PagestreamErrorResponse {
2130 0 : req: PagestreamRequest {
2131 0 : reqid,
2132 0 : request_lsn,
2133 0 : not_modified_since,
2134 0 : },
2135 0 : message: rust_str.to_owned(),
2136 0 : })
2137 : }
2138 : Tag::DbSize => {
2139 0 : let reqid = buf.read_u64::<BigEndian>()?;
2140 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2141 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2142 0 : let dbnode = buf.read_u32::<BigEndian>()?;
2143 0 : let db_size = buf.read_i64::<BigEndian>()?;
2144 0 : Self::DbSize(PagestreamDbSizeResponse {
2145 0 : req: PagestreamDbSizeRequest {
2146 0 : hdr: PagestreamRequest {
2147 0 : reqid,
2148 0 : request_lsn,
2149 0 : not_modified_since,
2150 0 : },
2151 0 : dbnode,
2152 0 : },
2153 0 : db_size,
2154 0 : })
2155 : }
2156 : Tag::GetSlruSegment => {
2157 0 : let reqid = buf.read_u64::<BigEndian>()?;
2158 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2159 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2160 0 : let kind = buf.read_u8()?;
2161 0 : let segno = buf.read_u32::<BigEndian>()?;
2162 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2163 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
2164 0 : buf.read_exact(&mut segment)?;
2165 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
2166 0 : req: PagestreamGetSlruSegmentRequest {
2167 0 : hdr: PagestreamRequest {
2168 0 : reqid,
2169 0 : request_lsn,
2170 0 : not_modified_since,
2171 0 : },
2172 0 : kind,
2173 0 : segno,
2174 0 : },
2175 0 : segment: segment.into(),
2176 0 : })
2177 : }
2178 : #[cfg(feature = "testing")]
2179 : Tag::Test => {
2180 0 : let reqid = buf.read_u64::<BigEndian>()?;
2181 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2182 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2183 0 : let batch_key = buf.read_u64::<BigEndian>()?;
2184 0 : let len = buf.read_u64::<BigEndian>()?;
2185 0 : let mut msg = vec![0; len as usize];
2186 0 : buf.read_exact(&mut msg)?;
2187 0 : let message = String::from_utf8(msg)?;
2188 0 : Self::Test(PagestreamTestResponse {
2189 0 : req: PagestreamTestRequest {
2190 0 : hdr: PagestreamRequest {
2191 0 : reqid,
2192 0 : request_lsn,
2193 0 : not_modified_since,
2194 0 : },
2195 0 : batch_key,
2196 0 : message,
2197 0 : },
2198 0 : })
2199 : }
2200 : };
2201 0 : let remaining = buf.into_inner();
2202 0 : if !remaining.is_empty() {
2203 0 : anyhow::bail!(
2204 0 : "remaining bytes in msg with tag={msg_tag}: {}",
2205 0 : remaining.len()
2206 0 : );
2207 0 : }
2208 0 : Ok(ok)
2209 0 : }
2210 :
2211 0 : pub fn kind(&self) -> &'static str {
2212 0 : match self {
2213 0 : Self::Exists(_) => "Exists",
2214 0 : Self::Nblocks(_) => "Nblocks",
2215 0 : Self::GetPage(_) => "GetPage",
2216 0 : Self::Error(_) => "Error",
2217 0 : Self::DbSize(_) => "DbSize",
2218 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
2219 : #[cfg(feature = "testing")]
2220 0 : Self::Test(_) => "Test",
2221 : }
2222 0 : }
2223 : }
2224 :
2225 0 : #[derive(Debug, Serialize, Deserialize)]
2226 : pub struct PageTraceEvent {
2227 : pub key: CompactKey,
2228 : pub effective_lsn: Lsn,
2229 : pub time: SystemTime,
2230 : }
2231 :
2232 : impl Default for PageTraceEvent {
2233 0 : fn default() -> Self {
2234 0 : Self {
2235 0 : key: Default::default(),
2236 0 : effective_lsn: Default::default(),
2237 0 : time: std::time::UNIX_EPOCH,
2238 0 : }
2239 0 : }
2240 : }
2241 :
#[cfg(test)]
mod tests {
    use serde_json::json;
    use std::str::FromStr;

    use super::*;

    /// Each frontend message must come back unchanged from a V3 serialize/parse round trip.
    #[test]
    fn test_pagestream() {
        // All fixtures share the request LSN and relation; only `not_modified_since` varies.
        let hdr = |not_modified_since: u64| PagestreamRequest {
            reqid: 0,
            request_lsn: Lsn(4),
            not_modified_since: Lsn(not_modified_since),
        };
        let rel = || RelTag {
            forknum: 1,
            spcnode: 2,
            dbnode: 3,
            relnode: 4,
        };

        let messages = [
            PagestreamFeMessage::Exists(PagestreamExistsRequest {
                hdr: hdr(3),
                rel: rel(),
            }),
            PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
                hdr: hdr(4),
                rel: rel(),
            }),
            PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
                hdr: hdr(3),
                rel: rel(),
                blkno: 7,
            }),
            PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
                hdr: hdr(3),
                dbnode: 7,
            }),
        ];

        for msg in messages {
            let mut wire = msg.serialize().reader();
            let reconstructed =
                PagestreamFeMessage::parse(&mut wire, PagestreamProtocolVersion::V3).unwrap();
            assert!(msg == reconstructed);
        }
    }

    /// `TenantInfo` serializes to the expected JSON for both an active and a broken tenant.
    #[test]
    fn test_tenantinfo_serde() {
        // Everything except the state is identical between the two fixtures.
        let tenant_info = |state: TenantState| TenantInfo {
            id: TenantShardId::unsharded(TenantId::generate()),
            state,
            current_physical_size: Some(42),
            attachment_status: TenantAttachmentStatus::Attached,
            generation: 1,
            gc_blocking: None,
        };

        let original_active = tenant_info(TenantState::Active);
        let original_broken = tenant_info(TenantState::Broken {
            reason: "reason".into(),
            backtrace: "backtrace info".into(),
        });

        let expected_active = json!({
            "id": original_active.id.to_string(),
            "state": {
                "slug": "Active",
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            },
            "generation" : 1
        });
        let expected_broken = json!({
            "id": original_broken.id.to_string(),
            "state": {
                "slug": "Broken",
                "data": {
                    "backtrace": "backtrace info",
                    "reason": "reason",
                }
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            },
            "generation" : 1
        });

        assert_eq!(
            serde_json::to_value(&original_active).unwrap(),
            expected_active
        );
        assert_eq!(
            serde_json::to_value(&original_broken).unwrap(),
            expected_broken
        );

        // The Debug rendering of a broken state must expose both the reason and the backtrace.
        let broken_debug = format!("{:?}", &original_broken.state);
        assert!(broken_debug.contains("reason"));
        assert!(broken_debug.contains("backtrace info"));
    }

    /// Deserializing a `TenantConfigRequest` with an unexpected key must fail and name that key.
    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let config_request = json!({
            "tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });

        let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }

    /// `TenantState::Activating` uses the `slug`/`data` JSON layout and parses back to itself.
    #[test]
    fn tenantstatus_activating_serde() {
        let states = [TenantState::Activating(ActivatingFrom::Attaching)];
        let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";

        let actual = serde_json::to_string(&states).unwrap();
        assert_eq!(actual, expected);

        let parsed: Vec<TenantState> = serde_json::from_str(&actual).unwrap();
        assert_eq!(states.as_slice(), &parsed);
    }

    /// The static-string name of each `TenantState` variant is pinned.
    #[test]
    fn tenantstatus_activating_strum() {
        // tests added, because we use these for metrics
        let examples = [
            (line!(), TenantState::Attaching, "Attaching"),
            (
                line!(),
                TenantState::Activating(ActivatingFrom::Attaching),
                "Activating",
            ),
            (line!(), TenantState::Active, "Active"),
            (
                line!(),
                TenantState::Stopping {
                    progress: utils::completion::Barrier::default(),
                },
                "Stopping",
            ),
            (
                line!(),
                TenantState::Broken {
                    reason: "Example".into(),
                    backtrace: "Looooong backtrace".into(),
                },
                "Broken",
            ),
        ];

        for (line, rendered, expected) in examples {
            let actual: &'static str = rendered.into();
            assert_eq!(actual, expected, "example on {line}");
        }
    }

    /// `ImageCompressionAlgorithm` round-trips through `FromStr`, `Display` and serde, and
    /// its serde form is the same string as its `Display` form.
    #[test]
    fn test_image_compression_algorithm_parsing() {
        use ImageCompressionAlgorithm::*;

        let cases = [
            ("disabled", Disabled),
            ("zstd", Zstd { level: None }),
            ("zstd(18)", Zstd { level: Some(18) }),
            ("zstd(-3)", Zstd { level: Some(-3) }),
        ];

        for (display, expected) in cases {
            let parsed = ImageCompressionAlgorithm::from_str(display).unwrap();
            assert_eq!(parsed, expected, "parsing works");
            assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");

            let ser = serde_json::to_string(&expected).expect("serialization");
            let de = serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap();
            assert_eq!(de, expected, "serde roundtrip");

            assert_eq!(
                serde_json::Value::String(display.to_string()),
                serde_json::to_value(expected).unwrap(),
                "Display is the serde serialization"
            );
        }
    }

    /// Pins the JSON form of a config patch request and what applying the patch does.
    #[test]
    fn test_tenant_config_patch_request_serde() {
        let patch_request = TenantConfigPatchRequest {
            tenant_id: TenantId::from_str("17c6d121946a61e5ab0fe5a2fd4d8215").unwrap(),
            config: TenantConfigPatch {
                checkpoint_distance: FieldPatch::Upsert(42),
                gc_horizon: FieldPatch::Remove,
                compaction_threshold: FieldPatch::Noop,
                ..TenantConfigPatch::default()
            },
        };

        // `Upsert` shows up as a value, `Remove` as null, and `Noop` is absent.
        let json = serde_json::to_string(&patch_request).unwrap();
        let expected_json = r#"{"tenant_id":"17c6d121946a61e5ab0fe5a2fd4d8215","checkpoint_distance":42,"gc_horizon":null}"#;
        assert_eq!(json, expected_json);

        let decoded: TenantConfigPatchRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded.tenant_id, patch_request.tenant_id);
        assert_eq!(decoded.config, patch_request.config);

        // Now apply the patch to a config to demonstrate semantics

        let base = TenantConfig {
            checkpoint_distance: Some(28),
            gc_horizon: Some(100),
            compaction_target_size: Some(1024),
            ..Default::default()
        };

        let expected_config = TenantConfig {
            checkpoint_distance: Some(42),
            gc_horizon: None,
            ..base.clone()
        };

        let patched = base.apply_patch(decoded.config);

        assert_eq!(patched, expected_config);
    }
}
|