Line data Source code
1 : pub mod partitioning;
2 : pub mod utilization;
3 :
4 : pub use utilization::PageserverUtilization;
5 :
6 : use std::{
7 : borrow::Cow,
8 : collections::HashMap,
9 : io::{BufRead, Read},
10 : num::{NonZeroU64, NonZeroUsize},
11 : time::{Duration, SystemTime},
12 : };
13 :
14 : use byteorder::{BigEndian, ReadBytesExt};
15 : use postgres_ffi::BLCKSZ;
16 : use serde::{Deserialize, Serialize};
17 : use serde_with::serde_as;
18 : use utils::{
19 : completion,
20 : history_buffer::HistoryBufferWithDropCounter,
21 : id::{NodeId, TenantId, TimelineId},
22 : lsn::Lsn,
23 : serde_system_time,
24 : };
25 :
26 : use crate::controller_api::PlacementPolicy;
27 : use crate::{
28 : reltag::RelTag,
29 : shard::{ShardCount, ShardStripeSize, TenantShardId},
30 : };
31 : use anyhow::bail;
32 : use bytes::{Buf, BufMut, Bytes, BytesMut};
33 :
34 : /// The state of a tenant in this pageserver.
35 : ///
36 : /// ```mermaid
37 : /// stateDiagram-v2
38 : ///
39 : /// [*] --> Loading: spawn_load()
40 : /// [*] --> Attaching: spawn_attach()
41 : ///
42 : /// Loading --> Activating: activate()
43 : /// Attaching --> Activating: activate()
44 : /// Activating --> Active: infallible
45 : ///
46 : /// Loading --> Broken: load() failure
47 : /// Attaching --> Broken: attach() failure
48 : ///
49 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
50 : /// Stopping --> Broken: late error in remove_tenant_from_memory
51 : ///
52 : /// Broken --> [*]: ignore / detach / shutdown
53 : /// Stopping --> [*]: remove_from_memory complete
54 : ///
55 : /// Active --> Broken: cfg(testing)-only tenant break point
56 : /// ```
57 : #[derive(
58 : Clone,
59 : PartialEq,
60 : Eq,
61 2 : serde::Serialize,
62 12 : serde::Deserialize,
63 0 : strum_macros::Display,
64 : strum_macros::EnumVariantNames,
65 0 : strum_macros::AsRefStr,
66 119 : strum_macros::IntoStaticStr,
67 : )]
68 : #[serde(tag = "slug", content = "data")]
69 : pub enum TenantState {
70 : /// This tenant is being loaded from local disk.
71 : ///
72 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
73 : Loading,
74 : /// This tenant is being attached to the pageserver.
75 : ///
76 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
77 : Attaching,
78 : /// The tenant is transitioning from Loading/Attaching to Active.
79 : ///
80 : /// While in this state, the individual timelines are being activated.
81 : ///
82 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
83 : Activating(ActivatingFrom),
84 : /// The tenant has finished activating and is open for business.
85 : ///
86 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
87 : Active,
88 : /// The tenant is recognized by pageserver, but it is being detached or the
89 : /// system is being shut down.
90 : ///
91 : /// Transitions out of this state are possible through `set_broken()`.
92 : Stopping {
93 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
94 : // otherwise it will not be skipped during deserialization
95 : #[serde(skip)]
96 : progress: completion::Barrier,
97 : },
98 : /// The tenant is recognized by the pageserver, but can no longer be used for
99 : /// any operations.
100 : ///
101 : /// If the tenant fails to load or attach, it will transition to this state
102 : /// and it is guaranteed that no background tasks are running in its name.
103 : ///
104 : /// The other way to transition into this state is from `Stopping` state
105 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
106 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
107 : Broken { reason: String, backtrace: String },
108 : }
109 :
110 : impl TenantState {
111 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
112 : use TenantAttachmentStatus::*;
113 :
114 : // Below TenantState::Activating is used as "transient" or "transparent" state for
115 : // attachment_status determining.
116 0 : match self {
117 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
118 : // So, technically, we can return Attached here.
119 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
120 : // But, our attach task might still be fetching the remote timelines, etc.
121 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
122 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
123 : // tenant mgr startup distinguishes attaching from loading via marker file.
124 0 : Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
125 : // We only reach Active after successful load / attach.
126 : // So, call atttachment status Attached.
127 0 : Self::Active => Attached,
128 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
129 : // However, it also becomes Broken if the regular load fails.
130 : // From Console's perspective there's no practical difference
131 : // because attachment_status is polled by console only during attach operation execution.
132 0 : Self::Broken { reason, .. } => Failed {
133 0 : reason: reason.to_owned(),
134 0 : },
135 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
136 : // we set the Stopping state irrespective of whether the tenant
137 : // has finished attaching or not.
138 0 : Self::Stopping { .. } => Maybe,
139 : }
140 0 : }
141 :
142 0 : pub fn broken_from_reason(reason: String) -> Self {
143 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
144 0 : Self::Broken {
145 0 : reason,
146 0 : backtrace: backtrace_str,
147 0 : }
148 0 : }
149 : }
150 :
151 : impl std::fmt::Debug for TenantState {
152 4 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 4 : match self {
154 4 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
155 4 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
156 : }
157 0 : _ => write!(f, "{self}"),
158 : }
159 4 : }
160 : }
161 :
162 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
163 8 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
164 : pub enum ActivatingFrom {
165 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
166 : Loading,
167 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
168 : Attaching,
169 : }
170 :
171 : /// A state of a timeline in pageserver's memory.
172 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
173 : pub enum TimelineState {
174 : /// The timeline is recognized by the pageserver but is not yet operational.
175 : /// In particular, the walreceiver connection loop is not running for this timeline.
176 : /// It will eventually transition to state Active or Broken.
177 : Loading,
178 : /// The timeline is fully operational.
179 : /// It can be queried, and the walreceiver connection loop is running.
180 : Active,
181 : /// The timeline was previously Loading or Active but is shutting down.
182 : /// It cannot transition back into any other state.
183 : Stopping,
184 : /// The timeline is broken and not operational (previous states: Loading or Active).
185 : Broken { reason: String, backtrace: String },
186 : }
187 :
188 0 : #[derive(Serialize, Deserialize, Clone)]
189 : pub struct TimelineCreateRequest {
190 : pub new_timeline_id: TimelineId,
191 : #[serde(default)]
192 : pub ancestor_timeline_id: Option<TimelineId>,
193 : #[serde(default)]
194 : pub existing_initdb_timeline_id: Option<TimelineId>,
195 : #[serde(default)]
196 : pub ancestor_start_lsn: Option<Lsn>,
197 : pub pg_version: Option<u32>,
198 : }
199 :
200 0 : #[derive(Serialize, Deserialize)]
201 : pub struct TenantShardSplitRequest {
202 : pub new_shard_count: u8,
203 :
204 : // A tenant's stripe size is only meaningful the first time their shard count goes
205 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
206 : //
207 : // If this is set while the stripe count is being increased from an already >1 value,
208 : // then the request will fail with 400.
209 : pub new_stripe_size: Option<ShardStripeSize>,
210 : }
211 :
212 0 : #[derive(Serialize, Deserialize)]
213 : pub struct TenantShardSplitResponse {
214 : pub new_shards: Vec<TenantShardId>,
215 : }
216 :
217 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
218 0 : #[derive(Serialize, Deserialize, Debug)]
219 : #[serde(deny_unknown_fields)]
220 : pub struct ShardParameters {
221 : pub count: ShardCount,
222 : pub stripe_size: ShardStripeSize,
223 : }
224 :
225 : impl ShardParameters {
226 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
227 :
228 0 : pub fn is_unsharded(&self) -> bool {
229 0 : self.count.is_unsharded()
230 0 : }
231 : }
232 :
233 : impl Default for ShardParameters {
234 106 : fn default() -> Self {
235 106 : Self {
236 106 : count: ShardCount::new(0),
237 106 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
238 106 : }
239 106 : }
240 : }
241 :
242 8 : #[derive(Serialize, Deserialize, Debug)]
243 : #[serde(deny_unknown_fields)]
244 : pub struct TenantCreateRequest {
245 : pub new_tenant_id: TenantShardId,
246 : #[serde(default)]
247 : #[serde(skip_serializing_if = "Option::is_none")]
248 : pub generation: Option<u32>,
249 :
250 : // If omitted, create a single shard with TenantShardId::unsharded()
251 : #[serde(default)]
252 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
253 : pub shard_parameters: ShardParameters,
254 :
255 : // This parameter is only meaningful in requests sent to the storage controller
256 : #[serde(default)]
257 : #[serde(skip_serializing_if = "Option::is_none")]
258 : pub placement_policy: Option<PlacementPolicy>,
259 :
260 : #[serde(flatten)]
261 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
262 : }
263 :
264 0 : #[derive(Deserialize, Debug)]
265 : #[serde(deny_unknown_fields)]
266 : pub struct TenantLoadRequest {
267 : #[serde(default)]
268 : #[serde(skip_serializing_if = "Option::is_none")]
269 : pub generation: Option<u32>,
270 : }
271 :
272 : impl std::ops::Deref for TenantCreateRequest {
273 : type Target = TenantConfig;
274 :
275 0 : fn deref(&self) -> &Self::Target {
276 0 : &self.config
277 0 : }
278 : }
279 :
280 : /// An alternative representation of `pageserver::tenant::TenantConf` with
281 : /// simpler types.
282 6 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
283 : pub struct TenantConfig {
284 : pub checkpoint_distance: Option<u64>,
285 : pub checkpoint_timeout: Option<String>,
286 : pub compaction_target_size: Option<u64>,
287 : pub compaction_period: Option<String>,
288 : pub compaction_threshold: Option<usize>,
289 : // defer parsing compaction_algorithm, like eviction_policy
290 : pub compaction_algorithm: Option<CompactionAlgorithm>,
291 : pub gc_horizon: Option<u64>,
292 : pub gc_period: Option<String>,
293 : pub image_creation_threshold: Option<usize>,
294 : pub pitr_interval: Option<String>,
295 : pub walreceiver_connect_timeout: Option<String>,
296 : pub lagging_wal_timeout: Option<String>,
297 : pub max_lsn_wal_lag: Option<NonZeroU64>,
298 : pub trace_read_requests: Option<bool>,
299 : pub eviction_policy: Option<EvictionPolicy>,
300 : pub min_resident_size_override: Option<u64>,
301 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
302 : pub heatmap_period: Option<String>,
303 : pub lazy_slru_download: Option<bool>,
304 : pub timeline_get_throttle: Option<ThrottleConfig>,
305 : pub image_layer_creation_check_threshold: Option<u8>,
306 : }
307 :
308 4 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
309 : #[serde(tag = "kind")]
310 : pub enum EvictionPolicy {
311 : NoEviction,
312 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
313 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
314 : }
315 :
316 : impl EvictionPolicy {
317 0 : pub fn discriminant_str(&self) -> &'static str {
318 0 : match self {
319 0 : EvictionPolicy::NoEviction => "NoEviction",
320 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
321 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
322 : }
323 0 : }
324 : }
325 :
326 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
327 : #[serde(tag = "kind")]
328 : pub enum CompactionAlgorithm {
329 : Legacy,
330 : Tiered,
331 : }
332 :
333 28 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
334 : pub struct EvictionPolicyLayerAccessThreshold {
335 : #[serde(with = "humantime_serde")]
336 : pub period: Duration,
337 : #[serde(with = "humantime_serde")]
338 : pub threshold: Duration,
339 : }
340 :
341 0 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
342 : pub struct ThrottleConfig {
343 : pub task_kinds: Vec<String>, // TaskKind
344 : pub initial: usize,
345 : #[serde(with = "humantime_serde")]
346 : pub refill_interval: Duration,
347 : pub refill_amount: NonZeroUsize,
348 : pub max: usize,
349 : pub fair: bool,
350 : }
351 :
352 : impl ThrottleConfig {
353 256 : pub fn disabled() -> Self {
354 256 : Self {
355 256 : task_kinds: vec![], // effectively disables the throttle
356 256 : // other values don't matter with emtpy `task_kinds`.
357 256 : initial: 0,
358 256 : refill_interval: Duration::from_millis(1),
359 256 : refill_amount: NonZeroUsize::new(1).unwrap(),
360 256 : max: 1,
361 256 : fair: true,
362 256 : }
363 256 : }
364 : /// The requests per second allowed by the given config.
365 0 : pub fn steady_rps(&self) -> f64 {
366 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
367 0 : }
368 : }
369 :
370 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
371 : /// lists out all possible states (and the virtual "Detached" state)
372 : /// in a flat form rather than using rust-style enums.
373 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
374 : pub enum LocationConfigMode {
375 : AttachedSingle,
376 : AttachedMulti,
377 : AttachedStale,
378 : Secondary,
379 : Detached,
380 : }
381 :
382 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
383 : pub struct LocationConfigSecondary {
384 : pub warm: bool,
385 : }
386 :
387 : /// An alternative representation of `pageserver::tenant::LocationConf`,
388 : /// for use in external-facing APIs.
389 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
390 : pub struct LocationConfig {
391 : pub mode: LocationConfigMode,
392 : /// If attaching, in what generation?
393 : #[serde(default)]
394 : pub generation: Option<u32>,
395 :
396 : // If requesting mode `Secondary`, configuration for that.
397 : #[serde(default)]
398 : pub secondary_conf: Option<LocationConfigSecondary>,
399 :
400 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
401 : // must be set accurately.
402 : #[serde(default)]
403 : pub shard_number: u8,
404 : #[serde(default)]
405 : pub shard_count: u8,
406 : #[serde(default)]
407 : pub shard_stripe_size: u32,
408 :
409 : // This configuration only affects attached mode, but should be provided irrespective
410 : // of the mode, as a secondary location might transition on startup if the response
411 : // to the `/re-attach` control plane API requests it.
412 : pub tenant_conf: TenantConfig,
413 : }
414 :
415 0 : #[derive(Serialize, Deserialize)]
416 : pub struct LocationConfigListResponse {
417 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
418 : }
419 :
420 0 : #[derive(Serialize, Deserialize)]
421 : #[serde(transparent)]
422 : pub struct TenantCreateResponse(pub TenantId);
423 :
424 : #[derive(Serialize)]
425 : pub struct StatusResponse {
426 : pub id: NodeId,
427 : }
428 :
429 0 : #[derive(Serialize, Deserialize, Debug)]
430 : #[serde(deny_unknown_fields)]
431 : pub struct TenantLocationConfigRequest {
432 : pub tenant_id: Option<TenantShardId>,
433 : #[serde(flatten)]
434 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
435 : }
436 :
437 0 : #[derive(Serialize, Deserialize, Debug)]
438 : #[serde(deny_unknown_fields)]
439 : pub struct TenantTimeTravelRequest {
440 : pub shard_counts: Vec<ShardCount>,
441 : }
442 :
443 0 : #[derive(Serialize, Deserialize, Debug)]
444 : #[serde(deny_unknown_fields)]
445 : pub struct TenantShardLocation {
446 : pub shard_id: TenantShardId,
447 : pub node_id: NodeId,
448 : }
449 :
450 0 : #[derive(Serialize, Deserialize, Debug)]
451 : #[serde(deny_unknown_fields)]
452 : pub struct TenantLocationConfigResponse {
453 : pub shards: Vec<TenantShardLocation>,
454 : // If the shards' ShardCount count is >1, stripe_size will be set.
455 : pub stripe_size: Option<ShardStripeSize>,
456 : }
457 :
458 8 : #[derive(Serialize, Deserialize, Debug)]
459 : #[serde(deny_unknown_fields)]
460 : pub struct TenantConfigRequest {
461 : pub tenant_id: TenantId,
462 : #[serde(flatten)]
463 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
464 : }
465 :
466 : impl std::ops::Deref for TenantConfigRequest {
467 : type Target = TenantConfig;
468 :
469 0 : fn deref(&self) -> &Self::Target {
470 0 : &self.config
471 0 : }
472 : }
473 :
474 : impl TenantConfigRequest {
475 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
476 0 : let config = TenantConfig::default();
477 0 : TenantConfigRequest { tenant_id, config }
478 0 : }
479 : }
480 :
481 8 : #[derive(Debug, Deserialize)]
482 : pub struct TenantAttachRequest {
483 : #[serde(default)]
484 : pub config: TenantAttachConfig,
485 : #[serde(default)]
486 : pub generation: Option<u32>,
487 : }
488 :
489 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
490 : /// its usage inside `TenantAttachRequest`.
491 2 : #[derive(Debug, Serialize, Deserialize, Default)]
492 : #[serde(deny_unknown_fields)]
493 : pub struct TenantAttachConfig {
494 : #[serde(flatten)]
495 : allowing_unknown_fields: TenantConfig,
496 : }
497 :
498 : impl std::ops::Deref for TenantAttachConfig {
499 : type Target = TenantConfig;
500 :
501 0 : fn deref(&self) -> &Self::Target {
502 0 : &self.allowing_unknown_fields
503 0 : }
504 : }
505 :
506 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
507 0 : #[derive(Serialize, Deserialize, Clone)]
508 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
509 : pub enum TenantAttachmentStatus {
510 : Maybe,
511 : Attached,
512 : Failed { reason: String },
513 : }
514 :
515 0 : #[derive(Serialize, Deserialize, Clone)]
516 : pub struct TenantInfo {
517 : pub id: TenantShardId,
518 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
519 : pub state: TenantState,
520 : /// Sum of the size of all layer files.
521 : /// If a layer is present in both local FS and S3, it counts only once.
522 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
523 : pub attachment_status: TenantAttachmentStatus,
524 : #[serde(skip_serializing_if = "Option::is_none")]
525 : pub generation: Option<u32>,
526 : }
527 :
528 0 : #[derive(Serialize, Deserialize, Clone)]
529 : pub struct TenantDetails {
530 : #[serde(flatten)]
531 : pub tenant_info: TenantInfo,
532 :
533 : pub walredo: Option<WalRedoManagerStatus>,
534 :
535 : pub timelines: Vec<TimelineId>,
536 : }
537 :
538 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
539 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
540 : pub struct TimelineInfo {
541 : pub tenant_id: TenantShardId,
542 : pub timeline_id: TimelineId,
543 :
544 : pub ancestor_timeline_id: Option<TimelineId>,
545 : pub ancestor_lsn: Option<Lsn>,
546 : pub last_record_lsn: Lsn,
547 : pub prev_record_lsn: Option<Lsn>,
548 : pub latest_gc_cutoff_lsn: Lsn,
549 : pub disk_consistent_lsn: Lsn,
550 :
551 : /// The LSN that we have succesfully uploaded to remote storage
552 : pub remote_consistent_lsn: Lsn,
553 :
554 : /// The LSN that we are advertizing to safekeepers
555 : pub remote_consistent_lsn_visible: Lsn,
556 :
557 : /// The LSN from the start of the root timeline (never changes)
558 : pub initdb_lsn: Lsn,
559 :
560 : pub current_logical_size: u64,
561 : pub current_logical_size_is_accurate: bool,
562 :
563 : pub directory_entries_counts: Vec<u64>,
564 :
565 : /// Sum of the size of all layer files.
566 : /// If a layer is present in both local FS and S3, it counts only once.
567 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
568 : pub current_logical_size_non_incremental: Option<u64>,
569 :
570 : pub timeline_dir_layer_file_size_sum: Option<u64>,
571 :
572 : pub wal_source_connstr: Option<String>,
573 : pub last_received_msg_lsn: Option<Lsn>,
574 : /// the timestamp (in microseconds) of the last received message
575 : pub last_received_msg_ts: Option<u128>,
576 : pub pg_version: u32,
577 :
578 : pub state: TimelineState,
579 :
580 : pub walreceiver_status: String,
581 : }
582 :
583 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
584 : pub struct LayerMapInfo {
585 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
586 : pub historic_layers: Vec<HistoricLayerInfo>,
587 : }
588 :
589 0 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
590 : #[repr(usize)]
591 : pub enum LayerAccessKind {
592 : GetValueReconstructData,
593 : Iter,
594 : KeyIter,
595 : Dump,
596 : }
597 :
598 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
599 : pub struct LayerAccessStatFullDetails {
600 : pub when_millis_since_epoch: u64,
601 : pub task_kind: Cow<'static, str>,
602 : pub access_kind: LayerAccessKind,
603 : }
604 :
605 : /// An event that impacts the layer's residence status.
606 : #[serde_as]
607 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
608 : pub struct LayerResidenceEvent {
609 : /// The time when the event occurred.
610 : /// NB: this timestamp is captured while the residence status changes.
611 : /// So, it might be behind/ahead of the actual residence change by a short amount of time.
612 : ///
613 : #[serde(rename = "timestamp_millis_since_epoch")]
614 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
615 : pub timestamp: SystemTime,
616 : /// The new residence status of the layer.
617 : pub status: LayerResidenceStatus,
618 : /// The reason why we had to record this event.
619 : pub reason: LayerResidenceEventReason,
620 : }
621 :
622 : /// The reason for recording a given [`LayerResidenceEvent`].
623 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
624 : pub enum LayerResidenceEventReason {
625 : /// The layer map is being populated, e.g. during timeline load or attach.
626 : /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
627 : /// We need to record such events because there is no persistent storage for the events.
628 : ///
629 : // https://github.com/rust-lang/rust/issues/74481
630 : /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
631 : /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
632 : LayerLoad,
633 : /// We just created the layer (e.g., freeze_and_flush or compaction).
634 : /// Such layers are always [`LayerResidenceStatus::Resident`].
635 : LayerCreate,
636 : /// We on-demand downloaded or evicted the given layer.
637 : ResidenceChange,
638 : }
639 :
640 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
641 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
642 : pub enum LayerResidenceStatus {
643 : /// Residence status for a layer file that exists locally.
644 : /// It may also exist on the remote, we don't care here.
645 : Resident,
646 : /// Residence status for a layer file that only exists on the remote.
647 : Evicted,
648 : }
649 :
650 : impl LayerResidenceEvent {
651 1980 : pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
652 1980 : Self {
653 1980 : status,
654 1980 : reason,
655 1980 : timestamp: SystemTime::now(),
656 1980 : }
657 1980 : }
658 : }
659 :
660 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
661 : pub struct LayerAccessStats {
662 : pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
663 : pub task_kind_access_flag: Vec<Cow<'static, str>>,
664 : pub first: Option<LayerAccessStatFullDetails>,
665 : pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
666 : pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
667 : }
668 :
669 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
670 : #[serde(tag = "kind")]
671 : pub enum InMemoryLayerInfo {
672 : Open { lsn_start: Lsn },
673 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
674 : }
675 :
676 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
677 : #[serde(tag = "kind")]
678 : pub enum HistoricLayerInfo {
679 : Delta {
680 : layer_file_name: String,
681 : layer_file_size: u64,
682 :
683 : lsn_start: Lsn,
684 : lsn_end: Lsn,
685 : remote: bool,
686 : access_stats: LayerAccessStats,
687 : },
688 : Image {
689 : layer_file_name: String,
690 : layer_file_size: u64,
691 :
692 : lsn_start: Lsn,
693 : remote: bool,
694 : access_stats: LayerAccessStats,
695 : },
696 : }
697 :
698 : impl HistoricLayerInfo {
699 0 : pub fn layer_file_name(&self) -> &str {
700 0 : match self {
701 : HistoricLayerInfo::Delta {
702 0 : layer_file_name, ..
703 0 : } => layer_file_name,
704 : HistoricLayerInfo::Image {
705 0 : layer_file_name, ..
706 0 : } => layer_file_name,
707 : }
708 0 : }
709 0 : pub fn is_remote(&self) -> bool {
710 0 : match self {
711 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
712 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
713 : }
714 0 : }
715 0 : pub fn set_remote(&mut self, value: bool) {
716 0 : let field = match self {
717 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
718 0 : HistoricLayerInfo::Image { remote, .. } => remote,
719 : };
720 0 : *field = value;
721 0 : }
722 : }
723 :
724 0 : #[derive(Debug, Serialize, Deserialize)]
725 : pub struct DownloadRemoteLayersTaskSpawnRequest {
726 : pub max_concurrent_downloads: NonZeroUsize,
727 : }
728 :
729 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
730 : pub struct DownloadRemoteLayersTaskInfo {
731 : pub task_id: String,
732 : pub state: DownloadRemoteLayersTaskState,
733 : pub total_layer_count: u64, // stable once `completed`
734 : pub successful_download_count: u64, // stable once `completed`
735 : pub failed_download_count: u64, // stable once `completed`
736 : }
737 :
738 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
739 : pub enum DownloadRemoteLayersTaskState {
740 : Running,
741 : Completed,
742 : ShutDown,
743 : }
744 :
745 0 : #[derive(Debug, Serialize, Deserialize)]
746 : pub struct TimelineGcRequest {
747 : pub gc_horizon: Option<u64>,
748 : }
749 :
750 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
751 : pub struct WalRedoManagerStatus {
752 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
753 : pub pid: Option<u32>,
754 : }
755 :
756 : /// The progress of a secondary tenant is mostly useful when doing a long running download: e.g. initiating
757 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
758 : /// what's happening.
759 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
760 : pub struct SecondaryProgress {
761 : /// The remote storage LastModified time of the heatmap object we last downloaded.
762 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
763 :
764 : /// The number of layers currently on-disk
765 : pub layers_downloaded: usize,
766 : /// The number of layers in the most recently seen heatmap
767 : pub layers_total: usize,
768 :
769 : /// The number of layer bytes currently on-disk
770 : pub bytes_downloaded: u64,
771 : /// The number of layer bytes in the most recently seen heatmap
772 : pub bytes_total: u64,
773 : }
774 :
775 : pub mod virtual_file {
776 : #[derive(
777 : Copy,
778 : Clone,
779 : PartialEq,
780 : Eq,
781 : Hash,
782 252 : strum_macros::EnumString,
783 0 : strum_macros::Display,
784 0 : serde_with::DeserializeFromStr,
785 : serde_with::SerializeDisplay,
786 : Debug,
787 : )]
788 : #[strum(serialize_all = "kebab-case")]
789 : pub enum IoEngineKind {
790 : StdFs,
791 : #[cfg(target_os = "linux")]
792 : TokioEpollUring,
793 : }
794 : }
795 :
796 : // Wrapped in libpq CopyData
797 : #[derive(PartialEq, Eq, Debug)]
798 : pub enum PagestreamFeMessage {
799 : Exists(PagestreamExistsRequest),
800 : Nblocks(PagestreamNblocksRequest),
801 : GetPage(PagestreamGetPageRequest),
802 : DbSize(PagestreamDbSizeRequest),
803 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
804 : }
805 :
806 : // Wrapped in libpq CopyData
807 0 : #[derive(strum_macros::EnumProperty)]
808 : pub enum PagestreamBeMessage {
809 : Exists(PagestreamExistsResponse),
810 : Nblocks(PagestreamNblocksResponse),
811 : GetPage(PagestreamGetPageResponse),
812 : Error(PagestreamErrorResponse),
813 : DbSize(PagestreamDbSizeResponse),
814 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
815 : }
816 :
/// Tag byte identifying each `PagestreamBeMessage` variant.
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
    Exists = 100,
    Nblocks = 101,
    GetPage = 102,
    Error = 103,
    DbSize = 104,
    GetSlruSegment = 105,
}

