Line data Source code
1 : pub mod partitioning;
2 : pub mod utilization;
3 :
4 : pub use utilization::PageserverUtilization;
5 :
6 : use std::{
7 : borrow::Cow,
8 : collections::HashMap,
9 : io::{BufRead, Read},
10 : num::{NonZeroU64, NonZeroUsize},
11 : time::{Duration, SystemTime},
12 : };
13 :
14 : use byteorder::{BigEndian, ReadBytesExt};
15 : use postgres_ffi::BLCKSZ;
16 : use serde::{Deserialize, Serialize};
17 : use serde_with::serde_as;
18 : use utils::{
19 : completion,
20 : history_buffer::HistoryBufferWithDropCounter,
21 : id::{NodeId, TenantId, TimelineId},
22 : lsn::Lsn,
23 : serde_system_time,
24 : };
25 :
26 : use crate::controller_api::PlacementPolicy;
27 : use crate::{
28 : reltag::RelTag,
29 : shard::{ShardCount, ShardStripeSize, TenantShardId},
30 : };
31 : use anyhow::bail;
32 : use bytes::{Buf, BufMut, Bytes, BytesMut};
33 :
34 : /// The state of a tenant in this pageserver.
35 : ///
36 : /// ```mermaid
37 : /// stateDiagram-v2
38 : ///
39 : /// [*] --> Loading: spawn_load()
40 : /// [*] --> Attaching: spawn_attach()
41 : ///
42 : /// Loading --> Activating: activate()
43 : /// Attaching --> Activating: activate()
44 : /// Activating --> Active: infallible
45 : ///
46 : /// Loading --> Broken: load() failure
47 : /// Attaching --> Broken: attach() failure
48 : ///
49 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
50 : /// Stopping --> Broken: late error in remove_tenant_from_memory
51 : ///
52 : /// Broken --> [*]: ignore / detach / shutdown
53 : /// Stopping --> [*]: remove_from_memory complete
54 : ///
55 : /// Active --> Broken: cfg(testing)-only tenant break point
56 : /// ```
57 : #[derive(
58 : Clone,
59 : PartialEq,
60 : Eq,
61 2 : serde::Serialize,
62 12 : serde::Deserialize,
63 0 : strum_macros::Display,
64 : strum_macros::EnumVariantNames,
65 0 : strum_macros::AsRefStr,
66 123 : strum_macros::IntoStaticStr,
67 : )]
68 : #[serde(tag = "slug", content = "data")]
69 : pub enum TenantState {
70 : /// This tenant is being loaded from local disk.
71 : ///
72 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
73 : Loading,
74 : /// This tenant is being attached to the pageserver.
75 : ///
76 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
77 : Attaching,
78 : /// The tenant is transitioning from Loading/Attaching to Active.
79 : ///
80 : /// While in this state, the individual timelines are being activated.
81 : ///
82 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
83 : Activating(ActivatingFrom),
84 : /// The tenant has finished activating and is open for business.
85 : ///
86 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
87 : Active,
88 : /// The tenant is recognized by pageserver, but it is being detached or the
89 : /// system is being shut down.
90 : ///
91 : /// Transitions out of this state are possible through `set_broken()`.
92 : Stopping {
93 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
94 : // otherwise it will not be skipped during deserialization
95 : #[serde(skip)]
96 : progress: completion::Barrier,
97 : },
98 : /// The tenant is recognized by the pageserver, but can no longer be used for
99 : /// any operations.
100 : ///
101 : /// If the tenant fails to load or attach, it will transition to this state
102 : /// and it is guaranteed that no background tasks are running in its name.
103 : ///
104 : /// The other way to transition into this state is from `Stopping` state
105 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
106 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
107 : Broken { reason: String, backtrace: String },
108 : }
109 :
110 : impl TenantState {
111 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
112 : use TenantAttachmentStatus::*;
113 :
114 : // Below, TenantState::Activating is treated as a "transient" or "transparent" state
115 : // when determining the attachment_status.
116 0 : match self {
117 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
118 : // So, technically, we can return Attached here.
119 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
120 : // But, our attach task might still be fetching the remote timelines, etc.
121 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
122 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
123 : // tenant mgr startup distinguishes attaching from loading via marker file.
124 0 : Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
125 : // We only reach Active after successful load / attach.
126 : // So, call the attachment status Attached.
127 0 : Self::Active => Attached,
128 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
129 : // However, it also becomes Broken if the regular load fails.
130 : // From Console's perspective there's no practical difference
131 : // because attachment_status is polled by console only during attach operation execution.
132 0 : Self::Broken { reason, .. } => Failed {
133 0 : reason: reason.to_owned(),
134 0 : },
135 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
136 : // we set the Stopping state irrespective of whether the tenant
137 : // has finished attaching or not.
138 0 : Self::Stopping { .. } => Maybe,
139 : }
140 0 : }
141 :
142 0 : pub fn broken_from_reason(reason: String) -> Self {
143 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
144 0 : Self::Broken {
145 0 : reason,
146 0 : backtrace: backtrace_str,
147 0 : }
148 0 : }
149 : }
150 :
151 : impl std::fmt::Debug for TenantState {
152 4 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 4 : match self {
154 4 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
155 4 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
156 : }
157 0 : _ => write!(f, "{self}"),
158 : }
159 4 : }
160 : }
161 :
162 : /// The only [`TenantState`] variants from which a tenant can enter `TenantState::Activating`.
163 8 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
164 : pub enum ActivatingFrom {
165 : /// Arrived at [`TenantState::Activating`] from [`TenantState::Loading`]
166 : Loading,
167 : /// Arrived at [`TenantState::Activating`] from [`TenantState::Attaching`]
168 : Attaching,
169 : }
170 :
171 : /// A state of a timeline in pageserver's memory.
172 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
173 : pub enum TimelineState {
174 : /// The timeline is recognized by the pageserver but is not yet operational.
175 : /// In particular, the walreceiver connection loop is not running for this timeline.
176 : /// It will eventually transition to state Active or Broken.
177 : Loading,
178 : /// The timeline is fully operational.
179 : /// It can be queried, and the walreceiver connection loop is running.
180 : Active,
181 : /// The timeline was previously Loading or Active but is shutting down.
182 : /// It cannot transition back into any other state.
183 : Stopping,
184 : /// The timeline is broken and not operational (previous states: Loading or Active).
185 : Broken { reason: String, backtrace: String },
186 : }
187 :
188 0 : #[derive(Serialize, Deserialize, Clone)]
189 : pub struct TimelineCreateRequest {
190 : pub new_timeline_id: TimelineId,
191 : #[serde(default)]
192 : pub ancestor_timeline_id: Option<TimelineId>,
193 : #[serde(default)]
194 : pub existing_initdb_timeline_id: Option<TimelineId>,
195 : #[serde(default)]
196 : pub ancestor_start_lsn: Option<Lsn>,
197 : pub pg_version: Option<u32>,
198 : }
199 :
200 0 : #[derive(Serialize, Deserialize)]
201 : pub struct TenantShardSplitRequest {
202 : pub new_shard_count: u8,
203 :
204 : // A tenant's stripe size is only meaningful the first time its shard count goes
205 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
206 : //
207 : // If this is set while the shard count is being increased from an already >1 value,
208 : // then the request will fail with 400.
209 : pub new_stripe_size: Option<ShardStripeSize>,
210 : }
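// Illustrative request body only (example values, not from the original source): splitting
// an unsharded tenant into four shards while also setting a stripe size could look like
//
//     {"new_shard_count": 4, "new_stripe_size": 32768}
//
// `ShardStripeSize` is a newtype over an integer, so it serializes as a plain number.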
211 :
212 0 : #[derive(Serialize, Deserialize)]
213 : pub struct TenantShardSplitResponse {
214 : pub new_shards: Vec<TenantShardId>,
215 : }
216 :
217 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
218 0 : #[derive(Serialize, Deserialize, Debug)]
219 : #[serde(deny_unknown_fields)]
220 : pub struct ShardParameters {
221 : pub count: ShardCount,
222 : pub stripe_size: ShardStripeSize,
223 : }
224 :
225 : impl ShardParameters {
226 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
227 :
228 0 : pub fn is_unsharded(&self) -> bool {
229 0 : self.count.is_unsharded()
230 0 : }
231 : }
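// Worked example for the constant above (illustrative note, not part of the original module):
// reading the expression as 256 MiB worth of 8 KiB (`BLCKSZ`) pages,
// DEFAULT_STRIPE_SIZE = 256 * 1024 / 8 = 32768 pages, i.e. 32768 * 8 KiB = 256 MiB of
// contiguous key space per stripe before striping advances to the next shard.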
232 :
233 : impl Default for ShardParameters {
234 110 : fn default() -> Self {
235 110 : Self {
236 110 : count: ShardCount::new(0),
237 110 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
238 110 : }
239 110 : }
240 : }
241 :
242 8 : #[derive(Serialize, Deserialize, Debug)]
243 : #[serde(deny_unknown_fields)]
244 : pub struct TenantCreateRequest {
245 : pub new_tenant_id: TenantShardId,
246 : #[serde(default)]
247 : #[serde(skip_serializing_if = "Option::is_none")]
248 : pub generation: Option<u32>,
249 :
250 : // If omitted, create a single shard with TenantShardId::unsharded()
251 : #[serde(default)]
252 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
253 : pub shard_parameters: ShardParameters,
254 :
255 : // This parameter is only meaningful in requests sent to the storage controller
256 : #[serde(default)]
257 : #[serde(skip_serializing_if = "Option::is_none")]
258 : pub placement_policy: Option<PlacementPolicy>,
259 :
260 : #[serde(flatten)]
261 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
262 : }
263 :
264 0 : #[derive(Deserialize, Debug)]
265 : #[serde(deny_unknown_fields)]
266 : pub struct TenantLoadRequest {
267 : #[serde(default)]
268 : #[serde(skip_serializing_if = "Option::is_none")]
269 : pub generation: Option<u32>,
270 : }
271 :
272 : impl std::ops::Deref for TenantCreateRequest {
273 : type Target = TenantConfig;
274 :
275 0 : fn deref(&self) -> &Self::Target {
276 0 : &self.config
277 0 : }
278 : }
279 :
280 : /// An alternative representation of `pageserver::tenant::TenantConf` with
281 : /// simpler types.
282 6 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
283 : pub struct TenantConfig {
284 : pub checkpoint_distance: Option<u64>,
285 : pub checkpoint_timeout: Option<String>,
286 : pub compaction_target_size: Option<u64>,
287 : pub compaction_period: Option<String>,
288 : pub compaction_threshold: Option<usize>,
289 : // defer parsing compaction_algorithm, like eviction_policy
290 : pub compaction_algorithm: Option<CompactionAlgorithm>,
291 : pub gc_horizon: Option<u64>,
292 : pub gc_period: Option<String>,
293 : pub image_creation_threshold: Option<usize>,
294 : pub pitr_interval: Option<String>,
295 : pub walreceiver_connect_timeout: Option<String>,
296 : pub lagging_wal_timeout: Option<String>,
297 : pub max_lsn_wal_lag: Option<NonZeroU64>,
298 : pub trace_read_requests: Option<bool>,
299 : pub eviction_policy: Option<EvictionPolicy>,
300 : pub min_resident_size_override: Option<u64>,
301 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
302 : pub heatmap_period: Option<String>,
303 : pub lazy_slru_download: Option<bool>,
304 : pub timeline_get_throttle: Option<ThrottleConfig>,
305 : pub image_layer_creation_check_threshold: Option<u8>,
306 : }
307 :
308 4 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
309 : #[serde(tag = "kind")]
310 : pub enum EvictionPolicy {
311 : NoEviction,
312 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
313 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
314 : }
315 :
316 : impl EvictionPolicy {
317 0 : pub fn discriminant_str(&self) -> &'static str {
318 0 : match self {
319 0 : EvictionPolicy::NoEviction => "NoEviction",
320 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
321 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
322 : }
323 0 : }
324 : }
325 :
326 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
327 : #[serde(tag = "kind")]
328 : pub enum CompactionAlgorithm {
329 : Legacy,
330 : Tiered,
331 : }
332 :
333 28 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
334 : pub struct EvictionPolicyLayerAccessThreshold {
335 : #[serde(with = "humantime_serde")]
336 : pub period: Duration,
337 : #[serde(with = "humantime_serde")]
338 : pub threshold: Duration,
339 : }
340 :
341 0 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
342 : pub struct ThrottleConfig {
343 : pub task_kinds: Vec<String>, // TaskKind
344 : pub initial: usize,
345 : #[serde(with = "humantime_serde")]
346 : pub refill_interval: Duration,
347 : pub refill_amount: NonZeroUsize,
348 : pub max: usize,
349 : pub fair: bool,
350 : }
351 :
352 : impl ThrottleConfig {
353 264 : pub fn disabled() -> Self {
354 264 : Self {
355 264 : task_kinds: vec![], // effectively disables the throttle
356 264 : // other values don't matter with emtpy `task_kinds`.
357 264 : initial: 0,
358 264 : refill_interval: Duration::from_millis(1),
359 264 : refill_amount: NonZeroUsize::new(1).unwrap(),
360 264 : max: 1,
361 264 : fair: true,
362 264 : }
363 264 : }
364 : /// The requests per second allowed by the given config.
365 0 : pub fn steady_rps(&self) -> f64 {
366 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
367 0 : }
368 : }
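// Worked example of the steady-state formula above (illustrative, not part of the original
// module): with `refill_amount = 100` and `refill_interval = 250ms`,
// steady_rps() = 100 / 0.25 = 400 requests per second. The `disabled()` config above relies
// on the empty `task_kinds` list, not on these numeric fields.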
369 :
370 : /// A flattened analog of a `pageserver::tenant::LocationMode`, which
371 : /// lists out all possible states (and the virtual "Detached" state)
372 : /// in a flat form rather than using Rust-style enums.
373 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
374 : pub enum LocationConfigMode {
375 : AttachedSingle,
376 : AttachedMulti,
377 : AttachedStale,
378 : Secondary,
379 : Detached,
380 : }
381 :
382 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
383 : pub struct LocationConfigSecondary {
384 : pub warm: bool,
385 : }
386 :
387 : /// An alternative representation of `pageserver::tenant::LocationConf`,
388 : /// for use in external-facing APIs.
389 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
390 : pub struct LocationConfig {
391 : pub mode: LocationConfigMode,
392 : /// If attaching, in what generation?
393 : #[serde(default)]
394 : pub generation: Option<u32>,
395 :
396 : // If requesting mode `Secondary`, configuration for that.
397 : #[serde(default)]
398 : pub secondary_conf: Option<LocationConfigSecondary>,
399 :
400 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
401 : // must be set accurately.
402 : #[serde(default)]
403 : pub shard_number: u8,
404 : #[serde(default)]
405 : pub shard_count: u8,
406 : #[serde(default)]
407 : pub shard_stripe_size: u32,
408 :
409 : // This configuration only affects attached mode, but should be provided irrespective
410 : // of the mode, as a secondary location might transition on startup if the response
411 : // to the `/re-attach` control plane API requests it.
412 : pub tenant_conf: TenantConfig,
413 : }
414 :
415 0 : #[derive(Serialize, Deserialize)]
416 : pub struct LocationConfigListResponse {
417 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
418 : }
419 :
420 0 : #[derive(Serialize, Deserialize)]
421 : #[serde(transparent)]
422 : pub struct TenantCreateResponse(pub TenantId);
423 :
424 : #[derive(Serialize)]
425 : pub struct StatusResponse {
426 : pub id: NodeId,
427 : }
428 :
429 0 : #[derive(Serialize, Deserialize, Debug)]
430 : #[serde(deny_unknown_fields)]
431 : pub struct TenantLocationConfigRequest {
432 : pub tenant_id: Option<TenantShardId>,
433 : #[serde(flatten)]
434 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
435 : }
436 :
437 0 : #[derive(Serialize, Deserialize, Debug)]
438 : #[serde(deny_unknown_fields)]
439 : pub struct TenantTimeTravelRequest {
440 : pub shard_counts: Vec<ShardCount>,
441 : }
442 :
443 0 : #[derive(Serialize, Deserialize, Debug)]
444 : #[serde(deny_unknown_fields)]
445 : pub struct TenantShardLocation {
446 : pub shard_id: TenantShardId,
447 : pub node_id: NodeId,
448 : }
449 :
450 0 : #[derive(Serialize, Deserialize, Debug)]
451 : #[serde(deny_unknown_fields)]
452 : pub struct TenantLocationConfigResponse {
453 : pub shards: Vec<TenantShardLocation>,
454 : // If the shards' ShardCount is >1, stripe_size will be set.
455 : pub stripe_size: Option<ShardStripeSize>,
456 : }
457 :
458 8 : #[derive(Serialize, Deserialize, Debug)]
459 : #[serde(deny_unknown_fields)]
460 : pub struct TenantConfigRequest {
461 : pub tenant_id: TenantId,
462 : #[serde(flatten)]
463 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
464 : }
465 :
466 : impl std::ops::Deref for TenantConfigRequest {
467 : type Target = TenantConfig;
468 :
469 0 : fn deref(&self) -> &Self::Target {
470 0 : &self.config
471 0 : }
472 : }
473 :
474 : impl TenantConfigRequest {
475 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
476 0 : let config = TenantConfig::default();
477 0 : TenantConfigRequest { tenant_id, config }
478 0 : }
479 : }
480 :
481 8 : #[derive(Debug, Deserialize)]
482 : pub struct TenantAttachRequest {
483 : #[serde(default)]
484 : pub config: TenantAttachConfig,
485 : #[serde(default)]
486 : pub generation: Option<u32>,
487 : }
488 :
489 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
490 : /// its usage inside `TenantAttachRequest`.
491 2 : #[derive(Debug, Serialize, Deserialize, Default)]
492 : #[serde(deny_unknown_fields)]
493 : pub struct TenantAttachConfig {
494 : #[serde(flatten)]
495 : allowing_unknown_fields: TenantConfig,
496 : }
497 :
498 : impl std::ops::Deref for TenantAttachConfig {
499 : type Target = TenantConfig;
500 :
501 0 : fn deref(&self) -> &Self::Target {
502 0 : &self.allowing_unknown_fields
503 0 : }
504 : }
505 :
506 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
507 0 : #[derive(Serialize, Deserialize, Clone)]
508 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
509 : pub enum TenantAttachmentStatus {
510 : Maybe,
511 : Attached,
512 : Failed { reason: String },
513 : }
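// With the adjacently tagged, snake_cased representation declared above, the variants render
// as `{"slug":"maybe"}`, `{"slug":"attached"}`, and `{"slug":"failed","data":{"reason":"..."}}`.
// (Illustrative note, not part of the original module; "..." stands for the carried reason.)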
514 :
515 0 : #[derive(Serialize, Deserialize, Clone)]
516 : pub struct TenantInfo {
517 : pub id: TenantShardId,
518 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
519 : pub state: TenantState,
520 : /// Sum of the size of all layer files.
521 : /// If a layer is present in both local FS and S3, it counts only once.
522 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
523 : pub attachment_status: TenantAttachmentStatus,
524 : #[serde(skip_serializing_if = "Option::is_none")]
525 : pub generation: Option<u32>,
526 : }
527 :
528 0 : #[derive(Serialize, Deserialize, Clone)]
529 : pub struct TenantDetails {
530 : #[serde(flatten)]
531 : pub tenant_info: TenantInfo,
532 :
533 : pub walredo: Option<WalRedoManagerStatus>,
534 :
535 : pub timelines: Vec<TimelineId>,
536 : }
537 :
538 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
539 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
540 : pub struct TimelineInfo {
541 : pub tenant_id: TenantShardId,
542 : pub timeline_id: TimelineId,
543 :
544 : pub ancestor_timeline_id: Option<TimelineId>,
545 : pub ancestor_lsn: Option<Lsn>,
546 : pub last_record_lsn: Lsn,
547 : pub prev_record_lsn: Option<Lsn>,
548 : pub latest_gc_cutoff_lsn: Lsn,
549 : pub disk_consistent_lsn: Lsn,
550 :
551 : /// The LSN that we have successfully uploaded to remote storage
552 : pub remote_consistent_lsn: Lsn,
553 :
554 : /// The LSN that we are advertising to safekeepers
555 : pub remote_consistent_lsn_visible: Lsn,
556 :
557 : /// The LSN from the start of the root timeline (never changes)
558 : pub initdb_lsn: Lsn,
559 :
560 : pub current_logical_size: u64,
561 : pub current_logical_size_is_accurate: bool,
562 :
563 : pub directory_entries_counts: Vec<u64>,
564 :
565 : /// Sum of the size of all layer files.
566 : /// If a layer is present in both local FS and S3, it counts only once.
567 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
568 : pub current_logical_size_non_incremental: Option<u64>,
569 :
570 : pub timeline_dir_layer_file_size_sum: Option<u64>,
571 :
572 : pub wal_source_connstr: Option<String>,
573 : pub last_received_msg_lsn: Option<Lsn>,
574 : /// the timestamp (in microseconds) of the last received message
575 : pub last_received_msg_ts: Option<u128>,
576 : pub pg_version: u32,
577 :
578 : pub state: TimelineState,
579 :
580 : pub walreceiver_status: String,
581 : }
582 :
583 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
584 : pub struct LayerMapInfo {
585 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
586 : pub historic_layers: Vec<HistoricLayerInfo>,
587 : }
588 :
589 0 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
590 : #[repr(usize)]
591 : pub enum LayerAccessKind {
592 : GetValueReconstructData,
593 : Iter,
594 : KeyIter,
595 : Dump,
596 : }
597 :
598 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
599 : pub struct LayerAccessStatFullDetails {
600 : pub when_millis_since_epoch: u64,
601 : pub task_kind: Cow<'static, str>,
602 : pub access_kind: LayerAccessKind,
603 : }
604 :
605 : /// An event that impacts the layer's residence status.
606 : #[serde_as]
607 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
608 : pub struct LayerResidenceEvent {
609 : /// The time when the event occurred.
610 : /// NB: this timestamp is captured while the residence status changes.
611 : /// So, it might be behind/ahead of the actual residence change by a short amount of time.
612 : ///
613 : #[serde(rename = "timestamp_millis_since_epoch")]
614 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
615 : pub timestamp: SystemTime,
616 : /// The new residence status of the layer.
617 : pub status: LayerResidenceStatus,
618 : /// The reason why we had to record this event.
619 : pub reason: LayerResidenceEventReason,
620 : }
621 :
622 : /// The reason for recording a given [`LayerResidenceEvent`].
623 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
624 : pub enum LayerResidenceEventReason {
625 : /// The layer map is being populated, e.g. during timeline load or attach.
626 : /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
627 : /// We need to record such events because there is no persistent storage for the events.
628 : ///
629 : // https://github.com/rust-lang/rust/issues/74481
630 : /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
631 : /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
632 : LayerLoad,
633 : /// We just created the layer (e.g., freeze_and_flush or compaction).
634 : /// Such layers are always [`LayerResidenceStatus::Resident`].
635 : LayerCreate,
636 : /// We on-demand downloaded or evicted the given layer.
637 : ResidenceChange,
638 : }
639 :
640 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
641 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
642 : pub enum LayerResidenceStatus {
643 : /// Residence status for a layer file that exists locally.
644 : /// It may also exist on the remote, we don't care here.
645 : Resident,
646 : /// Residence status for a layer file that only exists on the remote.
647 : Evicted,
648 : }
649 :
650 : impl LayerResidenceEvent {
651 2016 : pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
652 2016 : Self {
653 2016 : status,
654 2016 : reason,
655 2016 : timestamp: SystemTime::now(),
656 2016 : }
657 2016 : }
658 : }
659 :
660 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
661 : pub struct LayerAccessStats {
662 : pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
663 : pub task_kind_access_flag: Vec<Cow<'static, str>>,
664 : pub first: Option<LayerAccessStatFullDetails>,
665 : pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
666 : pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
667 : }
668 :
669 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
670 : #[serde(tag = "kind")]
671 : pub enum InMemoryLayerInfo {
672 : Open { lsn_start: Lsn },
673 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
674 : }
675 :
676 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
677 : #[serde(tag = "kind")]
678 : pub enum HistoricLayerInfo {
679 : Delta {
680 : layer_file_name: String,
681 : layer_file_size: u64,
682 :
683 : lsn_start: Lsn,
684 : lsn_end: Lsn,
685 : remote: bool,
686 : access_stats: LayerAccessStats,
687 : },
688 : Image {
689 : layer_file_name: String,
690 : layer_file_size: u64,
691 :
692 : lsn_start: Lsn,
693 : remote: bool,
694 : access_stats: LayerAccessStats,
695 : },
696 : }
697 :
698 : impl HistoricLayerInfo {
699 0 : pub fn layer_file_name(&self) -> &str {
700 0 : match self {
701 : HistoricLayerInfo::Delta {
702 0 : layer_file_name, ..
703 0 : } => layer_file_name,
704 : HistoricLayerInfo::Image {
705 0 : layer_file_name, ..
706 0 : } => layer_file_name,
707 : }
708 0 : }
709 0 : pub fn is_remote(&self) -> bool {
710 0 : match self {
711 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
712 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
713 : }
714 0 : }
715 0 : pub fn set_remote(&mut self, value: bool) {
716 0 : let field = match self {
717 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
718 0 : HistoricLayerInfo::Image { remote, .. } => remote,
719 : };
720 0 : *field = value;
721 0 : }
722 : }
723 :
724 0 : #[derive(Debug, Serialize, Deserialize)]
725 : pub struct DownloadRemoteLayersTaskSpawnRequest {
726 : pub max_concurrent_downloads: NonZeroUsize,
727 : }
728 :
729 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
730 : pub struct DownloadRemoteLayersTaskInfo {
731 : pub task_id: String,
732 : pub state: DownloadRemoteLayersTaskState,
733 : pub total_layer_count: u64, // stable once `completed`
734 : pub successful_download_count: u64, // stable once `completed`
735 : pub failed_download_count: u64, // stable once `completed`
736 : }
737 :
738 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
739 : pub enum DownloadRemoteLayersTaskState {
740 : Running,
741 : Completed,
742 : ShutDown,
743 : }
744 :
745 0 : #[derive(Debug, Serialize, Deserialize)]
746 : pub struct TimelineGcRequest {
747 : pub gc_horizon: Option<u64>,
748 : }
749 :
750 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
751 : pub struct WalRedoManagerProcessStatus {
752 : pub pid: u32,
753 : /// The strum-generated `into::<&'static str>()` for `pageserver::walredo::ProcessKind`.
754 : /// `ProcessKind` is a transitory thing, so it has no enum representation in `pageserver_api`.
755 : pub kind: Cow<'static, str>,
756 : }
757 :
758 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
759 : pub struct WalRedoManagerStatus {
760 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
761 : pub process: Option<WalRedoManagerProcessStatus>,
762 : }
763 :
764 : /// The progress of a secondary tenant is mostly useful when doing a long-running download: e.g. initiating
765 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
766 : /// what's happening.
767 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
768 : pub struct SecondaryProgress {
769 : /// The remote storage LastModified time of the heatmap object we last downloaded.
770 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
771 :
772 : /// The number of layers currently on-disk
773 : pub layers_downloaded: usize,
774 : /// The number of layers in the most recently seen heatmap
775 : pub layers_total: usize,
776 :
777 : /// The number of layer bytes currently on-disk
778 : pub bytes_downloaded: u64,
779 : /// The number of layer bytes in the most recently seen heatmap
780 : pub bytes_total: u64,
781 : }
782 :
783 : pub mod virtual_file {
784 : #[derive(
785 : Copy,
786 : Clone,
787 : PartialEq,
788 : Eq,
789 : Hash,
790 260 : strum_macros::EnumString,
791 0 : strum_macros::Display,
792 0 : serde_with::DeserializeFromStr,
793 : serde_with::SerializeDisplay,
794 : Debug,
795 : )]
796 : #[strum(serialize_all = "kebab-case")]
797 : pub enum IoEngineKind {
798 : StdFs,
799 : #[cfg(target_os = "linux")]
800 : TokioEpollUring,
801 : }
802 : }
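// With `serialize_all = "kebab-case"` above, the string forms accepted by
// `IoEngineKind::from_str` (and produced by `Display`) are "std-fs" and, on Linux,
// "tokio-epoll-uring". (Illustrative note, not part of the original module.)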
803 :
804 : // Wrapped in libpq CopyData
805 : #[derive(PartialEq, Eq, Debug)]
806 : pub enum PagestreamFeMessage {
807 : Exists(PagestreamExistsRequest),
808 : Nblocks(PagestreamNblocksRequest),
809 : GetPage(PagestreamGetPageRequest),
810 : DbSize(PagestreamDbSizeRequest),
811 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
812 : }
813 :
814 : // Wrapped in libpq CopyData
815 0 : #[derive(strum_macros::EnumProperty)]
816 : pub enum PagestreamBeMessage {
817 : Exists(PagestreamExistsResponse),
818 : Nblocks(PagestreamNblocksResponse),
819 : GetPage(PagestreamGetPageResponse),
820 : Error(PagestreamErrorResponse),
821 : DbSize(PagestreamDbSizeResponse),
822 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
823 : }
824 :
825 : // Keep in sync with `pagestore_client.h`
826 : #[repr(u8)]
827 : enum PagestreamBeMessageTag {
828 : Exists = 100,
829 : Nblocks = 101,
830 : GetPage = 102,
831 : Error = 103,
832 : DbSize = 104,
833 : GetSlruSegment = 105,
834 : }
835 : impl TryFrom<u8> for PagestreamBeMessageTag {
836 : type Error = u8;
837 0 : fn try_from(value: u8) -> Result<Self, u8> {
838 0 : match value {
839 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
840 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
841 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
842 0 : 103 => Ok(PagestreamBeMessageTag::Error),
843 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
844 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
845 0 : _ => Err(value),
846 : }
847 0 : }
848 : }
849 :
850 : #[derive(Debug, PartialEq, Eq)]
851 : pub struct PagestreamExistsRequest {
852 : pub latest: bool,
853 : pub lsn: Lsn,
854 : pub rel: RelTag,
855 : }
856 :
857 : #[derive(Debug, PartialEq, Eq)]
858 : pub struct PagestreamNblocksRequest {
859 : pub latest: bool,
860 : pub lsn: Lsn,
861 : pub rel: RelTag,
862 : }
863 :
864 : #[derive(Debug, PartialEq, Eq)]
865 : pub struct PagestreamGetPageRequest {
866 : pub latest: bool,
867 : pub lsn: Lsn,
868 : pub rel: RelTag,
869 : pub blkno: u32,
870 : }
871 :
872 : #[derive(Debug, PartialEq, Eq)]
873 : pub struct PagestreamDbSizeRequest {
874 : pub latest: bool,
875 : pub lsn: Lsn,
876 : pub dbnode: u32,
877 : }
878 :
879 : #[derive(Debug, PartialEq, Eq)]
880 : pub struct PagestreamGetSlruSegmentRequest {
881 : pub latest: bool,
882 : pub lsn: Lsn,
883 : pub kind: u8,
884 : pub segno: u32,
885 : }
886 :
887 : #[derive(Debug)]
888 : pub struct PagestreamExistsResponse {
889 : pub exists: bool,
890 : }
891 :
892 : #[derive(Debug)]
893 : pub struct PagestreamNblocksResponse {
894 : pub n_blocks: u32,
895 : }
896 :
897 : #[derive(Debug)]
898 : pub struct PagestreamGetPageResponse {
899 : pub page: Bytes,
900 : }
901 :
902 : #[derive(Debug)]
903 : pub struct PagestreamGetSlruSegmentResponse {
904 : pub segment: Bytes,
905 : }
906 :
907 : #[derive(Debug)]
908 : pub struct PagestreamErrorResponse {
909 : pub message: String,
910 : }
911 :
912 : #[derive(Debug)]
913 : pub struct PagestreamDbSizeResponse {
914 : pub db_size: i64,
915 : }
916 :
917 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
918 : // that require pageserver-internal types. It is sufficient to get the total size.
919 0 : #[derive(Serialize, Deserialize, Debug)]
920 : pub struct TenantHistorySize {
921 : pub id: TenantId,
922 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
923 : ///
924 : /// Will be none if `?inputs_only=true` was given.
925 : pub size: Option<u64>,
926 : }
927 :
928 : impl PagestreamFeMessage {
929 8 : pub fn serialize(&self) -> Bytes {
930 8 : let mut bytes = BytesMut::new();
931 8 :
932 8 : match self {
933 2 : Self::Exists(req) => {
934 2 : bytes.put_u8(0);
935 2 : bytes.put_u8(u8::from(req.latest));
936 2 : bytes.put_u64(req.lsn.0);
937 2 : bytes.put_u32(req.rel.spcnode);
938 2 : bytes.put_u32(req.rel.dbnode);
939 2 : bytes.put_u32(req.rel.relnode);
940 2 : bytes.put_u8(req.rel.forknum);
941 2 : }
942 :
943 2 : Self::Nblocks(req) => {
944 2 : bytes.put_u8(1);
945 2 : bytes.put_u8(u8::from(req.latest));
946 2 : bytes.put_u64(req.lsn.0);
947 2 : bytes.put_u32(req.rel.spcnode);
948 2 : bytes.put_u32(req.rel.dbnode);
949 2 : bytes.put_u32(req.rel.relnode);
950 2 : bytes.put_u8(req.rel.forknum);
951 2 : }
952 :
953 2 : Self::GetPage(req) => {
954 2 : bytes.put_u8(2);
955 2 : bytes.put_u8(u8::from(req.latest));
956 2 : bytes.put_u64(req.lsn.0);
957 2 : bytes.put_u32(req.rel.spcnode);
958 2 : bytes.put_u32(req.rel.dbnode);
959 2 : bytes.put_u32(req.rel.relnode);
960 2 : bytes.put_u8(req.rel.forknum);
961 2 : bytes.put_u32(req.blkno);
962 2 : }
963 :
964 2 : Self::DbSize(req) => {
965 2 : bytes.put_u8(3);
966 2 : bytes.put_u8(u8::from(req.latest));
967 2 : bytes.put_u64(req.lsn.0);
968 2 : bytes.put_u32(req.dbnode);
969 2 : }
970 :
971 0 : Self::GetSlruSegment(req) => {
972 0 : bytes.put_u8(4);
973 0 : bytes.put_u8(u8::from(req.latest));
974 0 : bytes.put_u64(req.lsn.0);
975 0 : bytes.put_u8(req.kind);
976 0 : bytes.put_u32(req.segno);
977 0 : }
978 : }
979 :
980 8 : bytes.into()
981 8 : }
982 :
983 8 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
984 : // TODO these gets can fail
985 :
986 : // these correspond to the NeonMessageTag enum in pagestore_client.h
987 : //
988 : // TODO: consider using protobuf or serde bincode for less error prone
989 : // serialization.
990 8 : let msg_tag = body.read_u8()?;
991 8 : match msg_tag {
992 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
993 2 : latest: body.read_u8()? != 0,
994 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
995 : rel: RelTag {
996 2 : spcnode: body.read_u32::<BigEndian>()?,
997 2 : dbnode: body.read_u32::<BigEndian>()?,
998 2 : relnode: body.read_u32::<BigEndian>()?,
999 2 : forknum: body.read_u8()?,
1000 : },
1001 : })),
1002 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1003 2 : latest: body.read_u8()? != 0,
1004 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
1005 : rel: RelTag {
1006 2 : spcnode: body.read_u32::<BigEndian>()?,
1007 2 : dbnode: body.read_u32::<BigEndian>()?,
1008 2 : relnode: body.read_u32::<BigEndian>()?,
1009 2 : forknum: body.read_u8()?,
1010 : },
1011 : })),
1012 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1013 2 : latest: body.read_u8()? != 0,
1014 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
1015 : rel: RelTag {
1016 2 : spcnode: body.read_u32::<BigEndian>()?,
1017 2 : dbnode: body.read_u32::<BigEndian>()?,
1018 2 : relnode: body.read_u32::<BigEndian>()?,
1019 2 : forknum: body.read_u8()?,
1020 : },
1021 2 : blkno: body.read_u32::<BigEndian>()?,
1022 : })),
1023 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1024 2 : latest: body.read_u8()? != 0,
1025 2 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
1026 2 : dbnode: body.read_u32::<BigEndian>()?,
1027 : })),
1028 : 4 => Ok(PagestreamFeMessage::GetSlruSegment(
1029 : PagestreamGetSlruSegmentRequest {
1030 0 : latest: body.read_u8()? != 0,
1031 0 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
1032 0 : kind: body.read_u8()?,
1033 0 : segno: body.read_u32::<BigEndian>()?,
1034 : },
1035 : )),
1036 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
1037 : }
1038 8 : }
1039 : }
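// Byte layout of the GetPage request as implemented by `serialize`/`parse` above
// (a summary of the code, not a separate spec): inside the libpq CopyData wrapper,
//   1 byte  message tag (2 for GetPage)
//   1 byte  `latest` flag (0 or 1)
//   8 bytes LSN, big-endian
//   4 bytes RelTag.spcnode, big-endian
//   4 bytes RelTag.dbnode, big-endian
//   4 bytes RelTag.relnode, big-endian
//   1 byte  RelTag.forknum
//   4 bytes block number, big-endian
// for 27 bytes in total; the other request kinds differ only in the tag and trailing fields.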
1040 :
1041 : impl PagestreamBeMessage {
1042 0 : pub fn serialize(&self) -> Bytes {
1043 0 : let mut bytes = BytesMut::new();
1044 0 :
1045 0 : use PagestreamBeMessageTag as Tag;
1046 0 : match self {
1047 0 : Self::Exists(resp) => {
1048 0 : bytes.put_u8(Tag::Exists as u8);
1049 0 : bytes.put_u8(resp.exists as u8);
1050 0 : }
1051 :
1052 0 : Self::Nblocks(resp) => {
1053 0 : bytes.put_u8(Tag::Nblocks as u8);
1054 0 : bytes.put_u32(resp.n_blocks);
1055 0 : }
1056 :
1057 0 : Self::GetPage(resp) => {
1058 0 : bytes.put_u8(Tag::GetPage as u8);
1059 0 : bytes.put(&resp.page[..]);
1060 0 : }
1061 :
1062 0 : Self::Error(resp) => {
1063 0 : bytes.put_u8(Tag::Error as u8);
1064 0 : bytes.put(resp.message.as_bytes());
1065 0 : bytes.put_u8(0); // null terminator
1066 0 : }
1067 0 : Self::DbSize(resp) => {
1068 0 : bytes.put_u8(Tag::DbSize as u8);
1069 0 : bytes.put_i64(resp.db_size);
1070 0 : }
1071 :
1072 0 : Self::GetSlruSegment(resp) => {
1073 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1074 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1075 0 : bytes.put(&resp.segment[..]);
1076 0 : }
1077 : }
1078 :
1079 0 : bytes.into()
1080 0 : }
1081 :
1082 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
1083 0 : let mut buf = buf.reader();
1084 0 : let msg_tag = buf.read_u8()?;
1085 :
1086 : use PagestreamBeMessageTag as Tag;
1087 0 : let ok =
1088 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
1089 : Tag::Exists => {
1090 0 : let exists = buf.read_u8()?;
1091 0 : Self::Exists(PagestreamExistsResponse {
1092 0 : exists: exists != 0,
1093 0 : })
1094 : }
1095 : Tag::Nblocks => {
1096 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1097 0 : Self::Nblocks(PagestreamNblocksResponse { n_blocks })
1098 : }
1099 : Tag::GetPage => {
1100 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
1101 0 : buf.read_exact(&mut page)?;
1102 0 : PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
1103 : }
1104 : Tag::Error => {
1105 0 : let mut msg = Vec::new();
1106 0 : buf.read_until(0, &mut msg)?;
1107 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
1108 0 : let rust_str = cstring.to_str()?;
1109 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1110 0 : message: rust_str.to_owned(),
1111 0 : })
1112 : }
1113 : Tag::DbSize => {
1114 0 : let db_size = buf.read_i64::<BigEndian>()?;
1115 0 : Self::DbSize(PagestreamDbSizeResponse { db_size })
1116 : }
1117 : Tag::GetSlruSegment => {
1118 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1119 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
1120 0 : buf.read_exact(&mut segment)?;
1121 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
1122 0 : segment: segment.into(),
1123 0 : })
1124 : }
1125 : };
1126 0 : let remaining = buf.into_inner();
1127 0 : if !remaining.is_empty() {
1128 0 : anyhow::bail!(
1129 0 : "remaining bytes in msg with tag={msg_tag}: {}",
1130 0 : remaining.len()
1131 0 : );
1132 0 : }
1133 0 : Ok(ok)
1134 0 : }
1135 :
1136 0 : pub fn kind(&self) -> &'static str {
1137 0 : match self {
1138 0 : Self::Exists(_) => "Exists",
1139 0 : Self::Nblocks(_) => "Nblocks",
1140 0 : Self::GetPage(_) => "GetPage",
1141 0 : Self::Error(_) => "Error",
1142 0 : Self::DbSize(_) => "DbSize",
1143 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
1144 : }
1145 0 : }
1146 : }
1147 :
1148 : #[cfg(test)]
1149 : mod tests {
1150 : use serde_json::json;
1151 :
1152 : use super::*;
1153 :
1154 : #[test]
1155 2 : fn test_pagestream() {
1156 2 : // Test serialization/deserialization of PagestreamFeMessage
1157 2 : let messages = vec![
1158 2 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
1159 2 : latest: true,
1160 2 : lsn: Lsn(4),
1161 2 : rel: RelTag {
1162 2 : forknum: 1,
1163 2 : spcnode: 2,
1164 2 : dbnode: 3,
1165 2 : relnode: 4,
1166 2 : },
1167 2 : }),
1168 2 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1169 2 : latest: false,
1170 2 : lsn: Lsn(4),
1171 2 : rel: RelTag {
1172 2 : forknum: 1,
1173 2 : spcnode: 2,
1174 2 : dbnode: 3,
1175 2 : relnode: 4,
1176 2 : },
1177 2 : }),
1178 2 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1179 2 : latest: true,
1180 2 : lsn: Lsn(4),
1181 2 : rel: RelTag {
1182 2 : forknum: 1,
1183 2 : spcnode: 2,
1184 2 : dbnode: 3,
1185 2 : relnode: 4,
1186 2 : },
1187 2 : blkno: 7,
1188 2 : }),
1189 2 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1190 2 : latest: true,
1191 2 : lsn: Lsn(4),
1192 2 : dbnode: 7,
1193 2 : }),
1194 2 : ];
1195 10 : for msg in messages {
1196 8 : let bytes = msg.serialize();
1197 8 : let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
1198 8 : assert!(msg == reconstructed);
1199 : }
1200 2 : }
1201 :
1202 : #[test]
1203 2 : fn test_tenantinfo_serde() {
1204 2 : // Test serialization/deserialization of TenantInfo
1205 2 : let original_active = TenantInfo {
1206 2 : id: TenantShardId::unsharded(TenantId::generate()),
1207 2 : state: TenantState::Active,
1208 2 : current_physical_size: Some(42),
1209 2 : attachment_status: TenantAttachmentStatus::Attached,
1210 2 : generation: None,
1211 2 : };
1212 2 : let expected_active = json!({
1213 2 : "id": original_active.id.to_string(),
1214 2 : "state": {
1215 2 : "slug": "Active",
1216 2 : },
1217 2 : "current_physical_size": 42,
1218 2 : "attachment_status": {
1219 2 : "slug":"attached",
1220 2 : }
1221 2 : });
1222 2 :
1223 2 : let original_broken = TenantInfo {
1224 2 : id: TenantShardId::unsharded(TenantId::generate()),
1225 2 : state: TenantState::Broken {
1226 2 : reason: "reason".into(),
1227 2 : backtrace: "backtrace info".into(),
1228 2 : },
1229 2 : current_physical_size: Some(42),
1230 2 : attachment_status: TenantAttachmentStatus::Attached,
1231 2 : generation: None,
1232 2 : };
1233 2 : let expected_broken = json!({
1234 2 : "id": original_broken.id.to_string(),
1235 2 : "state": {
1236 2 : "slug": "Broken",
1237 2 : "data": {
1238 2 : "backtrace": "backtrace info",
1239 2 : "reason": "reason",
1240 2 : }
1241 2 : },
1242 2 : "current_physical_size": 42,
1243 2 : "attachment_status": {
1244 2 : "slug":"attached",
1245 2 : }
1246 2 : });
1247 2 :
1248 2 : assert_eq!(
1249 2 : serde_json::to_value(&original_active).unwrap(),
1250 2 : expected_active
1251 2 : );
1252 :
1253 2 : assert_eq!(
1254 2 : serde_json::to_value(&original_broken).unwrap(),
1255 2 : expected_broken
1256 2 : );
1257 2 : assert!(format!("{:?}", &original_broken.state).contains("reason"));
1258 2 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
1259 2 : }
1260 :
1261 : #[test]
1262 2 : fn test_reject_unknown_field() {
1263 2 : let id = TenantId::generate();
1264 2 : let create_request = json!({
1265 2 : "new_tenant_id": id.to_string(),
1266 2 : "unknown_field": "unknown_value".to_string(),
1267 2 : });
1268 2 : let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
1269 2 : assert!(
1270 2 : err.to_string().contains("unknown field `unknown_field`"),
1271 0 : "expect unknown field `unknown_field` error, got: {}",
1272 : err
1273 : );
1274 :
1275 2 : let id = TenantId::generate();
1276 2 : let config_request = json!({
1277 2 : "tenant_id": id.to_string(),
1278 2 : "unknown_field": "unknown_value".to_string(),
1279 2 : });
1280 2 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
1281 2 : assert!(
1282 2 : err.to_string().contains("unknown field `unknown_field`"),
1283 0 : "expect unknown field `unknown_field` error, got: {}",
1284 : err
1285 : );
1286 :
1287 2 : let attach_request = json!({
1288 2 : "config": {
1289 2 : "unknown_field": "unknown_value".to_string(),
1290 2 : },
1291 2 : });
1292 2 : let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
1293 2 : assert!(
1294 2 : err.to_string().contains("unknown field `unknown_field`"),
1295 0 : "expect unknown field `unknown_field` error, got: {}",
1296 : err
1297 : );
1298 2 : }
1299 :
1300 : #[test]
1301 2 : fn tenantstatus_activating_serde() {
1302 2 : let states = [
1303 2 : TenantState::Activating(ActivatingFrom::Loading),
1304 2 : TenantState::Activating(ActivatingFrom::Attaching),
1305 2 : ];
1306 2 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
1307 2 :
1308 2 : let actual = serde_json::to_string(&states).unwrap();
1309 2 :
1310 2 : assert_eq!(actual, expected);
1311 :
1312 2 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
1313 2 :
1314 2 : assert_eq!(states.as_slice(), &parsed);
1315 2 : }
1316 :
1317 : #[test]
1318 2 : fn tenantstatus_activating_strum() {
1319 2 : // tests added, because we use these for metrics
1320 2 : let examples = [
1321 2 : (line!(), TenantState::Loading, "Loading"),
1322 2 : (line!(), TenantState::Attaching, "Attaching"),
1323 2 : (
1324 2 : line!(),
1325 2 : TenantState::Activating(ActivatingFrom::Loading),
1326 2 : "Activating",
1327 2 : ),
1328 2 : (
1329 2 : line!(),
1330 2 : TenantState::Activating(ActivatingFrom::Attaching),
1331 2 : "Activating",
1332 2 : ),
1333 2 : (line!(), TenantState::Active, "Active"),
1334 2 : (
1335 2 : line!(),
1336 2 : TenantState::Stopping {
1337 2 : progress: utils::completion::Barrier::default(),
1338 2 : },
1339 2 : "Stopping",
1340 2 : ),
1341 2 : (
1342 2 : line!(),
1343 2 : TenantState::Broken {
1344 2 : reason: "Example".into(),
1345 2 : backtrace: "Looooong backtrace".into(),
1346 2 : },
1347 2 : "Broken",
1348 2 : ),
1349 2 : ];
1350 :
1351 16 : for (line, rendered, expected) in examples {
1352 14 : let actual: &'static str = rendered.into();
1353 14 : assert_eq!(actual, expected, "example on {line}");
1354 : }
1355 2 : }
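
    // A minimal round-trip sketch for the `kind`-tagged `EvictionPolicy` defined above.
    // Not part of the original test suite; the durations are arbitrary example values.
    #[test]
    fn eviction_policy_serde_roundtrip_sketch() {
        let policy = EvictionPolicy::LayerAccessThreshold(EvictionPolicyLayerAccessThreshold {
            period: Duration::from_secs(60),
            threshold: Duration::from_secs(3600),
        });
        let encoded = serde_json::to_string(&policy).unwrap();
        // The discriminant is carried in the "kind" field next to the flattened payload.
        assert!(encoded.contains("\"kind\":\"LayerAccessThreshold\""));
        let decoded: EvictionPolicy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, policy);
    }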
1356 : }
|