Line data Source code
1 : pub mod partitioning;
2 : pub mod utilization;
3 :
4 : pub use utilization::PageserverUtilization;
5 :
6 : use std::{
7 : collections::HashMap,
8 : io::{BufRead, Read},
9 : num::{NonZeroU64, NonZeroUsize},
10 : time::{Duration, SystemTime},
11 : };
12 :
13 : use byteorder::{BigEndian, ReadBytesExt};
14 : use postgres_ffi::BLCKSZ;
15 : use serde::{Deserialize, Serialize};
16 : use serde_with::serde_as;
17 : use strum_macros;
18 : use utils::{
19 : completion,
20 : history_buffer::HistoryBufferWithDropCounter,
21 : id::{NodeId, TenantId, TimelineId},
22 : lsn::Lsn,
23 : };
24 :
25 : use crate::{
26 : reltag::RelTag,
27 : shard::{ShardCount, ShardStripeSize, TenantShardId},
28 : };
29 : use anyhow::bail;
30 : use bytes::{Buf, BufMut, Bytes, BytesMut};
31 :
32 : /// The state of a tenant in this pageserver.
33 : ///
34 : /// ```mermaid
35 : /// stateDiagram-v2
36 : ///
37 : /// [*] --> Loading: spawn_load()
38 : /// [*] --> Attaching: spawn_attach()
39 : ///
40 : /// Loading --> Activating: activate()
41 : /// Attaching --> Activating: activate()
42 : /// Activating --> Active: infallible
43 : ///
44 : /// Loading --> Broken: load() failure
45 : /// Attaching --> Broken: attach() failure
46 : ///
47 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
48 : /// Stopping --> Broken: late error in remove_tenant_from_memory
49 : ///
50 : /// Broken --> [*]: ignore / detach / shutdown
51 : /// Stopping --> [*]: remove_from_memory complete
52 : ///
53 : /// Active --> Broken: cfg(testing)-only tenant break point
54 : /// ```
55 : #[derive(
56 102 : Clone,
57 92 : PartialEq,
58 : Eq,
59 10 : serde::Serialize,
60 12 : serde::Deserialize,
61 0 : strum_macros::Display,
62 : strum_macros::EnumVariantNames,
63 0 : strum_macros::AsRefStr,
64 103 : strum_macros::IntoStaticStr,
65 : )]
66 : #[serde(tag = "slug", content = "data")]
67 : pub enum TenantState {
68 : /// This tenant is being loaded from local disk.
69 : ///
70 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
71 : Loading,
72 : /// This tenant is being attached to the pageserver.
73 : ///
74 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
75 : Attaching,
76 : /// The tenant is transitioning from Loading/Attaching to Active.
77 : ///
78 : /// While in this state, the individual timelines are being activated.
79 : ///
80 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
81 : Activating(ActivatingFrom),
82 : /// The tenant has finished activating and is open for business.
83 : ///
84 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
85 : Active,
86 : /// The tenant is recognized by pageserver, but it is being detached or the
87 : /// system is being shut down.
88 : ///
89 : /// Transitions out of this state are possible through `set_broken()`.
90 : Stopping {
91 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
92 : // otherwise it will not be skipped during deserialization
93 : #[serde(skip)]
94 : progress: completion::Barrier,
95 : },
96 : /// The tenant is recognized by the pageserver, but can no longer be used for
97 : /// any operations.
98 : ///
99 : /// If the tenant fails to load or attach, it will transition to this state
100 : /// and it is guaranteed that no background tasks are running in its name.
101 : ///
102 : /// The other way to transition into this state is from `Stopping` state
103 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
104 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
105 : Broken { reason: String, backtrace: String },
106 : }
107 :
108 : impl TenantState {
109 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
110 : use TenantAttachmentStatus::*;
111 :
112 : // Below TenantState::Activating is used as "transient" or "transparent" state for
113 : // attachment_status determining.
114 0 : match self {
115 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
116 : // So, technically, we can return Attached here.
117 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
118 : // But, our attach task might still be fetching the remote timelines, etc.
119 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
120 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
121 : // tenant mgr startup distinguishes attaching from loading via marker file.
122 0 : Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
123 : // We only reach Active after successful load / attach.
124 : // So, call atttachment status Attached.
125 0 : Self::Active => Attached,
126 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
127 : // However, it also becomes Broken if the regular load fails.
128 : // From Console's perspective there's no practical difference
129 : // because attachment_status is polled by console only during attach operation execution.
130 0 : Self::Broken { reason, .. } => Failed {
131 0 : reason: reason.to_owned(),
132 0 : },
133 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
134 : // we set the Stopping state irrespective of whether the tenant
135 : // has finished attaching or not.
136 0 : Self::Stopping { .. } => Maybe,
137 : }
138 0 : }
139 :
140 0 : pub fn broken_from_reason(reason: String) -> Self {
141 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
142 0 : Self::Broken {
143 0 : reason,
144 0 : backtrace: backtrace_str,
145 0 : }
146 0 : }
147 : }
148 :
149 : impl std::fmt::Debug for TenantState {
150 4 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 4 : match self {
152 4 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
153 4 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
154 : }
155 0 : _ => write!(f, "{self}"),
156 : }
157 4 : }
158 : }
159 :
160 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
161 8 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
162 : pub enum ActivatingFrom {
163 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
164 : Loading,
165 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
166 : Attaching,
167 : }
168 :
169 : /// A state of a timeline in pageserver's memory.
170 454866 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
171 : pub enum TimelineState {
172 : /// The timeline is recognized by the pageserver but is not yet operational.
173 : /// In particular, the walreceiver connection loop is not running for this timeline.
174 : /// It will eventually transition to state Active or Broken.
175 : Loading,
176 : /// The timeline is fully operational.
177 : /// It can be queried, and the walreceiver connection loop is running.
178 : Active,
179 : /// The timeline was previously Loading or Active but is shutting down.
180 : /// It cannot transition back into any other state.
181 : Stopping,
182 : /// The timeline is broken and not operational (previous states: Loading or Active).
183 : Broken { reason: String, backtrace: String },
184 : }
185 :
186 0 : #[derive(Serialize, Deserialize, Clone)]
187 : pub struct TimelineCreateRequest {
188 : pub new_timeline_id: TimelineId,
189 : #[serde(default)]
190 : pub ancestor_timeline_id: Option<TimelineId>,
191 : #[serde(default)]
192 : pub existing_initdb_timeline_id: Option<TimelineId>,
193 : #[serde(default)]
194 : pub ancestor_start_lsn: Option<Lsn>,
195 : pub pg_version: Option<u32>,
196 : }
197 :
198 0 : #[derive(Serialize, Deserialize)]
199 : pub struct TenantShardSplitRequest {
200 : pub new_shard_count: u8,
201 : }
202 :
203 0 : #[derive(Serialize, Deserialize)]
204 : pub struct TenantShardSplitResponse {
205 : pub new_shards: Vec<TenantShardId>,
206 : }
207 :
208 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
209 0 : #[derive(Serialize, Deserialize, Debug)]
210 : #[serde(deny_unknown_fields)]
211 : pub struct ShardParameters {
212 : pub count: ShardCount,
213 : pub stripe_size: ShardStripeSize,
214 : }
215 :
216 : impl ShardParameters {
217 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
218 :
219 0 : pub fn is_unsharded(&self) -> bool {
220 0 : self.count.is_unsharded()
221 0 : }
222 : }
223 :
224 : impl Default for ShardParameters {
225 90 : fn default() -> Self {
226 90 : Self {
227 90 : count: ShardCount::new(0),
228 90 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
229 90 : }
230 90 : }
231 : }
232 :
233 8 : #[derive(Serialize, Deserialize, Debug)]
234 : #[serde(deny_unknown_fields)]
235 : pub struct TenantCreateRequest {
236 : pub new_tenant_id: TenantShardId,
237 : #[serde(default)]
238 : #[serde(skip_serializing_if = "Option::is_none")]
239 : pub generation: Option<u32>,
240 :
241 : // If omitted, create a single shard with TenantShardId::unsharded()
242 : #[serde(default)]
243 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
244 : pub shard_parameters: ShardParameters,
245 :
246 : #[serde(flatten)]
247 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
248 : }
249 :
250 0 : #[derive(Deserialize, Debug)]
251 : #[serde(deny_unknown_fields)]
252 : pub struct TenantLoadRequest {
253 : #[serde(default)]
254 : #[serde(skip_serializing_if = "Option::is_none")]
255 : pub generation: Option<u32>,
256 : }
257 :
258 : impl std::ops::Deref for TenantCreateRequest {
259 : type Target = TenantConfig;
260 :
261 0 : fn deref(&self) -> &Self::Target {
262 0 : &self.config
263 0 : }
264 : }
265 :
266 : /// An alternative representation of `pageserver::tenant::TenantConf` with
267 : /// simpler types.
268 6 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
269 : pub struct TenantConfig {
270 : pub checkpoint_distance: Option<u64>,
271 : pub checkpoint_timeout: Option<String>,
272 : pub compaction_target_size: Option<u64>,
273 : pub compaction_period: Option<String>,
274 : pub compaction_threshold: Option<usize>,
275 : // defer parsing compaction_algorithm, like eviction_policy
276 : pub compaction_algorithm: Option<CompactionAlgorithm>,
277 : pub gc_horizon: Option<u64>,
278 : pub gc_period: Option<String>,
279 : pub image_creation_threshold: Option<usize>,
280 : pub pitr_interval: Option<String>,
281 : pub walreceiver_connect_timeout: Option<String>,
282 : pub lagging_wal_timeout: Option<String>,
283 : pub max_lsn_wal_lag: Option<NonZeroU64>,
284 : pub trace_read_requests: Option<bool>,
285 : pub eviction_policy: Option<EvictionPolicy>,
286 : pub min_resident_size_override: Option<u64>,
287 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
288 : pub heatmap_period: Option<String>,
289 : pub lazy_slru_download: Option<bool>,
290 : pub timeline_get_throttle: Option<ThrottleConfig>,
291 : }
292 :
293 2090 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
294 : #[serde(tag = "kind")]
295 : pub enum EvictionPolicy {
296 : NoEviction,
297 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
298 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
299 : }
300 :
301 : impl EvictionPolicy {
302 0 : pub fn discriminant_str(&self) -> &'static str {
303 0 : match self {
304 0 : EvictionPolicy::NoEviction => "NoEviction",
305 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
306 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
307 : }
308 0 : }
309 : }
310 :
311 2090 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
312 : #[serde(tag = "kind")]
313 : pub enum CompactionAlgorithm {
314 : Legacy,
315 : Tiered,
316 : }
317 :
318 28 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
319 : pub struct EvictionPolicyLayerAccessThreshold {
320 : #[serde(with = "humantime_serde")]
321 : pub period: Duration,
322 : #[serde(with = "humantime_serde")]
323 : pub threshold: Duration,
324 : }
325 :
326 2266 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
327 : pub struct ThrottleConfig {
328 : pub task_kinds: Vec<String>, // TaskKind
329 : pub initial: usize,
330 : #[serde(with = "humantime_serde")]
331 : pub refill_interval: Duration,
332 : pub refill_amount: NonZeroUsize,
333 : pub max: usize,
334 : pub fair: bool,
335 : }
336 :
337 : impl ThrottleConfig {
338 226 : pub fn disabled() -> Self {
339 226 : Self {
340 226 : task_kinds: vec![], // effectively disables the throttle
341 226 : // other values don't matter with emtpy `task_kinds`.
342 226 : initial: 0,
343 226 : refill_interval: Duration::from_millis(1),
344 226 : refill_amount: NonZeroUsize::new(1).unwrap(),
345 226 : max: 1,
346 226 : fair: true,
347 226 : }
348 226 : }
349 : /// The requests per second allowed by the given config.
350 0 : pub fn steady_rps(&self) -> f64 {
351 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
352 0 : }
353 : }
354 :
355 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
356 : /// lists out all possible states (and the virtual "Detached" state)
357 : /// in a flat form rather than using rust-style enums.
358 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
359 : pub enum LocationConfigMode {
360 : AttachedSingle,
361 : AttachedMulti,
362 : AttachedStale,
363 : Secondary,
364 : Detached,
365 : }
366 :
367 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
368 : pub struct LocationConfigSecondary {
369 : pub warm: bool,
370 : }
371 :
372 : /// An alternative representation of `pageserver::tenant::LocationConf`,
373 : /// for use in external-facing APIs.
374 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
375 : pub struct LocationConfig {
376 : pub mode: LocationConfigMode,
377 : /// If attaching, in what generation?
378 : #[serde(default)]
379 : pub generation: Option<u32>,
380 :
381 : // If requesting mode `Secondary`, configuration for that.
382 : #[serde(default)]
383 : pub secondary_conf: Option<LocationConfigSecondary>,
384 :
385 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
386 : // must be set accurately.
387 : #[serde(default)]
388 : pub shard_number: u8,
389 : #[serde(default)]
390 : pub shard_count: u8,
391 : #[serde(default)]
392 : pub shard_stripe_size: u32,
393 :
394 : // This configuration only affects attached mode, but should be provided irrespective
395 : // of the mode, as a secondary location might transition on startup if the response
396 : // to the `/re-attach` control plane API requests it.
397 : pub tenant_conf: TenantConfig,
398 : }
399 :
400 0 : #[derive(Serialize, Deserialize)]
401 : pub struct LocationConfigListResponse {
402 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
403 : }
404 :
405 0 : #[derive(Serialize, Deserialize)]
406 : #[serde(transparent)]
407 : pub struct TenantCreateResponse(pub TenantId);
408 :
409 0 : #[derive(Serialize)]
410 : pub struct StatusResponse {
411 : pub id: NodeId,
412 : }
413 :
414 0 : #[derive(Serialize, Deserialize, Debug)]
415 : #[serde(deny_unknown_fields)]
416 : pub struct TenantLocationConfigRequest {
417 : pub tenant_id: TenantShardId,
418 : #[serde(flatten)]
419 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
420 : }
421 :
422 0 : #[derive(Serialize, Deserialize, Debug)]
423 : #[serde(deny_unknown_fields)]
424 : pub struct TenantTimeTravelRequest {
425 : pub shard_counts: Vec<ShardCount>,
426 : }
427 :
428 0 : #[derive(Serialize, Deserialize, Debug)]
429 : #[serde(deny_unknown_fields)]
430 : pub struct TenantShardLocation {
431 : pub shard_id: TenantShardId,
432 : pub node_id: NodeId,
433 : }
434 :
435 0 : #[derive(Serialize, Deserialize, Debug)]
436 : #[serde(deny_unknown_fields)]
437 : pub struct TenantLocationConfigResponse {
438 : pub shards: Vec<TenantShardLocation>,
439 : }
440 :
441 8 : #[derive(Serialize, Deserialize, Debug)]
442 : #[serde(deny_unknown_fields)]
443 : pub struct TenantConfigRequest {
444 : pub tenant_id: TenantId,
445 : #[serde(flatten)]
446 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
447 : }
448 :
449 : impl std::ops::Deref for TenantConfigRequest {
450 : type Target = TenantConfig;
451 :
452 0 : fn deref(&self) -> &Self::Target {
453 0 : &self.config
454 0 : }
455 : }
456 :
457 : impl TenantConfigRequest {
458 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
459 0 : let config = TenantConfig::default();
460 0 : TenantConfigRequest { tenant_id, config }
461 0 : }
462 : }
463 :
464 8 : #[derive(Debug, Deserialize)]
465 : pub struct TenantAttachRequest {
466 : #[serde(default)]
467 : pub config: TenantAttachConfig,
468 : #[serde(default)]
469 : pub generation: Option<u32>,
470 : }
471 :
472 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
473 : /// its usage inside `TenantAttachRequest`.
474 2 : #[derive(Debug, Serialize, Deserialize, Default)]
475 : #[serde(deny_unknown_fields)]
476 : pub struct TenantAttachConfig {
477 : #[serde(flatten)]
478 : allowing_unknown_fields: TenantConfig,
479 : }
480 :
481 : impl std::ops::Deref for TenantAttachConfig {
482 : type Target = TenantConfig;
483 :
484 0 : fn deref(&self) -> &Self::Target {
485 0 : &self.allowing_unknown_fields
486 0 : }
487 : }
488 :
489 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
490 4 : #[derive(Serialize, Deserialize, Clone)]
491 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
492 : pub enum TenantAttachmentStatus {
493 : Maybe,
494 : Attached,
495 : Failed { reason: String },
496 : }
497 :
498 4 : #[derive(Serialize, Deserialize, Clone)]
499 : pub struct TenantInfo {
500 : pub id: TenantShardId,
501 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
502 : pub state: TenantState,
503 : /// Sum of the size of all layer files.
504 : /// If a layer is present in both local FS and S3, it counts only once.
505 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
506 : pub attachment_status: TenantAttachmentStatus,
507 : #[serde(skip_serializing_if = "Option::is_none")]
508 : pub generation: Option<u32>,
509 : }
510 :
511 0 : #[derive(Serialize, Deserialize, Clone)]
512 : pub struct TenantDetails {
513 : #[serde(flatten)]
514 : pub tenant_info: TenantInfo,
515 :
516 : pub walredo: Option<WalRedoManagerStatus>,
517 :
518 : pub timelines: Vec<TimelineId>,
519 : }
520 :
521 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
522 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
523 : pub struct TimelineInfo {
524 : pub tenant_id: TenantShardId,
525 : pub timeline_id: TimelineId,
526 :
527 : pub ancestor_timeline_id: Option<TimelineId>,
528 : pub ancestor_lsn: Option<Lsn>,
529 : pub last_record_lsn: Lsn,
530 : pub prev_record_lsn: Option<Lsn>,
531 : pub latest_gc_cutoff_lsn: Lsn,
532 : pub disk_consistent_lsn: Lsn,
533 :
534 : /// The LSN that we have succesfully uploaded to remote storage
535 : pub remote_consistent_lsn: Lsn,
536 :
537 : /// The LSN that we are advertizing to safekeepers
538 : pub remote_consistent_lsn_visible: Lsn,
539 :
540 : /// The LSN from the start of the root timeline (never changes)
541 : pub initdb_lsn: Lsn,
542 :
543 : pub current_logical_size: u64,
544 : pub current_logical_size_is_accurate: bool,
545 :
546 : pub directory_entries_counts: Vec<u64>,
547 :
548 : /// Sum of the size of all layer files.
549 : /// If a layer is present in both local FS and S3, it counts only once.
550 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
551 : pub current_logical_size_non_incremental: Option<u64>,
552 :
553 : pub timeline_dir_layer_file_size_sum: Option<u64>,
554 :
555 : pub wal_source_connstr: Option<String>,
556 : pub last_received_msg_lsn: Option<Lsn>,
557 : /// the timestamp (in microseconds) of the last received message
558 : pub last_received_msg_ts: Option<u128>,
559 : pub pg_version: u32,
560 :
561 : pub state: TimelineState,
562 :
563 : pub walreceiver_status: String,
564 : }
565 :
566 0 : #[derive(Debug, Clone, Serialize)]
567 : pub struct LayerMapInfo {
568 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
569 : pub historic_layers: Vec<HistoricLayerInfo>,
570 : }
571 :
572 253300 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
573 : #[repr(usize)]
574 : pub enum LayerAccessKind {
575 : GetValueReconstructData,
576 : Iter,
577 : KeyIter,
578 : Dump,
579 : }
580 :
581 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
582 : pub struct LayerAccessStatFullDetails {
583 : pub when_millis_since_epoch: u64,
584 : pub task_kind: &'static str,
585 : pub access_kind: LayerAccessKind,
586 : }
587 :
588 : /// An event that impacts the layer's residence status.
589 : #[serde_as]
590 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
591 : pub struct LayerResidenceEvent {
592 : /// The time when the event occurred.
593 : /// NB: this timestamp is captured while the residence status changes.
594 : /// So, it might be behind/ahead of the actual residence change by a short amount of time.
595 : ///
596 : #[serde(rename = "timestamp_millis_since_epoch")]
597 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
598 : pub timestamp: SystemTime,
599 : /// The new residence status of the layer.
600 : pub status: LayerResidenceStatus,
601 : /// The reason why we had to record this event.
602 : pub reason: LayerResidenceEventReason,
603 : }
604 :
605 : /// The reason for recording a given [`LayerResidenceEvent`].
606 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
607 : pub enum LayerResidenceEventReason {
608 : /// The layer map is being populated, e.g. during timeline load or attach.
609 : /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
610 : /// We need to record such events because there is no persistent storage for the events.
611 : ///
612 : // https://github.com/rust-lang/rust/issues/74481
613 : /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
614 : /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
615 : LayerLoad,
616 : /// We just created the layer (e.g., freeze_and_flush or compaction).
617 : /// Such layers are always [`LayerResidenceStatus::Resident`].
618 : LayerCreate,
619 : /// We on-demand downloaded or evicted the given layer.
620 : ResidenceChange,
621 : }
622 :
623 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
624 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
625 : pub enum LayerResidenceStatus {
626 : /// Residence status for a layer file that exists locally.
627 : /// It may also exist on the remote, we don't care here.
628 : Resident,
629 : /// Residence status for a layer file that only exists on the remote.
630 : Evicted,
631 : }
632 :
633 : impl LayerResidenceEvent {
634 1172 : pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
635 1172 : Self {
636 1172 : status,
637 1172 : reason,
638 1172 : timestamp: SystemTime::now(),
639 1172 : }
640 1172 : }
641 : }
642 :
643 0 : #[derive(Debug, Clone, Serialize)]
644 : pub struct LayerAccessStats {
645 : pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
646 : pub task_kind_access_flag: Vec<&'static str>,
647 : pub first: Option<LayerAccessStatFullDetails>,
648 : pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
649 : pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
650 : }
651 :
652 0 : #[derive(Debug, Clone, Serialize)]
653 : #[serde(tag = "kind")]
654 : pub enum InMemoryLayerInfo {
655 : Open { lsn_start: Lsn },
656 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
657 : }
658 :
659 0 : #[derive(Debug, Clone, Serialize)]
660 : #[serde(tag = "kind")]
661 : pub enum HistoricLayerInfo {
662 : Delta {
663 : layer_file_name: String,
664 : layer_file_size: u64,
665 :
666 : lsn_start: Lsn,
667 : lsn_end: Lsn,
668 : remote: bool,
669 : access_stats: LayerAccessStats,
670 : },
671 : Image {
672 : layer_file_name: String,
673 : layer_file_size: u64,
674 :
675 : lsn_start: Lsn,
676 : remote: bool,
677 : access_stats: LayerAccessStats,
678 : },
679 : }
680 :
681 0 : #[derive(Debug, Serialize, Deserialize)]
682 : pub struct DownloadRemoteLayersTaskSpawnRequest {
683 : pub max_concurrent_downloads: NonZeroUsize,
684 : }
685 :
686 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
687 : pub struct DownloadRemoteLayersTaskInfo {
688 : pub task_id: String,
689 : pub state: DownloadRemoteLayersTaskState,
690 : pub total_layer_count: u64, // stable once `completed`
691 : pub successful_download_count: u64, // stable once `completed`
692 : pub failed_download_count: u64, // stable once `completed`
693 : }
694 :
695 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
696 : pub enum DownloadRemoteLayersTaskState {
697 : Running,
698 : Completed,
699 : ShutDown,
700 : }
701 :
702 0 : #[derive(Debug, Serialize, Deserialize)]
703 : pub struct TimelineGcRequest {
704 : pub gc_horizon: Option<u64>,
705 : }
706 :
707 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
708 : pub struct WalRedoManagerStatus {
709 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
710 : pub pid: Option<u32>,
711 : }
712 :
713 : pub mod virtual_file {
714 : #[derive(
715 : Copy,
716 0 : Clone,
717 4 : PartialEq,
718 : Eq,
719 0 : Hash,
720 221 : strum_macros::EnumString,
721 0 : strum_macros::Display,
722 0 : serde_with::DeserializeFromStr,
723 0 : serde_with::SerializeDisplay,
724 0 : Debug,
725 : )]
726 : #[strum(serialize_all = "kebab-case")]
727 : pub enum IoEngineKind {
728 : StdFs,
729 : #[cfg(target_os = "linux")]
730 : TokioEpollUring,
731 : }
732 : }
733 :
734 : // Wrapped in libpq CopyData
735 8 : #[derive(PartialEq, Eq, Debug)]
736 : pub enum PagestreamFeMessage {
737 : Exists(PagestreamExistsRequest),
738 : Nblocks(PagestreamNblocksRequest),
739 : GetPage(PagestreamGetPageRequest),
740 : DbSize(PagestreamDbSizeRequest),
741 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
742 : }
743 :
744 : // Wrapped in libpq CopyData
745 0 : #[derive(strum_macros::EnumProperty)]
746 : pub enum PagestreamBeMessage {
747 : Exists(PagestreamExistsResponse),
748 : Nblocks(PagestreamNblocksResponse),
749 : GetPage(PagestreamGetPageResponse),
750 : Error(PagestreamErrorResponse),
751 : DbSize(PagestreamDbSizeResponse),
752 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
753 : }
754 :
755 : // Keep in sync with `pagestore_client.h`
756 : #[repr(u8)]
757 : enum PagestreamBeMessageTag {
758 : Exists = 100,
759 : Nblocks = 101,
760 : GetPage = 102,
761 : Error = 103,
762 : DbSize = 104,
763 : GetSlruSegment = 105,
764 : }
765 : impl TryFrom<u8> for PagestreamBeMessageTag {
766 : type Error = u8;
767 0 : fn try_from(value: u8) -> Result<Self, u8> {
768 0 : match value {
769 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
770 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
771 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
772 0 : 103 => Ok(PagestreamBeMessageTag::Error),
773 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
774 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
775 0 : _ => Err(value),
776 : }
777 0 : }
778 : }
779 :
780 2 : #[derive(Debug, PartialEq, Eq)]
781 : pub struct PagestreamExistsRequest {
782 : pub latest: bool,
783 : pub lsn: Lsn,
784 : pub rel: RelTag,
785 : }
786 :
787 2 : #[derive(Debug, PartialEq, Eq)]
788 : pub struct PagestreamNblocksRequest {
789 : pub latest: bool,
790 : pub lsn: Lsn,
791 : pub rel: RelTag,
792 : }
793 :
794 2 : #[derive(Debug, PartialEq, Eq)]
795 : pub struct PagestreamGetPageRequest {
796 : pub latest: bool,
797 : pub lsn: Lsn,
798 : pub rel: RelTag,
799 : pub blkno: u32,
800 : }
801 :
802 2 : #[derive(Debug, PartialEq, Eq)]
803 : pub struct PagestreamDbSizeRequest {
804 : pub latest: bool,
805 : pub lsn: Lsn,
806 : pub dbnode: u32,
807 : }
808 :
809 0 : #[derive(Debug, PartialEq, Eq)]
810 : pub struct PagestreamGetSlruSegmentRequest {
811 : pub latest: bool,
812 : pub lsn: Lsn,
813 : pub kind: u8,
814 : pub segno: u32,
815 : }
816 :
817 0 : #[derive(Debug)]
818 : pub struct PagestreamExistsResponse {
819 : pub exists: bool,
820 : }
821 :
822 0 : #[derive(Debug)]
823 : pub struct PagestreamNblocksResponse {
824 : pub n_blocks: u32,
825 : }
826 :
827 0 : #[derive(Debug)]
828 : pub struct PagestreamGetPageResponse {
829 : pub page: Bytes,
830 : }
831 :
832 0 : #[derive(Debug)]
833 : pub struct PagestreamGetSlruSegmentResponse {
834 : pub segment: Bytes,
835 : }
836 :
837 0 : #[derive(Debug)]
838 : pub struct PagestreamErrorResponse {
839 : pub message: String,
840 : }
841 :
842 0 : #[derive(Debug)]
843 : pub struct PagestreamDbSizeResponse {
844 : pub db_size: i64,
845 : }
846 :
847 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
848 : // that require pageserver-internal types. It is sufficient to get the total size.
849 0 : #[derive(Serialize, Deserialize, Debug)]
850 : pub struct TenantHistorySize {
851 : pub id: TenantId,
852 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
853 : ///
854 : /// Will be none if `?inputs_only=true` was given.
855 : pub size: Option<u64>,
856 : }
857 :
858 : impl PagestreamFeMessage {
859 8 : pub fn serialize(&self) -> Bytes {
860 8 : let mut bytes = BytesMut::new();
861 8 :
862 8 : match self {
863 2 : Self::Exists(req) => {
864 2 : bytes.put_u8(0);
865 2 : bytes.put_u8(u8::from(req.latest));
866 2 : bytes.put_u64(req.lsn.0);
867 2 : bytes.put_u32(req.rel.spcnode);
868 2 : bytes.put_u32(req.rel.dbnode);
869 2 : bytes.put_u32(req.rel.relnode);
870 2 : bytes.put_u8(req.rel.forknum);
871 2 : }
872 :
873 2 : Self::Nblocks(req) => {
874 2 : bytes.put_u8(1);
875 2 : bytes.put_u8(u8::from(req.latest));
876 2 : bytes.put_u64(req.lsn.0);
877 2 : bytes.put_u32(req.rel.spcnode);
878 2 : bytes.put_u32(req.rel.dbnode);
879 2 : bytes.put_u32(req.rel.relnode);
880 2 : bytes.put_u8(req.rel.forknum);
881 2 : }
882 :
883 2 : Self::GetPage(req) => {
884 2 : bytes.put_u8(2);
885 2 : bytes.put_u8(u8::from(req.latest));
886 2 : bytes.put_u64(req.lsn.0);
887 2 : bytes.put_u32(req.rel.spcnode);
888 2 : bytes.put_u32(req.rel.dbnode);
889 2 : bytes.put_u32(req.rel.relnode);
890 2 : bytes.put_u8(req.rel.forknum);
891 2 : bytes.put_u32(req.blkno);
892 2 : }
893 :
894 2 : Self::DbSize(req) => {
895 2 : bytes.put_u8(3);
896 2 : bytes.put_u8(u8::from(req.latest));
897 2 : bytes.put_u64(req.lsn.0);
898 2 : bytes.put_u32(req.dbnode);
899 2 : }
900 :
901 0 : Self::GetSlruSegment(req) => {
902 0 : bytes.put_u8(4);
903 0 : bytes.put_u8(u8::from(req.latest));
904 0 : bytes.put_u64(req.lsn.0);
905 0 : bytes.put_u8(req.kind);
906 0 : bytes.put_u32(req.segno);
907 0 : }
908 : }
909 :
910 8 : bytes.into()
911 8 : }
912 :
913 8 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
914 : // TODO these gets can fail
915 :
916 : // these correspond to the NeonMessageTag enum in pagestore_client.h
917 : //
918 : // TODO: consider using protobuf or serde bincode for less error prone
919 : // serialization.
920 8 : let msg_tag = body.read_u8()?;
921 8 : match msg_tag {
922 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
923 2 : latest: body.read_u8()? != 0,
924 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
925 : rel: RelTag {
926 2 : spcnode: body.read_u32::<BigEndian>()?,
927 2 : dbnode: body.read_u32::<BigEndian>()?,
928 2 : relnode: body.read_u32::<BigEndian>()?,
929 2 : forknum: body.read_u8()?,
930 : },
931 : })),
932 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
933 2 : latest: body.read_u8()? != 0,
934 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
935 : rel: RelTag {
936 2 : spcnode: body.read_u32::<BigEndian>()?,
937 2 : dbnode: body.read_u32::<BigEndian>()?,
938 2 : relnode: body.read_u32::<BigEndian>()?,
939 2 : forknum: body.read_u8()?,
940 : },
941 : })),
942 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
943 2 : latest: body.read_u8()? != 0,
944 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
945 : rel: RelTag {
946 2 : spcnode: body.read_u32::<BigEndian>()?,
947 2 : dbnode: body.read_u32::<BigEndian>()?,
948 2 : relnode: body.read_u32::<BigEndian>()?,
949 2 : forknum: body.read_u8()?,
950 : },
951 2 : blkno: body.read_u32::<BigEndian>()?,
952 : })),
953 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
954 2 : latest: body.read_u8()? != 0,
955 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
956 2 : dbnode: body.read_u32::<BigEndian>()?,
957 : })),
958 : 4 => Ok(PagestreamFeMessage::GetSlruSegment(
959 : PagestreamGetSlruSegmentRequest {
960 0 : latest: body.read_u8()? != 0,
961 0 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
962 0 : kind: body.read_u8()?,
963 0 : segno: body.read_u32::<BigEndian>()?,
964 : },
965 : )),
966 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
967 : }
968 8 : }
969 : }
970 :
971 : impl PagestreamBeMessage {
972 0 : pub fn serialize(&self) -> Bytes {
973 0 : let mut bytes = BytesMut::new();
974 0 :
975 0 : use PagestreamBeMessageTag as Tag;
976 0 : match self {
977 0 : Self::Exists(resp) => {
978 0 : bytes.put_u8(Tag::Exists as u8);
979 0 : bytes.put_u8(resp.exists as u8);
980 0 : }
981 :
982 0 : Self::Nblocks(resp) => {
983 0 : bytes.put_u8(Tag::Nblocks as u8);
984 0 : bytes.put_u32(resp.n_blocks);
985 0 : }
986 :
987 0 : Self::GetPage(resp) => {
988 0 : bytes.put_u8(Tag::GetPage as u8);
989 0 : bytes.put(&resp.page[..]);
990 0 : }
991 :
992 0 : Self::Error(resp) => {
993 0 : bytes.put_u8(Tag::Error as u8);
994 0 : bytes.put(resp.message.as_bytes());
995 0 : bytes.put_u8(0); // null terminator
996 0 : }
997 0 : Self::DbSize(resp) => {
998 0 : bytes.put_u8(Tag::DbSize as u8);
999 0 : bytes.put_i64(resp.db_size);
1000 0 : }
1001 :
1002 0 : Self::GetSlruSegment(resp) => {
1003 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1004 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1005 0 : bytes.put(&resp.segment[..]);
1006 0 : }
1007 : }
1008 :
1009 0 : bytes.into()
1010 0 : }
1011 :
1012 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
1013 0 : let mut buf = buf.reader();
1014 0 : let msg_tag = buf.read_u8()?;
1015 :
1016 : use PagestreamBeMessageTag as Tag;
1017 0 : let ok =
1018 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
1019 : Tag::Exists => {
1020 0 : let exists = buf.read_u8()?;
1021 0 : Self::Exists(PagestreamExistsResponse {
1022 0 : exists: exists != 0,
1023 0 : })
1024 : }
1025 : Tag::Nblocks => {
1026 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1027 0 : Self::Nblocks(PagestreamNblocksResponse { n_blocks })
1028 : }
1029 : Tag::GetPage => {
1030 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
1031 0 : buf.read_exact(&mut page)?;
1032 0 : PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
1033 : }
1034 : Tag::Error => {
1035 0 : let mut msg = Vec::new();
1036 0 : buf.read_until(0, &mut msg)?;
1037 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
1038 0 : let rust_str = cstring.to_str()?;
1039 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1040 0 : message: rust_str.to_owned(),
1041 0 : })
1042 : }
1043 : Tag::DbSize => {
1044 0 : let db_size = buf.read_i64::<BigEndian>()?;
1045 0 : Self::DbSize(PagestreamDbSizeResponse { db_size })
1046 : }
1047 : Tag::GetSlruSegment => {
1048 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1049 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
1050 0 : buf.read_exact(&mut segment)?;
1051 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
1052 0 : segment: segment.into(),
1053 0 : })
1054 : }
1055 : };
1056 0 : let remaining = buf.into_inner();
1057 0 : if !remaining.is_empty() {
1058 0 : anyhow::bail!(
1059 0 : "remaining bytes in msg with tag={msg_tag}: {}",
1060 0 : remaining.len()
1061 0 : );
1062 0 : }
1063 0 : Ok(ok)
1064 0 : }
1065 :
1066 0 : pub fn kind(&self) -> &'static str {
1067 0 : match self {
1068 0 : Self::Exists(_) => "Exists",
1069 0 : Self::Nblocks(_) => "Nblocks",
1070 0 : Self::GetPage(_) => "GetPage",
1071 0 : Self::Error(_) => "Error",
1072 0 : Self::DbSize(_) => "DbSize",
1073 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
1074 : }
1075 0 : }
1076 : }
1077 :
1078 : #[cfg(test)]
1079 : mod tests {
1080 : use bytes::Buf;
1081 : use serde_json::json;
1082 :
1083 : use super::*;
1084 :
1085 2 : #[test]
1086 2 : fn test_pagestream() {
1087 2 : // Test serialization/deserialization of PagestreamFeMessage
1088 2 : let messages = vec![
1089 2 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
1090 2 : latest: true,
1091 2 : lsn: Lsn(4),
1092 2 : rel: RelTag {
1093 2 : forknum: 1,
1094 2 : spcnode: 2,
1095 2 : dbnode: 3,
1096 2 : relnode: 4,
1097 2 : },
1098 2 : }),
1099 2 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1100 2 : latest: false,
1101 2 : lsn: Lsn(4),
1102 2 : rel: RelTag {
1103 2 : forknum: 1,
1104 2 : spcnode: 2,
1105 2 : dbnode: 3,
1106 2 : relnode: 4,
1107 2 : },
1108 2 : }),
1109 2 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1110 2 : latest: true,
1111 2 : lsn: Lsn(4),
1112 2 : rel: RelTag {
1113 2 : forknum: 1,
1114 2 : spcnode: 2,
1115 2 : dbnode: 3,
1116 2 : relnode: 4,
1117 2 : },
1118 2 : blkno: 7,
1119 2 : }),
1120 2 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1121 2 : latest: true,
1122 2 : lsn: Lsn(4),
1123 2 : dbnode: 7,
1124 2 : }),
1125 2 : ];
1126 10 : for msg in messages {
1127 8 : let bytes = msg.serialize();
1128 8 : let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
1129 8 : assert!(msg == reconstructed);
1130 : }
1131 2 : }
1132 :
1133 2 : #[test]
1134 2 : fn test_tenantinfo_serde() {
1135 2 : // Test serialization/deserialization of TenantInfo
1136 2 : let original_active = TenantInfo {
1137 2 : id: TenantShardId::unsharded(TenantId::generate()),
1138 2 : state: TenantState::Active,
1139 2 : current_physical_size: Some(42),
1140 2 : attachment_status: TenantAttachmentStatus::Attached,
1141 2 : generation: None,
1142 2 : };
1143 2 : let expected_active = json!({
1144 2 : "id": original_active.id.to_string(),
1145 2 : "state": {
1146 2 : "slug": "Active",
1147 2 : },
1148 2 : "current_physical_size": 42,
1149 2 : "attachment_status": {
1150 2 : "slug":"attached",
1151 2 : }
1152 2 : });
1153 2 :
1154 2 : let original_broken = TenantInfo {
1155 2 : id: TenantShardId::unsharded(TenantId::generate()),
1156 2 : state: TenantState::Broken {
1157 2 : reason: "reason".into(),
1158 2 : backtrace: "backtrace info".into(),
1159 2 : },
1160 2 : current_physical_size: Some(42),
1161 2 : attachment_status: TenantAttachmentStatus::Attached,
1162 2 : generation: None,
1163 2 : };
1164 2 : let expected_broken = json!({
1165 2 : "id": original_broken.id.to_string(),
1166 2 : "state": {
1167 2 : "slug": "Broken",
1168 2 : "data": {
1169 2 : "backtrace": "backtrace info",
1170 2 : "reason": "reason",
1171 2 : }
1172 2 : },
1173 2 : "current_physical_size": 42,
1174 2 : "attachment_status": {
1175 2 : "slug":"attached",
1176 2 : }
1177 2 : });
1178 2 :
1179 2 : assert_eq!(
1180 2 : serde_json::to_value(&original_active).unwrap(),
1181 2 : expected_active
1182 2 : );
1183 :
1184 2 : assert_eq!(
1185 2 : serde_json::to_value(&original_broken).unwrap(),
1186 2 : expected_broken
1187 2 : );
1188 2 : assert!(format!("{:?}", &original_broken.state).contains("reason"));
1189 2 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
1190 2 : }
1191 :
1192 2 : #[test]
1193 2 : fn test_reject_unknown_field() {
1194 2 : let id = TenantId::generate();
1195 2 : let create_request = json!({
1196 2 : "new_tenant_id": id.to_string(),
1197 2 : "unknown_field": "unknown_value".to_string(),
1198 2 : });
1199 2 : let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
1200 2 : assert!(
1201 2 : err.to_string().contains("unknown field `unknown_field`"),
1202 0 : "expect unknown field `unknown_field` error, got: {}",
1203 : err
1204 : );
1205 :
1206 2 : let id = TenantId::generate();
1207 2 : let config_request = json!({
1208 2 : "tenant_id": id.to_string(),
1209 2 : "unknown_field": "unknown_value".to_string(),
1210 2 : });
1211 2 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
1212 2 : assert!(
1213 2 : err.to_string().contains("unknown field `unknown_field`"),
1214 0 : "expect unknown field `unknown_field` error, got: {}",
1215 : err
1216 : );
1217 :
1218 2 : let attach_request = json!({
1219 2 : "config": {
1220 2 : "unknown_field": "unknown_value".to_string(),
1221 2 : },
1222 2 : });
1223 2 : let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
1224 2 : assert!(
1225 2 : err.to_string().contains("unknown field `unknown_field`"),
1226 0 : "expect unknown field `unknown_field` error, got: {}",
1227 : err
1228 : );
1229 2 : }
1230 :
1231 2 : #[test]
1232 2 : fn tenantstatus_activating_serde() {
1233 2 : let states = [
1234 2 : TenantState::Activating(ActivatingFrom::Loading),
1235 2 : TenantState::Activating(ActivatingFrom::Attaching),
1236 2 : ];
1237 2 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
1238 2 :
1239 2 : let actual = serde_json::to_string(&states).unwrap();
1240 2 :
1241 2 : assert_eq!(actual, expected);
1242 :
1243 2 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
1244 2 :
1245 2 : assert_eq!(states.as_slice(), &parsed);
1246 2 : }
1247 :
1248 2 : #[test]
1249 2 : fn tenantstatus_activating_strum() {
1250 2 : // tests added, because we use these for metrics
1251 2 : let examples = [
1252 2 : (line!(), TenantState::Loading, "Loading"),
1253 2 : (line!(), TenantState::Attaching, "Attaching"),
1254 2 : (
1255 2 : line!(),
1256 2 : TenantState::Activating(ActivatingFrom::Loading),
1257 2 : "Activating",
1258 2 : ),
1259 2 : (
1260 2 : line!(),
1261 2 : TenantState::Activating(ActivatingFrom::Attaching),
1262 2 : "Activating",
1263 2 : ),
1264 2 : (line!(), TenantState::Active, "Active"),
1265 2 : (
1266 2 : line!(),
1267 2 : TenantState::Stopping {
1268 2 : progress: utils::completion::Barrier::default(),
1269 2 : },
1270 2 : "Stopping",
1271 2 : ),
1272 2 : (
1273 2 : line!(),
1274 2 : TenantState::Broken {
1275 2 : reason: "Example".into(),
1276 2 : backtrace: "Looooong backtrace".into(),
1277 2 : },
1278 2 : "Broken",
1279 2 : ),
1280 2 : ];
1281 :
1282 16 : for (line, rendered, expected) in examples {
1283 14 : let actual: &'static str = rendered.into();
1284 14 : assert_eq!(actual, expected, "example on {line}");
1285 : }
1286 2 : }
1287 : }
|