impl TryFrom<u8> for PagestreamBeMessageTag {
    type Error = u8;

    /// Decodes a tag byte. A byte that matches no variant is returned unchanged as the error.
    fn try_from(value: u8) -> Result<Self, u8> {
        // Derive the match arms from the enum discriminants so the numbers live in one place.
        const EXISTS: u8 = PagestreamBeMessageTag::Exists as u8;
        const NBLOCKS: u8 = PagestreamBeMessageTag::Nblocks as u8;
        const GET_PAGE: u8 = PagestreamBeMessageTag::GetPage as u8;
        const ERROR: u8 = PagestreamBeMessageTag::Error as u8;
        const DB_SIZE: u8 = PagestreamBeMessageTag::DbSize as u8;
        const GET_SLRU_SEGMENT: u8 = PagestreamBeMessageTag::GetSlruSegment as u8;

        let tag = match value {
            EXISTS => PagestreamBeMessageTag::Exists,
            NBLOCKS => PagestreamBeMessageTag::Nblocks,
            GET_PAGE => PagestreamBeMessageTag::GetPage,
            ERROR => PagestreamBeMessageTag::Error,
            DB_SIZE => PagestreamBeMessageTag::DbSize,
            GET_SLRU_SEGMENT => PagestreamBeMessageTag::GetSlruSegment,
            unknown => return Err(unknown),
        };
        Ok(tag)
    }
}
841 :
842 : #[derive(Debug, PartialEq, Eq)]
843 : pub struct PagestreamExistsRequest {
844 : pub latest: bool,
845 : pub lsn: Lsn,
846 : pub rel: RelTag,
847 : }
848 :
849 : #[derive(Debug, PartialEq, Eq)]
850 : pub struct PagestreamNblocksRequest {
851 : pub latest: bool,
852 : pub lsn: Lsn,
853 : pub rel: RelTag,
854 : }
855 :
856 : #[derive(Debug, PartialEq, Eq)]
857 : pub struct PagestreamGetPageRequest {
858 : pub latest: bool,
859 : pub lsn: Lsn,
860 : pub rel: RelTag,
861 : pub blkno: u32,
862 : }
863 :
864 : #[derive(Debug, PartialEq, Eq)]
865 : pub struct PagestreamDbSizeRequest {
866 : pub latest: bool,
867 : pub lsn: Lsn,
868 : pub dbnode: u32,
869 : }
870 :
871 : #[derive(Debug, PartialEq, Eq)]
872 : pub struct PagestreamGetSlruSegmentRequest {
873 : pub latest: bool,
874 : pub lsn: Lsn,
875 : pub kind: u8,
876 : pub segno: u32,
877 : }
878 :
879 : #[derive(Debug)]
880 : pub struct PagestreamExistsResponse {
881 : pub exists: bool,
882 : }
883 :
884 : #[derive(Debug)]
885 : pub struct PagestreamNblocksResponse {
886 : pub n_blocks: u32,
887 : }
888 :
889 : #[derive(Debug)]
890 : pub struct PagestreamGetPageResponse {
891 : pub page: Bytes,
892 : }
893 :
894 : #[derive(Debug)]
895 : pub struct PagestreamGetSlruSegmentResponse {
896 : pub segment: Bytes,
897 : }
898 :
899 : #[derive(Debug)]
900 : pub struct PagestreamErrorResponse {
901 : pub message: String,
902 : }
903 :
904 : #[derive(Debug)]
905 : pub struct PagestreamDbSizeResponse {
906 : pub db_size: i64,
907 : }
908 :
/// A cut-down version of `TenantHistorySize` from the pageserver crate.
///
/// Fields that require pageserver-internal types are omitted; what remains is
/// sufficient to obtain the total size.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantHistorySize {
    /// Tenant this size report belongs to.
    pub id: TenantId,
    /// Size is a mixture of WAL and logical size, so the unit is bytes.
    ///
    /// Will be none if `?inputs_only=true` was given.
    pub size: Option<u64>,
}
919 :
920 : impl PagestreamFeMessage {
921 8 : pub fn serialize(&self) -> Bytes {
922 8 : let mut bytes = BytesMut::new();
923 8 :
924 8 : match self {
925 2 : Self::Exists(req) => {
926 2 : bytes.put_u8(0);
927 2 : bytes.put_u8(u8::from(req.latest));
928 2 : bytes.put_u64(req.lsn.0);
929 2 : bytes.put_u32(req.rel.spcnode);
930 2 : bytes.put_u32(req.rel.dbnode);
931 2 : bytes.put_u32(req.rel.relnode);
932 2 : bytes.put_u8(req.rel.forknum);
933 2 : }
934 :
935 2 : Self::Nblocks(req) => {
936 2 : bytes.put_u8(1);
937 2 : bytes.put_u8(u8::from(req.latest));
938 2 : bytes.put_u64(req.lsn.0);
939 2 : bytes.put_u32(req.rel.spcnode);
940 2 : bytes.put_u32(req.rel.dbnode);
941 2 : bytes.put_u32(req.rel.relnode);
942 2 : bytes.put_u8(req.rel.forknum);
943 2 : }
944 :
945 2 : Self::GetPage(req) => {
946 2 : bytes.put_u8(2);
947 2 : bytes.put_u8(u8::from(req.latest));
948 2 : bytes.put_u64(req.lsn.0);
949 2 : bytes.put_u32(req.rel.spcnode);
950 2 : bytes.put_u32(req.rel.dbnode);
951 2 : bytes.put_u32(req.rel.relnode);
952 2 : bytes.put_u8(req.rel.forknum);
953 2 : bytes.put_u32(req.blkno);
954 2 : }
955 :
956 2 : Self::DbSize(req) => {
957 2 : bytes.put_u8(3);
958 2 : bytes.put_u8(u8::from(req.latest));
959 2 : bytes.put_u64(req.lsn.0);
960 2 : bytes.put_u32(req.dbnode);
961 2 : }
962 :
963 0 : Self::GetSlruSegment(req) => {
964 0 : bytes.put_u8(4);
965 0 : bytes.put_u8(u8::from(req.latest));
966 0 : bytes.put_u64(req.lsn.0);
967 0 : bytes.put_u8(req.kind);
968 0 : bytes.put_u32(req.segno);
969 0 : }
970 : }
971 :
972 8 : bytes.into()
973 8 : }
974 :
975 8 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
976 : // TODO these gets can fail
977 :
978 : // these correspond to the NeonMessageTag enum in pagestore_client.h
979 : //
980 : // TODO: consider using protobuf or serde bincode for less error prone
981 : // serialization.
982 8 : let msg_tag = body.read_u8()?;
983 8 : match msg_tag {
984 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
985 2 : latest: body.read_u8()? != 0,
986 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
987 : rel: RelTag {
988 2 : spcnode: body.read_u32::<BigEndian>()?,
989 2 : dbnode: body.read_u32::<BigEndian>()?,
990 2 : relnode: body.read_u32::<BigEndian>()?,
991 2 : forknum: body.read_u8()?,
992 : },
993 : })),
994 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
995 2 : latest: body.read_u8()? != 0,
996 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
997 : rel: RelTag {
998 2 : spcnode: body.read_u32::<BigEndian>()?,
999 2 : dbnode: body.read_u32::<BigEndian>()?,
1000 2 : relnode: body.read_u32::<BigEndian>()?,
1001 2 : forknum: body.read_u8()?,
1002 : },
1003 : })),
1004 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1005 2 : latest: body.read_u8()? != 0,
1006 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
1007 : rel: RelTag {
1008 2 : spcnode: body.read_u32::<BigEndian>()?,
1009 2 : dbnode: body.read_u32::<BigEndian>()?,
1010 2 : relnode: body.read_u32::<BigEndian>()?,
1011 2 : forknum: body.read_u8()?,
1012 : },
1013 2 : blkno: body.read_u32::<BigEndian>()?,
1014 : })),
1015 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1016 2 : latest: body.read_u8()? != 0,
1017 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
1018 2 : dbnode: body.read_u32::<BigEndian>()?,
1019 : })),
1020 : 4 => Ok(PagestreamFeMessage::GetSlruSegment(
1021 : PagestreamGetSlruSegmentRequest {
1022 0 : latest: body.read_u8()? != 0,
1023 0 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
1024 0 : kind: body.read_u8()?,
1025 0 : segno: body.read_u32::<BigEndian>()?,
1026 : },
1027 : )),
1028 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
1029 : }
1030 8 : }
1031 : }
1032 :
1033 : impl PagestreamBeMessage {
1034 0 : pub fn serialize(&self) -> Bytes {
1035 0 : let mut bytes = BytesMut::new();
1036 0 :
1037 0 : use PagestreamBeMessageTag as Tag;
1038 0 : match self {
1039 0 : Self::Exists(resp) => {
1040 0 : bytes.put_u8(Tag::Exists as u8);
1041 0 : bytes.put_u8(resp.exists as u8);
1042 0 : }
1043 :
1044 0 : Self::Nblocks(resp) => {
1045 0 : bytes.put_u8(Tag::Nblocks as u8);
1046 0 : bytes.put_u32(resp.n_blocks);
1047 0 : }
1048 :
1049 0 : Self::GetPage(resp) => {
1050 0 : bytes.put_u8(Tag::GetPage as u8);
1051 0 : bytes.put(&resp.page[..]);
1052 0 : }
1053 :
1054 0 : Self::Error(resp) => {
1055 0 : bytes.put_u8(Tag::Error as u8);
1056 0 : bytes.put(resp.message.as_bytes());
1057 0 : bytes.put_u8(0); // null terminator
1058 0 : }
1059 0 : Self::DbSize(resp) => {
1060 0 : bytes.put_u8(Tag::DbSize as u8);
1061 0 : bytes.put_i64(resp.db_size);
1062 0 : }
1063 :
1064 0 : Self::GetSlruSegment(resp) => {
1065 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1066 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1067 0 : bytes.put(&resp.segment[..]);
1068 0 : }
1069 : }
1070 :
1071 0 : bytes.into()
1072 0 : }
1073 :
1074 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
1075 0 : let mut buf = buf.reader();
1076 0 : let msg_tag = buf.read_u8()?;
1077 :
1078 : use PagestreamBeMessageTag as Tag;
1079 0 : let ok =
1080 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
1081 : Tag::Exists => {
1082 0 : let exists = buf.read_u8()?;
1083 0 : Self::Exists(PagestreamExistsResponse {
1084 0 : exists: exists != 0,
1085 0 : })
1086 : }
1087 : Tag::Nblocks => {
1088 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1089 0 : Self::Nblocks(PagestreamNblocksResponse { n_blocks })
1090 : }
1091 : Tag::GetPage => {
1092 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
1093 0 : buf.read_exact(&mut page)?;
1094 0 : PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
1095 : }
1096 : Tag::Error => {
1097 0 : let mut msg = Vec::new();
1098 0 : buf.read_until(0, &mut msg)?;
1099 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
1100 0 : let rust_str = cstring.to_str()?;
1101 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1102 0 : message: rust_str.to_owned(),
1103 0 : })
1104 : }
1105 : Tag::DbSize => {
1106 0 : let db_size = buf.read_i64::<BigEndian>()?;
1107 0 : Self::DbSize(PagestreamDbSizeResponse { db_size })
1108 : }
1109 : Tag::GetSlruSegment => {
1110 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1111 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
1112 0 : buf.read_exact(&mut segment)?;
1113 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
1114 0 : segment: segment.into(),
1115 0 : })
1116 : }
1117 : };
1118 0 : let remaining = buf.into_inner();
1119 0 : if !remaining.is_empty() {
1120 0 : anyhow::bail!(
1121 0 : "remaining bytes in msg with tag={msg_tag}: {}",
1122 0 : remaining.len()
1123 0 : );
1124 0 : }
1125 0 : Ok(ok)
1126 0 : }
1127 :
1128 0 : pub fn kind(&self) -> &'static str {
1129 0 : match self {
1130 0 : Self::Exists(_) => "Exists",
1131 0 : Self::Nblocks(_) => "Nblocks",
1132 0 : Self::GetPage(_) => "GetPage",
1133 0 : Self::Error(_) => "Error",
1134 0 : Self::DbSize(_) => "DbSize",
1135 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
1136 : }
1137 0 : }
1138 : }
1139 :
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    #[test]
    fn test_pagestream() {
        // Test serialization/deserialization of PagestreamFeMessage, covering every
        // request kind (including GetSlruSegment, tag 4).
        let messages = vec![
            PagestreamFeMessage::Exists(PagestreamExistsRequest {
                latest: true,
                lsn: Lsn(4),
                rel: RelTag {
                    forknum: 1,
                    spcnode: 2,
                    dbnode: 3,
                    relnode: 4,
                },
            }),
            PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
                latest: false,
                lsn: Lsn(4),
                rel: RelTag {
                    forknum: 1,
                    spcnode: 2,
                    dbnode: 3,
                    relnode: 4,
                },
            }),
            PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
                latest: true,
                lsn: Lsn(4),
                rel: RelTag {
                    forknum: 1,
                    spcnode: 2,
                    dbnode: 3,
                    relnode: 4,
                },
                blkno: 7,
            }),
            PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
                latest: true,
                lsn: Lsn(4),
                dbnode: 7,
            }),
            PagestreamFeMessage::GetSlruSegment(PagestreamGetSlruSegmentRequest {
                latest: false,
                lsn: Lsn(4),
                kind: 1,
                segno: 5,
            }),
        ];
        for msg in messages {
            let bytes = msg.serialize();
            let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
            assert!(msg == reconstructed);
        }
    }

    #[test]
    fn test_pagestream_fe_rejects_unknown_tag() {
        let mut input: &[u8] = &[0xff];
        assert!(PagestreamFeMessage::parse(&mut input).is_err());
    }

    #[test]
    fn test_pagestream_be_roundtrip() {
        // PagestreamBeMessage does not implement PartialEq, so compare the
        // re-serialized bytes instead of the decoded values.
        let page: Vec<u8> = (0..BLCKSZ as usize).map(|i| (i % 251) as u8).collect();
        let segment: Vec<u8> = (0..2 * BLCKSZ as usize)
            .map(|i| (i % 241) as u8)
            .collect();
        let messages = vec![
            PagestreamBeMessage::Exists(PagestreamExistsResponse { exists: true }),
            PagestreamBeMessage::Exists(PagestreamExistsResponse { exists: false }),
            PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { n_blocks: 42 }),
            PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }),
            PagestreamBeMessage::Error(PagestreamErrorResponse {
                message: "something went wrong".to_string(),
            }),
            PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { db_size: -7 }),
            PagestreamBeMessage::GetSlruSegment(PagestreamGetSlruSegmentResponse {
                segment: segment.into(),
            }),
        ];
        for msg in messages {
            let bytes = msg.serialize();
            let reconstructed = PagestreamBeMessage::deserialize(bytes.clone()).unwrap();
            assert_eq!(reconstructed.kind(), msg.kind());
            assert_eq!(reconstructed.serialize(), bytes, "round trip of {}", msg.kind());
        }
    }

    #[test]
    fn test_pagestream_be_rejects_malformed() {
        // Unknown tag.
        assert!(PagestreamBeMessage::deserialize(Bytes::from_static(&[0xff])).is_err());

        // Trailing bytes after an otherwise valid message.
        let mut bytes = BytesMut::new();
        bytes.put(
            &PagestreamBeMessage::Exists(PagestreamExistsResponse { exists: true }).serialize()[..],
        );
        bytes.put_u8(0);
        assert!(PagestreamBeMessage::deserialize(bytes.freeze()).is_err());
    }

    #[test]
    fn test_tenantinfo_serde() {
        // Test serialization/deserialization of TenantInfo
        let original_active = TenantInfo {
            id: TenantShardId::unsharded(TenantId::generate()),
            state: TenantState::Active,
            current_physical_size: Some(42),
            attachment_status: TenantAttachmentStatus::Attached,
            generation: None,
        };
        let expected_active = json!({
            "id": original_active.id.to_string(),
            "state": {
                "slug": "Active",
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            }
        });

        let original_broken = TenantInfo {
            id: TenantShardId::unsharded(TenantId::generate()),
            state: TenantState::Broken {
                reason: "reason".into(),
                backtrace: "backtrace info".into(),
            },
            current_physical_size: Some(42),
            attachment_status: TenantAttachmentStatus::Attached,
            generation: None,
        };
        let expected_broken = json!({
            "id": original_broken.id.to_string(),
            "state": {
                "slug": "Broken",
                "data": {
                    "backtrace": "backtrace info",
                    "reason": "reason",
                }
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            }
        });

        assert_eq!(
            serde_json::to_value(&original_active).unwrap(),
            expected_active
        );

        assert_eq!(
            serde_json::to_value(&original_broken).unwrap(),
            expected_broken
        );
        assert!(format!("{:?}", &original_broken.state).contains("reason"));
        assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
    }

    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );

        let id = TenantId::generate();
        let config_request = json!({
            "tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });
        let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );

        let attach_request = json!({
            "config": {
                "unknown_field": "unknown_value".to_string(),
            },
        });
        let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }

    #[test]
    fn tenantstatus_activating_serde() {
        let states = [
            TenantState::Activating(ActivatingFrom::Loading),
            TenantState::Activating(ActivatingFrom::Attaching),
        ];
        let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";

        let actual = serde_json::to_string(&states).unwrap();

        assert_eq!(actual, expected);

        let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();

        assert_eq!(states.as_slice(), &parsed);
    }

    #[test]
    fn tenantstatus_activating_strum() {
        // tests added, because we use these for metrics
        let examples = [
            (line!(), TenantState::Loading, "Loading"),
            (line!(), TenantState::Attaching, "Attaching"),
            (
                line!(),
                TenantState::Activating(ActivatingFrom::Loading),
                "Activating",
            ),
            (
                line!(),
                TenantState::Activating(ActivatingFrom::Attaching),
                "Activating",
            ),
            (line!(), TenantState::Active, "Active"),
            (
                line!(),
                TenantState::Stopping {
                    progress: utils::completion::Barrier::default(),
                },
                "Stopping",
            ),
            (
                line!(),
                TenantState::Broken {
                    reason: "Example".into(),
                    backtrace: "Looooong backtrace".into(),
                },
                "Broken",
            ),
        ];

        for (line, rendered, expected) in examples {
            let actual: &'static str = rendered.into();
            assert_eq!(actual, expected, "example on {line}");
        }
    }
}
|