Line data Source code
1 : pub mod partitioning;
2 : pub mod utilization;
3 :
4 : pub use utilization::PageserverUtilization;
5 :
6 : use std::{
7 : collections::HashMap,
8 : io::{BufRead, Read},
9 : num::{NonZeroU64, NonZeroUsize},
10 : time::{Duration, SystemTime},
11 : };
12 :
13 : use byteorder::{BigEndian, ReadBytesExt};
14 : use postgres_ffi::BLCKSZ;
15 : use serde::{Deserialize, Serialize};
16 : use serde_with::serde_as;
17 : use strum_macros;
18 : use utils::{
19 : completion,
20 : history_buffer::HistoryBufferWithDropCounter,
21 : id::{NodeId, TenantId, TimelineId},
22 : lsn::Lsn,
23 : };
24 :
25 : use crate::{
26 : reltag::RelTag,
27 : shard::{ShardCount, ShardStripeSize, TenantShardId},
28 : };
29 : use anyhow::bail;
30 : use bytes::{Buf, BufMut, Bytes, BytesMut};
31 :
32 : /// The state of a tenant in this pageserver.
33 : ///
34 : /// ```mermaid
35 : /// stateDiagram-v2
36 : ///
37 : /// [*] --> Loading: spawn_load()
38 : /// [*] --> Attaching: spawn_attach()
39 : ///
40 : /// Loading --> Activating: activate()
41 : /// Attaching --> Activating: activate()
42 : /// Activating --> Active: infallible
43 : ///
44 : /// Loading --> Broken: load() failure
45 : /// Attaching --> Broken: attach() failure
46 : ///
47 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
48 : /// Stopping --> Broken: late error in remove_tenant_from_memory
49 : ///
50 : /// Broken --> [*]: ignore / detach / shutdown
51 : /// Stopping --> [*]: remove_from_memory complete
52 : ///
53 : /// Active --> Broken: cfg(testing)-only tenant break point
54 : /// ```
55 : #[derive(
56 98 : Clone,
57 88 : PartialEq,
58 : Eq,
59 10 : serde::Serialize,
60 12 : serde::Deserialize,
61 0 : strum_macros::Display,
62 : strum_macros::EnumVariantNames,
63 0 : strum_macros::AsRefStr,
64 100 : strum_macros::IntoStaticStr,
65 : )]
66 : #[serde(tag = "slug", content = "data")]
67 : pub enum TenantState {
68 : /// This tenant is being loaded from local disk.
69 : ///
70 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
71 : Loading,
72 : /// This tenant is being attached to the pageserver.
73 : ///
74 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
75 : Attaching,
76 : /// The tenant is transitioning from Loading/Attaching to Active.
77 : ///
78 : /// While in this state, the individual timelines are being activated.
79 : ///
80 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
81 : Activating(ActivatingFrom),
82 : /// The tenant has finished activating and is open for business.
83 : ///
84 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
85 : Active,
86 : /// The tenant is recognized by pageserver, but it is being detached or the
87 : /// system is being shut down.
88 : ///
89 : /// Transitions out of this state are possible through `set_broken()`.
90 : Stopping {
91 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
92 : // otherwise it will not be skipped during deserialization
93 : #[serde(skip)]
94 : progress: completion::Barrier,
95 : },
96 : /// The tenant is recognized by the pageserver, but can no longer be used for
97 : /// any operations.
98 : ///
99 : /// If the tenant fails to load or attach, it will transition to this state
100 : /// and it is guaranteed that no background tasks are running in its name.
101 : ///
102 : /// The other way to transition into this state is from `Stopping` state
103 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
104 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
105 : Broken { reason: String, backtrace: String },
106 : }
107 :
108 : impl TenantState {
109 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
110 : use TenantAttachmentStatus::*;
111 :
112 : // Below TenantState::Activating is used as "transient" or "transparent" state for
113 : // attachment_status determining.
114 0 : match self {
115 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
116 : // So, technically, we can return Attached here.
117 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
118 : // But, our attach task might still be fetching the remote timelines, etc.
119 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
120 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
121 : // tenant mgr startup distinguishes attaching from loading via marker file.
122 0 : Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
123 : // We only reach Active after successful load / attach.
124 : // So, call atttachment status Attached.
125 0 : Self::Active => Attached,
126 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
127 : // However, it also becomes Broken if the regular load fails.
128 : // From Console's perspective there's no practical difference
129 : // because attachment_status is polled by console only during attach operation execution.
130 0 : Self::Broken { reason, .. } => Failed {
131 0 : reason: reason.to_owned(),
132 0 : },
133 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
134 : // we set the Stopping state irrespective of whether the tenant
135 : // has finished attaching or not.
136 0 : Self::Stopping { .. } => Maybe,
137 : }
138 0 : }
139 :
140 0 : pub fn broken_from_reason(reason: String) -> Self {
141 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
142 0 : Self::Broken {
143 0 : reason,
144 0 : backtrace: backtrace_str,
145 0 : }
146 0 : }
147 : }
148 :
149 : impl std::fmt::Debug for TenantState {
150 4 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 4 : match self {
152 4 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
153 4 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
154 : }
155 0 : _ => write!(f, "{self}"),
156 : }
157 4 : }
158 : }
159 :
160 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
161 8 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
162 : pub enum ActivatingFrom {
163 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
164 : Loading,
165 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
166 : Attaching,
167 : }
168 :
169 : /// A state of a timeline in pageserver's memory.
170 454400 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
171 : pub enum TimelineState {
172 : /// The timeline is recognized by the pageserver but is not yet operational.
173 : /// In particular, the walreceiver connection loop is not running for this timeline.
174 : /// It will eventually transition to state Active or Broken.
175 : Loading,
176 : /// The timeline is fully operational.
177 : /// It can be queried, and the walreceiver connection loop is running.
178 : Active,
179 : /// The timeline was previously Loading or Active but is shutting down.
180 : /// It cannot transition back into any other state.
181 : Stopping,
182 : /// The timeline is broken and not operational (previous states: Loading or Active).
183 : Broken { reason: String, backtrace: String },
184 : }
185 :
186 0 : #[derive(Serialize, Deserialize, Clone)]
187 : pub struct TimelineCreateRequest {
188 : pub new_timeline_id: TimelineId,
189 : #[serde(default)]
190 : pub ancestor_timeline_id: Option<TimelineId>,
191 : #[serde(default)]
192 : pub existing_initdb_timeline_id: Option<TimelineId>,
193 : #[serde(default)]
194 : pub ancestor_start_lsn: Option<Lsn>,
195 : pub pg_version: Option<u32>,
196 : }
197 :
198 0 : #[derive(Serialize, Deserialize)]
199 : pub struct TenantShardSplitRequest {
200 : pub new_shard_count: u8,
201 : }
202 :
203 0 : #[derive(Serialize, Deserialize)]
204 : pub struct TenantShardSplitResponse {
205 : pub new_shards: Vec<TenantShardId>,
206 : }
207 :
208 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
209 0 : #[derive(Serialize, Deserialize, Debug)]
210 : #[serde(deny_unknown_fields)]
211 : pub struct ShardParameters {
212 : pub count: ShardCount,
213 : pub stripe_size: ShardStripeSize,
214 : }
215 :
216 : impl ShardParameters {
217 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
218 :
219 0 : pub fn is_unsharded(&self) -> bool {
220 0 : self.count.is_unsharded()
221 0 : }
222 : }
223 :
224 : impl Default for ShardParameters {
225 86 : fn default() -> Self {
226 86 : Self {
227 86 : count: ShardCount::new(0),
228 86 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
229 86 : }
230 86 : }
231 : }
232 :
233 8 : #[derive(Serialize, Deserialize, Debug)]
234 : #[serde(deny_unknown_fields)]
235 : pub struct TenantCreateRequest {
236 : pub new_tenant_id: TenantShardId,
237 : #[serde(default)]
238 : #[serde(skip_serializing_if = "Option::is_none")]
239 : pub generation: Option<u32>,
240 :
241 : // If omitted, create a single shard with TenantShardId::unsharded()
242 : #[serde(default)]
243 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
244 : pub shard_parameters: ShardParameters,
245 :
246 : #[serde(flatten)]
247 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
248 : }
249 :
250 0 : #[derive(Deserialize, Debug)]
251 : #[serde(deny_unknown_fields)]
252 : pub struct TenantLoadRequest {
253 : #[serde(default)]
254 : #[serde(skip_serializing_if = "Option::is_none")]
255 : pub generation: Option<u32>,
256 : }
257 :
258 : impl std::ops::Deref for TenantCreateRequest {
259 : type Target = TenantConfig;
260 :
261 0 : fn deref(&self) -> &Self::Target {
262 0 : &self.config
263 0 : }
264 : }
265 :
266 : /// An alternative representation of `pageserver::tenant::TenantConf` with
267 : /// simpler types.
268 6 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
269 : pub struct TenantConfig {
270 : pub checkpoint_distance: Option<u64>,
271 : pub checkpoint_timeout: Option<String>,
272 : pub compaction_target_size: Option<u64>,
273 : pub compaction_period: Option<String>,
274 : pub compaction_threshold: Option<usize>,
275 : pub gc_horizon: Option<u64>,
276 : pub gc_period: Option<String>,
277 : pub image_creation_threshold: Option<usize>,
278 : pub pitr_interval: Option<String>,
279 : pub walreceiver_connect_timeout: Option<String>,
280 : pub lagging_wal_timeout: Option<String>,
281 : pub max_lsn_wal_lag: Option<NonZeroU64>,
282 : pub trace_read_requests: Option<bool>,
283 : pub eviction_policy: Option<EvictionPolicy>,
284 : pub min_resident_size_override: Option<u64>,
285 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
286 : pub gc_feedback: Option<bool>,
287 : pub heatmap_period: Option<String>,
288 : pub lazy_slru_download: Option<bool>,
289 : pub timeline_get_throttle: Option<ThrottleConfig>,
290 : }
291 :
292 4206 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
293 : #[serde(tag = "kind")]
294 : pub enum EvictionPolicy {
295 : NoEviction,
296 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
297 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
298 : }
299 :
300 : impl EvictionPolicy {
301 0 : pub fn discriminant_str(&self) -> &'static str {
302 0 : match self {
303 0 : EvictionPolicy::NoEviction => "NoEviction",
304 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
305 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
306 : }
307 0 : }
308 : }
309 :
310 28 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
311 : pub struct EvictionPolicyLayerAccessThreshold {
312 : #[serde(with = "humantime_serde")]
313 : pub period: Duration,
314 : #[serde(with = "humantime_serde")]
315 : pub threshold: Duration,
316 : }
317 :
318 4374 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
319 : pub struct ThrottleConfig {
320 : pub task_kinds: Vec<String>, // TaskKind
321 : pub initial: usize,
322 : #[serde(with = "humantime_serde")]
323 : pub refill_interval: Duration,
324 : pub refill_amount: NonZeroUsize,
325 : pub max: usize,
326 : pub fair: bool,
327 : }
328 :
329 : impl ThrottleConfig {
330 216 : pub fn disabled() -> Self {
331 216 : Self {
332 216 : task_kinds: vec![], // effectively disables the throttle
333 216 : // other values don't matter with emtpy `task_kinds`.
334 216 : initial: 0,
335 216 : refill_interval: Duration::from_millis(1),
336 216 : refill_amount: NonZeroUsize::new(1).unwrap(),
337 216 : max: 1,
338 216 : fair: true,
339 216 : }
340 216 : }
341 : /// The requests per second allowed by the given config.
342 0 : pub fn steady_rps(&self) -> f64 {
343 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
344 0 : }
345 : }
346 :
347 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
348 : /// lists out all possible states (and the virtual "Detached" state)
349 : /// in a flat form rather than using rust-style enums.
350 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
351 : pub enum LocationConfigMode {
352 : AttachedSingle,
353 : AttachedMulti,
354 : AttachedStale,
355 : Secondary,
356 : Detached,
357 : }
358 :
359 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
360 : pub struct LocationConfigSecondary {
361 : pub warm: bool,
362 : }
363 :
364 : /// An alternative representation of `pageserver::tenant::LocationConf`,
365 : /// for use in external-facing APIs.
366 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
367 : pub struct LocationConfig {
368 : pub mode: LocationConfigMode,
369 : /// If attaching, in what generation?
370 : #[serde(default)]
371 : pub generation: Option<u32>,
372 :
373 : // If requesting mode `Secondary`, configuration for that.
374 : #[serde(default)]
375 : pub secondary_conf: Option<LocationConfigSecondary>,
376 :
377 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
378 : // must be set accurately.
379 : #[serde(default)]
380 : pub shard_number: u8,
381 : #[serde(default)]
382 : pub shard_count: u8,
383 : #[serde(default)]
384 : pub shard_stripe_size: u32,
385 :
386 : // This configuration only affects attached mode, but should be provided irrespective
387 : // of the mode, as a secondary location might transition on startup if the response
388 : // to the `/re-attach` control plane API requests it.
389 : pub tenant_conf: TenantConfig,
390 : }
391 :
392 0 : #[derive(Serialize, Deserialize)]
393 : pub struct LocationConfigListResponse {
394 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
395 : }
396 :
397 0 : #[derive(Serialize, Deserialize)]
398 : #[serde(transparent)]
399 : pub struct TenantCreateResponse(pub TenantId);
400 :
401 0 : #[derive(Serialize)]
402 : pub struct StatusResponse {
403 : pub id: NodeId,
404 : }
405 :
406 0 : #[derive(Serialize, Deserialize, Debug)]
407 : #[serde(deny_unknown_fields)]
408 : pub struct TenantLocationConfigRequest {
409 : pub tenant_id: TenantShardId,
410 : #[serde(flatten)]
411 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
412 : }
413 :
414 0 : #[derive(Serialize, Deserialize, Debug)]
415 : #[serde(deny_unknown_fields)]
416 : pub struct TenantTimeTravelRequest {
417 : pub shard_counts: Vec<ShardCount>,
418 : }
419 :
420 0 : #[derive(Serialize, Deserialize, Debug)]
421 : #[serde(deny_unknown_fields)]
422 : pub struct TenantShardLocation {
423 : pub shard_id: TenantShardId,
424 : pub node_id: NodeId,
425 : }
426 :
427 0 : #[derive(Serialize, Deserialize, Debug)]
428 : #[serde(deny_unknown_fields)]
429 : pub struct TenantLocationConfigResponse {
430 : pub shards: Vec<TenantShardLocation>,
431 : }
432 :
433 8 : #[derive(Serialize, Deserialize, Debug)]
434 : #[serde(deny_unknown_fields)]
435 : pub struct TenantConfigRequest {
436 : pub tenant_id: TenantId,
437 : #[serde(flatten)]
438 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
439 : }
440 :
441 : impl std::ops::Deref for TenantConfigRequest {
442 : type Target = TenantConfig;
443 :
444 0 : fn deref(&self) -> &Self::Target {
445 0 : &self.config
446 0 : }
447 : }
448 :
449 : impl TenantConfigRequest {
450 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
451 0 : let config = TenantConfig::default();
452 0 : TenantConfigRequest { tenant_id, config }
453 0 : }
454 : }
455 :
456 8 : #[derive(Debug, Deserialize)]
457 : pub struct TenantAttachRequest {
458 : #[serde(default)]
459 : pub config: TenantAttachConfig,
460 : #[serde(default)]
461 : pub generation: Option<u32>,
462 : }
463 :
464 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
465 : /// its usage inside `TenantAttachRequest`.
466 2 : #[derive(Debug, Serialize, Deserialize, Default)]
467 : #[serde(deny_unknown_fields)]
468 : pub struct TenantAttachConfig {
469 : #[serde(flatten)]
470 : allowing_unknown_fields: TenantConfig,
471 : }
472 :
473 : impl std::ops::Deref for TenantAttachConfig {
474 : type Target = TenantConfig;
475 :
476 0 : fn deref(&self) -> &Self::Target {
477 0 : &self.allowing_unknown_fields
478 0 : }
479 : }
480 :
481 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
482 4 : #[derive(Serialize, Deserialize, Clone)]
483 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
484 : pub enum TenantAttachmentStatus {
485 : Maybe,
486 : Attached,
487 : Failed { reason: String },
488 : }
489 :
490 4 : #[derive(Serialize, Deserialize, Clone)]
491 : pub struct TenantInfo {
492 : pub id: TenantShardId,
493 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
494 : pub state: TenantState,
495 : /// Sum of the size of all layer files.
496 : /// If a layer is present in both local FS and S3, it counts only once.
497 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
498 : pub attachment_status: TenantAttachmentStatus,
499 : #[serde(skip_serializing_if = "Option::is_none")]
500 : pub generation: Option<u32>,
501 : }
502 :
503 0 : #[derive(Serialize, Deserialize, Clone)]
504 : pub struct TenantDetails {
505 : #[serde(flatten)]
506 : pub tenant_info: TenantInfo,
507 :
508 : pub walredo: Option<WalRedoManagerStatus>,
509 :
510 : pub timelines: Vec<TimelineId>,
511 : }
512 :
513 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
514 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
515 : pub struct TimelineInfo {
516 : pub tenant_id: TenantShardId,
517 : pub timeline_id: TimelineId,
518 :
519 : pub ancestor_timeline_id: Option<TimelineId>,
520 : pub ancestor_lsn: Option<Lsn>,
521 : pub last_record_lsn: Lsn,
522 : pub prev_record_lsn: Option<Lsn>,
523 : pub latest_gc_cutoff_lsn: Lsn,
524 : pub disk_consistent_lsn: Lsn,
525 :
526 : /// The LSN that we have succesfully uploaded to remote storage
527 : pub remote_consistent_lsn: Lsn,
528 :
529 : /// The LSN that we are advertizing to safekeepers
530 : pub remote_consistent_lsn_visible: Lsn,
531 :
532 : /// The LSN from the start of the root timeline (never changes)
533 : pub initdb_lsn: Lsn,
534 :
535 : pub current_logical_size: u64,
536 : pub current_logical_size_is_accurate: bool,
537 :
538 : pub directory_entries_counts: Vec<u64>,
539 :
540 : /// Sum of the size of all layer files.
541 : /// If a layer is present in both local FS and S3, it counts only once.
542 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
543 : pub current_logical_size_non_incremental: Option<u64>,
544 :
545 : pub timeline_dir_layer_file_size_sum: Option<u64>,
546 :
547 : pub wal_source_connstr: Option<String>,
548 : pub last_received_msg_lsn: Option<Lsn>,
549 : /// the timestamp (in microseconds) of the last received message
550 : pub last_received_msg_ts: Option<u128>,
551 : pub pg_version: u32,
552 :
553 : pub state: TimelineState,
554 :
555 : pub walreceiver_status: String,
556 : }
557 :
558 0 : #[derive(Debug, Clone, Serialize)]
559 : pub struct LayerMapInfo {
560 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
561 : pub historic_layers: Vec<HistoricLayerInfo>,
562 : }
563 :
564 252914 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
565 : #[repr(usize)]
566 : pub enum LayerAccessKind {
567 : GetValueReconstructData,
568 : Iter,
569 : KeyIter,
570 : Dump,
571 : }
572 :
573 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
574 : pub struct LayerAccessStatFullDetails {
575 : pub when_millis_since_epoch: u64,
576 : pub task_kind: &'static str,
577 : pub access_kind: LayerAccessKind,
578 : }
579 :
580 : /// An event that impacts the layer's residence status.
581 : #[serde_as]
582 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
583 : pub struct LayerResidenceEvent {
584 : /// The time when the event occurred.
585 : /// NB: this timestamp is captured while the residence status changes.
586 : /// So, it might be behind/ahead of the actual residence change by a short amount of time.
587 : ///
588 : #[serde(rename = "timestamp_millis_since_epoch")]
589 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
590 : pub timestamp: SystemTime,
591 : /// The new residence status of the layer.
592 : pub status: LayerResidenceStatus,
593 : /// The reason why we had to record this event.
594 : pub reason: LayerResidenceEventReason,
595 : }
596 :
597 : /// The reason for recording a given [`LayerResidenceEvent`].
598 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
599 : pub enum LayerResidenceEventReason {
600 : /// The layer map is being populated, e.g. during timeline load or attach.
601 : /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
602 : /// We need to record such events because there is no persistent storage for the events.
603 : ///
604 : // https://github.com/rust-lang/rust/issues/74481
605 : /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
606 : /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
607 : LayerLoad,
608 : /// We just created the layer (e.g., freeze_and_flush or compaction).
609 : /// Such layers are always [`LayerResidenceStatus::Resident`].
610 : LayerCreate,
611 : /// We on-demand downloaded or evicted the given layer.
612 : ResidenceChange,
613 : }
614 :
615 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
616 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
617 : pub enum LayerResidenceStatus {
618 : /// Residence status for a layer file that exists locally.
619 : /// It may also exist on the remote, we don't care here.
620 : Resident,
621 : /// Residence status for a layer file that only exists on the remote.
622 : Evicted,
623 : }
624 :
625 : impl LayerResidenceEvent {
626 1152 : pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
627 1152 : Self {
628 1152 : status,
629 1152 : reason,
630 1152 : timestamp: SystemTime::now(),
631 1152 : }
632 1152 : }
633 : }
634 :
635 0 : #[derive(Debug, Clone, Serialize)]
636 : pub struct LayerAccessStats {
637 : pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
638 : pub task_kind_access_flag: Vec<&'static str>,
639 : pub first: Option<LayerAccessStatFullDetails>,
640 : pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
641 : pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
642 : }
643 :
644 0 : #[derive(Debug, Clone, Serialize)]
645 : #[serde(tag = "kind")]
646 : pub enum InMemoryLayerInfo {
647 : Open { lsn_start: Lsn },
648 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
649 : }
650 :
651 0 : #[derive(Debug, Clone, Serialize)]
652 : #[serde(tag = "kind")]
653 : pub enum HistoricLayerInfo {
654 : Delta {
655 : layer_file_name: String,
656 : layer_file_size: u64,
657 :
658 : lsn_start: Lsn,
659 : lsn_end: Lsn,
660 : remote: bool,
661 : access_stats: LayerAccessStats,
662 : },
663 : Image {
664 : layer_file_name: String,
665 : layer_file_size: u64,
666 :
667 : lsn_start: Lsn,
668 : remote: bool,
669 : access_stats: LayerAccessStats,
670 : },
671 : }
672 :
673 0 : #[derive(Debug, Serialize, Deserialize)]
674 : pub struct DownloadRemoteLayersTaskSpawnRequest {
675 : pub max_concurrent_downloads: NonZeroUsize,
676 : }
677 :
678 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
679 : pub struct DownloadRemoteLayersTaskInfo {
680 : pub task_id: String,
681 : pub state: DownloadRemoteLayersTaskState,
682 : pub total_layer_count: u64, // stable once `completed`
683 : pub successful_download_count: u64, // stable once `completed`
684 : pub failed_download_count: u64, // stable once `completed`
685 : }
686 :
687 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
688 : pub enum DownloadRemoteLayersTaskState {
689 : Running,
690 : Completed,
691 : ShutDown,
692 : }
693 :
694 0 : #[derive(Debug, Serialize, Deserialize)]
695 : pub struct TimelineGcRequest {
696 : pub gc_horizon: Option<u64>,
697 : }
698 :
699 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
700 : pub struct WalRedoManagerStatus {
701 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
702 : pub pid: Option<u32>,
703 : }
704 :
705 : pub mod virtual_file {
706 : #[derive(
707 : Copy,
708 0 : Clone,
709 4 : PartialEq,
710 : Eq,
711 0 : Hash,
712 220 : strum_macros::EnumString,
713 0 : strum_macros::Display,
714 0 : serde_with::DeserializeFromStr,
715 0 : serde_with::SerializeDisplay,
716 0 : Debug,
717 : )]
718 : #[strum(serialize_all = "kebab-case")]
719 : pub enum IoEngineKind {
720 : StdFs,
721 : #[cfg(target_os = "linux")]
722 : TokioEpollUring,
723 : }
724 : }
725 :
726 : // Wrapped in libpq CopyData
727 8 : #[derive(PartialEq, Eq, Debug)]
728 : pub enum PagestreamFeMessage {
729 : Exists(PagestreamExistsRequest),
730 : Nblocks(PagestreamNblocksRequest),
731 : GetPage(PagestreamGetPageRequest),
732 : DbSize(PagestreamDbSizeRequest),
733 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
734 : }
735 :
736 : // Wrapped in libpq CopyData
737 0 : #[derive(strum_macros::EnumProperty)]
738 : pub enum PagestreamBeMessage {
739 : Exists(PagestreamExistsResponse),
740 : Nblocks(PagestreamNblocksResponse),
741 : GetPage(PagestreamGetPageResponse),
742 : Error(PagestreamErrorResponse),
743 : DbSize(PagestreamDbSizeResponse),
744 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
745 : }
746 :
747 : // Keep in sync with `pagestore_client.h`
748 : #[repr(u8)]
749 : enum PagestreamBeMessageTag {
750 : Exists = 100,
751 : Nblocks = 101,
752 : GetPage = 102,
753 : Error = 103,
754 : DbSize = 104,
755 : GetSlruSegment = 105,
756 : }
757 : impl TryFrom<u8> for PagestreamBeMessageTag {
758 : type Error = u8;
759 0 : fn try_from(value: u8) -> Result<Self, u8> {
760 0 : match value {
761 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
762 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
763 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
764 0 : 103 => Ok(PagestreamBeMessageTag::Error),
765 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
766 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
767 0 : _ => Err(value),
768 : }
769 0 : }
770 : }
771 :
772 2 : #[derive(Debug, PartialEq, Eq)]
773 : pub struct PagestreamExistsRequest {
774 : pub latest: bool,
775 : pub lsn: Lsn,
776 : pub rel: RelTag,
777 : }
778 :
779 2 : #[derive(Debug, PartialEq, Eq)]
780 : pub struct PagestreamNblocksRequest {
781 : pub latest: bool,
782 : pub lsn: Lsn,
783 : pub rel: RelTag,
784 : }
785 :
786 2 : #[derive(Debug, PartialEq, Eq)]
787 : pub struct PagestreamGetPageRequest {
788 : pub latest: bool,
789 : pub lsn: Lsn,
790 : pub rel: RelTag,
791 : pub blkno: u32,
792 : }
793 :
794 2 : #[derive(Debug, PartialEq, Eq)]
795 : pub struct PagestreamDbSizeRequest {
796 : pub latest: bool,
797 : pub lsn: Lsn,
798 : pub dbnode: u32,
799 : }
800 :
801 0 : #[derive(Debug, PartialEq, Eq)]
802 : pub struct PagestreamGetSlruSegmentRequest {
803 : pub latest: bool,
804 : pub lsn: Lsn,
805 : pub kind: u8,
806 : pub segno: u32,
807 : }
808 :
809 0 : #[derive(Debug)]
810 : pub struct PagestreamExistsResponse {
811 : pub exists: bool,
812 : }
813 :
814 0 : #[derive(Debug)]
815 : pub struct PagestreamNblocksResponse {
816 : pub n_blocks: u32,
817 : }
818 :
819 0 : #[derive(Debug)]
820 : pub struct PagestreamGetPageResponse {
821 : pub page: Bytes,
822 : }
823 :
824 0 : #[derive(Debug)]
825 : pub struct PagestreamGetSlruSegmentResponse {
826 : pub segment: Bytes,
827 : }
828 :
829 0 : #[derive(Debug)]
830 : pub struct PagestreamErrorResponse {
831 : pub message: String,
832 : }
833 :
834 0 : #[derive(Debug)]
835 : pub struct PagestreamDbSizeResponse {
836 : pub db_size: i64,
837 : }
838 :
839 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
840 : // that require pageserver-internal types. It is sufficient to get the total size.
841 0 : #[derive(Serialize, Deserialize, Debug)]
842 : pub struct TenantHistorySize {
843 : pub id: TenantId,
844 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
845 : ///
846 : /// Will be none if `?inputs_only=true` was given.
847 : pub size: Option<u64>,
848 : }
849 :
850 : impl PagestreamFeMessage {
851 8 : pub fn serialize(&self) -> Bytes {
852 8 : let mut bytes = BytesMut::new();
853 8 :
854 8 : match self {
855 2 : Self::Exists(req) => {
856 2 : bytes.put_u8(0);
857 2 : bytes.put_u8(u8::from(req.latest));
858 2 : bytes.put_u64(req.lsn.0);
859 2 : bytes.put_u32(req.rel.spcnode);
860 2 : bytes.put_u32(req.rel.dbnode);
861 2 : bytes.put_u32(req.rel.relnode);
862 2 : bytes.put_u8(req.rel.forknum);
863 2 : }
864 :
865 2 : Self::Nblocks(req) => {
866 2 : bytes.put_u8(1);
867 2 : bytes.put_u8(u8::from(req.latest));
868 2 : bytes.put_u64(req.lsn.0);
869 2 : bytes.put_u32(req.rel.spcnode);
870 2 : bytes.put_u32(req.rel.dbnode);
871 2 : bytes.put_u32(req.rel.relnode);
872 2 : bytes.put_u8(req.rel.forknum);
873 2 : }
874 :
875 2 : Self::GetPage(req) => {
876 2 : bytes.put_u8(2);
877 2 : bytes.put_u8(u8::from(req.latest));
878 2 : bytes.put_u64(req.lsn.0);
879 2 : bytes.put_u32(req.rel.spcnode);
880 2 : bytes.put_u32(req.rel.dbnode);
881 2 : bytes.put_u32(req.rel.relnode);
882 2 : bytes.put_u8(req.rel.forknum);
883 2 : bytes.put_u32(req.blkno);
884 2 : }
885 :
886 2 : Self::DbSize(req) => {
887 2 : bytes.put_u8(3);
888 2 : bytes.put_u8(u8::from(req.latest));
889 2 : bytes.put_u64(req.lsn.0);
890 2 : bytes.put_u32(req.dbnode);
891 2 : }
892 :
893 0 : Self::GetSlruSegment(req) => {
894 0 : bytes.put_u8(4);
895 0 : bytes.put_u8(u8::from(req.latest));
896 0 : bytes.put_u64(req.lsn.0);
897 0 : bytes.put_u8(req.kind);
898 0 : bytes.put_u32(req.segno);
899 0 : }
900 : }
901 :
902 8 : bytes.into()
903 8 : }
904 :
905 8 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
906 : // TODO these gets can fail
907 :
908 : // these correspond to the NeonMessageTag enum in pagestore_client.h
909 : //
910 : // TODO: consider using protobuf or serde bincode for less error prone
911 : // serialization.
912 8 : let msg_tag = body.read_u8()?;
913 8 : match msg_tag {
914 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
915 2 : latest: body.read_u8()? != 0,
916 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
917 : rel: RelTag {
918 2 : spcnode: body.read_u32::<BigEndian>()?,
919 2 : dbnode: body.read_u32::<BigEndian>()?,
920 2 : relnode: body.read_u32::<BigEndian>()?,
921 2 : forknum: body.read_u8()?,
922 : },
923 : })),
924 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
925 2 : latest: body.read_u8()? != 0,
926 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
927 : rel: RelTag {
928 2 : spcnode: body.read_u32::<BigEndian>()?,
929 2 : dbnode: body.read_u32::<BigEndian>()?,
930 2 : relnode: body.read_u32::<BigEndian>()?,
931 2 : forknum: body.read_u8()?,
932 : },
933 : })),
934 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
935 2 : latest: body.read_u8()? != 0,
936 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
937 : rel: RelTag {
938 2 : spcnode: body.read_u32::<BigEndian>()?,
939 2 : dbnode: body.read_u32::<BigEndian>()?,
940 2 : relnode: body.read_u32::<BigEndian>()?,
941 2 : forknum: body.read_u8()?,
942 : },
943 2 : blkno: body.read_u32::<BigEndian>()?,
944 : })),
945 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
946 2 : latest: body.read_u8()? != 0,
947 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
948 2 : dbnode: body.read_u32::<BigEndian>()?,
949 : })),
950 : 4 => Ok(PagestreamFeMessage::GetSlruSegment(
951 : PagestreamGetSlruSegmentRequest {
952 0 : latest: body.read_u8()? != 0,
953 0 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
954 0 : kind: body.read_u8()?,
955 0 : segno: body.read_u32::<BigEndian>()?,
956 : },
957 : )),
958 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
959 : }
960 8 : }
961 : }
962 :
963 : impl PagestreamBeMessage {
964 0 : pub fn serialize(&self) -> Bytes {
965 0 : let mut bytes = BytesMut::new();
966 0 :
967 0 : use PagestreamBeMessageTag as Tag;
968 0 : match self {
969 0 : Self::Exists(resp) => {
970 0 : bytes.put_u8(Tag::Exists as u8);
971 0 : bytes.put_u8(resp.exists as u8);
972 0 : }
973 :
974 0 : Self::Nblocks(resp) => {
975 0 : bytes.put_u8(Tag::Nblocks as u8);
976 0 : bytes.put_u32(resp.n_blocks);
977 0 : }
978 :
979 0 : Self::GetPage(resp) => {
980 0 : bytes.put_u8(Tag::GetPage as u8);
981 0 : bytes.put(&resp.page[..]);
982 0 : }
983 :
984 0 : Self::Error(resp) => {
985 0 : bytes.put_u8(Tag::Error as u8);
986 0 : bytes.put(resp.message.as_bytes());
987 0 : bytes.put_u8(0); // null terminator
988 0 : }
989 0 : Self::DbSize(resp) => {
990 0 : bytes.put_u8(Tag::DbSize as u8);
991 0 : bytes.put_i64(resp.db_size);
992 0 : }
993 :
994 0 : Self::GetSlruSegment(resp) => {
995 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
996 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
997 0 : bytes.put(&resp.segment[..]);
998 0 : }
999 : }
1000 :
1001 0 : bytes.into()
1002 0 : }
1003 :
1004 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
1005 0 : let mut buf = buf.reader();
1006 0 : let msg_tag = buf.read_u8()?;
1007 :
1008 : use PagestreamBeMessageTag as Tag;
1009 0 : let ok =
1010 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
1011 : Tag::Exists => {
1012 0 : let exists = buf.read_u8()?;
1013 0 : Self::Exists(PagestreamExistsResponse {
1014 0 : exists: exists != 0,
1015 0 : })
1016 : }
1017 : Tag::Nblocks => {
1018 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1019 0 : Self::Nblocks(PagestreamNblocksResponse { n_blocks })
1020 : }
1021 : Tag::GetPage => {
1022 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
1023 0 : buf.read_exact(&mut page)?;
1024 0 : PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
1025 : }
1026 : Tag::Error => {
1027 0 : let mut msg = Vec::new();
1028 0 : buf.read_until(0, &mut msg)?;
1029 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
1030 0 : let rust_str = cstring.to_str()?;
1031 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1032 0 : message: rust_str.to_owned(),
1033 0 : })
1034 : }
1035 : Tag::DbSize => {
1036 0 : let db_size = buf.read_i64::<BigEndian>()?;
1037 0 : Self::DbSize(PagestreamDbSizeResponse { db_size })
1038 : }
1039 : Tag::GetSlruSegment => {
1040 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1041 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
1042 0 : buf.read_exact(&mut segment)?;
1043 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
1044 0 : segment: segment.into(),
1045 0 : })
1046 : }
1047 : };
1048 0 : let remaining = buf.into_inner();
1049 0 : if !remaining.is_empty() {
1050 0 : anyhow::bail!(
1051 0 : "remaining bytes in msg with tag={msg_tag}: {}",
1052 0 : remaining.len()
1053 0 : );
1054 0 : }
1055 0 : Ok(ok)
1056 0 : }
1057 :
1058 0 : pub fn kind(&self) -> &'static str {
1059 0 : match self {
1060 0 : Self::Exists(_) => "Exists",
1061 0 : Self::Nblocks(_) => "Nblocks",
1062 0 : Self::GetPage(_) => "GetPage",
1063 0 : Self::Error(_) => "Error",
1064 0 : Self::DbSize(_) => "DbSize",
1065 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
1066 : }
1067 0 : }
1068 : }
1069 :
1070 : #[cfg(test)]
1071 : mod tests {
1072 : use bytes::Buf;
1073 : use serde_json::json;
1074 :
1075 : use super::*;
1076 :
1077 2 : #[test]
1078 2 : fn test_pagestream() {
1079 2 : // Test serialization/deserialization of PagestreamFeMessage
1080 2 : let messages = vec![
1081 2 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
1082 2 : latest: true,
1083 2 : lsn: Lsn(4),
1084 2 : rel: RelTag {
1085 2 : forknum: 1,
1086 2 : spcnode: 2,
1087 2 : dbnode: 3,
1088 2 : relnode: 4,
1089 2 : },
1090 2 : }),
1091 2 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1092 2 : latest: false,
1093 2 : lsn: Lsn(4),
1094 2 : rel: RelTag {
1095 2 : forknum: 1,
1096 2 : spcnode: 2,
1097 2 : dbnode: 3,
1098 2 : relnode: 4,
1099 2 : },
1100 2 : }),
1101 2 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1102 2 : latest: true,
1103 2 : lsn: Lsn(4),
1104 2 : rel: RelTag {
1105 2 : forknum: 1,
1106 2 : spcnode: 2,
1107 2 : dbnode: 3,
1108 2 : relnode: 4,
1109 2 : },
1110 2 : blkno: 7,
1111 2 : }),
1112 2 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1113 2 : latest: true,
1114 2 : lsn: Lsn(4),
1115 2 : dbnode: 7,
1116 2 : }),
1117 2 : ];
1118 10 : for msg in messages {
1119 8 : let bytes = msg.serialize();
1120 8 : let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
1121 8 : assert!(msg == reconstructed);
1122 : }
1123 2 : }
1124 :
1125 2 : #[test]
1126 2 : fn test_tenantinfo_serde() {
1127 2 : // Test serialization/deserialization of TenantInfo
1128 2 : let original_active = TenantInfo {
1129 2 : id: TenantShardId::unsharded(TenantId::generate()),
1130 2 : state: TenantState::Active,
1131 2 : current_physical_size: Some(42),
1132 2 : attachment_status: TenantAttachmentStatus::Attached,
1133 2 : generation: None,
1134 2 : };
1135 2 : let expected_active = json!({
1136 2 : "id": original_active.id.to_string(),
1137 2 : "state": {
1138 2 : "slug": "Active",
1139 2 : },
1140 2 : "current_physical_size": 42,
1141 2 : "attachment_status": {
1142 2 : "slug":"attached",
1143 2 : }
1144 2 : });
1145 2 :
1146 2 : let original_broken = TenantInfo {
1147 2 : id: TenantShardId::unsharded(TenantId::generate()),
1148 2 : state: TenantState::Broken {
1149 2 : reason: "reason".into(),
1150 2 : backtrace: "backtrace info".into(),
1151 2 : },
1152 2 : current_physical_size: Some(42),
1153 2 : attachment_status: TenantAttachmentStatus::Attached,
1154 2 : generation: None,
1155 2 : };
1156 2 : let expected_broken = json!({
1157 2 : "id": original_broken.id.to_string(),
1158 2 : "state": {
1159 2 : "slug": "Broken",
1160 2 : "data": {
1161 2 : "backtrace": "backtrace info",
1162 2 : "reason": "reason",
1163 2 : }
1164 2 : },
1165 2 : "current_physical_size": 42,
1166 2 : "attachment_status": {
1167 2 : "slug":"attached",
1168 2 : }
1169 2 : });
1170 2 :
1171 2 : assert_eq!(
1172 2 : serde_json::to_value(&original_active).unwrap(),
1173 2 : expected_active
1174 2 : );
1175 :
1176 2 : assert_eq!(
1177 2 : serde_json::to_value(&original_broken).unwrap(),
1178 2 : expected_broken
1179 2 : );
1180 2 : assert!(format!("{:?}", &original_broken.state).contains("reason"));
1181 2 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
1182 2 : }
1183 :
1184 2 : #[test]
1185 2 : fn test_reject_unknown_field() {
1186 2 : let id = TenantId::generate();
1187 2 : let create_request = json!({
1188 2 : "new_tenant_id": id.to_string(),
1189 2 : "unknown_field": "unknown_value".to_string(),
1190 2 : });
1191 2 : let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
1192 2 : assert!(
1193 2 : err.to_string().contains("unknown field `unknown_field`"),
1194 0 : "expect unknown field `unknown_field` error, got: {}",
1195 : err
1196 : );
1197 :
1198 2 : let id = TenantId::generate();
1199 2 : let config_request = json!({
1200 2 : "tenant_id": id.to_string(),
1201 2 : "unknown_field": "unknown_value".to_string(),
1202 2 : });
1203 2 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
1204 2 : assert!(
1205 2 : err.to_string().contains("unknown field `unknown_field`"),
1206 0 : "expect unknown field `unknown_field` error, got: {}",
1207 : err
1208 : );
1209 :
1210 2 : let attach_request = json!({
1211 2 : "config": {
1212 2 : "unknown_field": "unknown_value".to_string(),
1213 2 : },
1214 2 : });
1215 2 : let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
1216 2 : assert!(
1217 2 : err.to_string().contains("unknown field `unknown_field`"),
1218 0 : "expect unknown field `unknown_field` error, got: {}",
1219 : err
1220 : );
1221 2 : }
1222 :
1223 2 : #[test]
1224 2 : fn tenantstatus_activating_serde() {
1225 2 : let states = [
1226 2 : TenantState::Activating(ActivatingFrom::Loading),
1227 2 : TenantState::Activating(ActivatingFrom::Attaching),
1228 2 : ];
1229 2 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
1230 2 :
1231 2 : let actual = serde_json::to_string(&states).unwrap();
1232 2 :
1233 2 : assert_eq!(actual, expected);
1234 :
1235 2 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
1236 2 :
1237 2 : assert_eq!(states.as_slice(), &parsed);
1238 2 : }
1239 :
1240 2 : #[test]
1241 2 : fn tenantstatus_activating_strum() {
1242 2 : // tests added, because we use these for metrics
1243 2 : let examples = [
1244 2 : (line!(), TenantState::Loading, "Loading"),
1245 2 : (line!(), TenantState::Attaching, "Attaching"),
1246 2 : (
1247 2 : line!(),
1248 2 : TenantState::Activating(ActivatingFrom::Loading),
1249 2 : "Activating",
1250 2 : ),
1251 2 : (
1252 2 : line!(),
1253 2 : TenantState::Activating(ActivatingFrom::Attaching),
1254 2 : "Activating",
1255 2 : ),
1256 2 : (line!(), TenantState::Active, "Active"),
1257 2 : (
1258 2 : line!(),
1259 2 : TenantState::Stopping {
1260 2 : progress: utils::completion::Barrier::default(),
1261 2 : },
1262 2 : "Stopping",
1263 2 : ),
1264 2 : (
1265 2 : line!(),
1266 2 : TenantState::Broken {
1267 2 : reason: "Example".into(),
1268 2 : backtrace: "Looooong backtrace".into(),
1269 2 : },
1270 2 : "Broken",
1271 2 : ),
1272 2 : ];
1273 :
1274 16 : for (line, rendered, expected) in examples {
1275 14 : let actual: &'static str = rendered.into();
1276 14 : assert_eq!(actual, expected, "example on {line}");
1277 : }
1278 2 : }
1279 : }
|