TLA Line data Source code
1 : use std::{
2 : collections::HashMap,
3 : num::{NonZeroU64, NonZeroUsize},
4 : time::SystemTime,
5 : };
6 :
7 : use byteorder::{BigEndian, ReadBytesExt};
8 : use serde::{Deserialize, Serialize};
9 : use serde_with::{serde_as, DisplayFromStr};
10 : use strum_macros;
11 : use utils::{
12 : completion,
13 : generation::Generation,
14 : history_buffer::HistoryBufferWithDropCounter,
15 : id::{NodeId, TenantId, TimelineId},
16 : lsn::Lsn,
17 : };
18 :
19 : use crate::reltag::RelTag;
20 : use anyhow::bail;
21 : use bytes::{BufMut, Bytes, BytesMut};
22 :
23 : /// The state of a tenant in this pageserver.
24 : ///
25 : /// ```mermaid
26 : /// stateDiagram-v2
27 : ///
28 : /// [*] --> Loading: spawn_load()
29 : /// [*] --> Attaching: spawn_attach()
30 : ///
31 : /// Loading --> Activating: activate()
32 : /// Attaching --> Activating: activate()
33 : /// Activating --> Active: infallible
34 : ///
35 : /// Loading --> Broken: load() failure
36 : /// Attaching --> Broken: attach() failure
37 : ///
38 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
39 : /// Stopping --> Broken: late error in remove_tenant_from_memory
40 : ///
41 : /// Broken --> [*]: ignore / detach / shutdown
42 : /// Stopping --> [*]: remove_from_memory complete
43 : ///
44 : /// Active --> Broken: cfg(testing)-only tenant break point
45 : /// ```
46 : #[derive(
47 CBC 20385 : Clone,
48 3115 : PartialEq,
49 : Eq,
50 844 : serde::Serialize,
51 28 : serde::Deserialize,
52 19 : strum_macros::Display,
53 : strum_macros::EnumVariantNames,
54 UBC 0 : strum_macros::AsRefStr,
55 CBC 2834 : strum_macros::IntoStaticStr,
56 : )]
57 : #[serde(tag = "slug", content = "data")]
58 : pub enum TenantState {
59 : /// This tenant is being loaded from local disk.
60 : ///
61 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
62 : Loading,
63 : /// This tenant is being attached to the pageserver.
64 : ///
65 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
66 : Attaching,
67 : /// The tenant is transitioning from Loading/Attaching to Active.
68 : ///
69 : /// While in this state, the individual timelines are being activated.
70 : ///
71 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
72 : Activating(ActivatingFrom),
73 : /// The tenant has finished activating and is open for business.
74 : ///
75 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
76 : Active,
77 : /// The tenant is recognized by pageserver, but it is being detached or the
78 : /// system is being shut down.
79 : ///
80 : /// Transitions out of this state are possible through `set_broken()`.
81 : Stopping {
82 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
83 : // otherwise it will not be skipped during deserialization
84 : #[serde(skip)]
85 : progress: completion::Barrier,
86 : },
87 : /// The tenant is recognized by the pageserver, but can no longer be used for
88 : /// any operations.
89 : ///
90 : /// If the tenant fails to load or attach, it will transition to this state
91 : /// and it is guaranteed that no background tasks are running in its name.
92 : ///
93 : /// The other way to transition into this state is from `Stopping` state
94 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
95 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
96 : Broken { reason: String, backtrace: String },
97 : }
98 :
99 : impl TenantState {
100 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
101 : use TenantAttachmentStatus::*;
102 :
103 : // Below TenantState::Activating is used as "transient" or "transparent" state for
104 : // attachment_status determining.
105 LBC (1) : match self {
106 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
107 : // So, technically, we can return Attached here.
108 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
109 : // But, our attach task might still be fetching the remote timelines, etc.
110 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
111 CBC 32 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
112 : // tenant mgr startup distinguishes attaching from loading via marker file.
113 : // If it's loading, there is no attach marker file, i.e., attach had finished in the past.
114 61 : Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
115 : // We only reach Active after successful load / attach.
116 : // So, call atttachment status Attached.
117 164 : Self::Active => Attached,
118 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
119 : // However, it also becomes Broken if the regular load fails.
120 : // From Console's perspective there's no practical difference
121 : // because attachment_status is polled by console only during attach operation execution.
122 103 : Self::Broken { reason, .. } => Failed {
123 103 : reason: reason.to_owned(),
124 103 : },
125 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
126 : // we set the Stopping state irrespective of whether the tenant
127 : // has finished attaching or not.
128 188 : Self::Stopping { .. } => Maybe,
129 : }
130 548 : }
131 :
132 64 : pub fn broken_from_reason(reason: String) -> Self {
133 64 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
134 64 : Self::Broken {
135 64 : reason,
136 64 : backtrace: backtrace_str,
137 64 : }
138 64 : }
139 : }
140 :
141 : impl std::fmt::Debug for TenantState {
142 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 70 : match self {
144 70 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
145 70 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
146 : }
147 19 : _ => write!(f, "{self}"),
148 : }
149 89 : }
150 : }
151 :
152 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
153 245 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
154 : pub enum ActivatingFrom {
155 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
156 : Loading,
157 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
158 : Attaching,
159 : }
160 :
161 : /// A state of a timeline in pageserver's memory.
162 3105544 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
163 : pub enum TimelineState {
164 : /// The timeline is recognized by the pageserver but is not yet operational.
165 : /// In particular, the walreceiver connection loop is not running for this timeline.
166 : /// It will eventually transition to state Active or Broken.
167 : Loading,
168 : /// The timeline is fully operational.
169 : /// It can be queried, and the walreceiver connection loop is running.
170 : Active,
171 : /// The timeline was previously Loading or Active but is shutting down.
172 : /// It cannot transition back into any other state.
173 : Stopping,
174 : /// The timeline is broken and not operational (previous states: Loading or Active).
175 : Broken { reason: String, backtrace: String },
176 : }
177 :
178 : #[serde_as]
179 10176 : #[derive(Serialize, Deserialize)]
180 : pub struct TimelineCreateRequest {
181 : #[serde_as(as = "DisplayFromStr")]
182 : pub new_timeline_id: TimelineId,
183 : #[serde(default)]
184 : #[serde_as(as = "Option<DisplayFromStr>")]
185 : pub ancestor_timeline_id: Option<TimelineId>,
186 : #[serde(default)]
187 : #[serde_as(as = "Option<DisplayFromStr>")]
188 : pub ancestor_start_lsn: Option<Lsn>,
189 : pub pg_version: Option<u32>,
190 : }
191 :
192 : #[serde_as]
193 9697 : #[derive(Serialize, Deserialize, Debug)]
194 : #[serde(deny_unknown_fields)]
195 : pub struct TenantCreateRequest {
196 : #[serde_as(as = "DisplayFromStr")]
197 : pub new_tenant_id: TenantId,
198 : #[serde(default)]
199 : #[serde(skip_serializing_if = "Option::is_none")]
200 : pub generation: Option<u32>,
201 : #[serde(flatten)]
202 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
203 : }
204 :
205 : #[serde_as]
206 UBC 0 : #[derive(Deserialize, Debug)]
207 : #[serde(deny_unknown_fields)]
208 : pub struct TenantLoadRequest {
209 : #[serde(default)]
210 : #[serde(skip_serializing_if = "Option::is_none")]
211 : pub generation: Option<u32>,
212 : }
213 :
214 : impl std::ops::Deref for TenantCreateRequest {
215 : type Target = TenantConfig;
216 :
217 0 : fn deref(&self) -> &Self::Target {
218 0 : &self.config
219 0 : }
220 : }
221 :
222 : /// An alternative representation of `pageserver::tenant::TenantConf` with
223 : /// simpler types.
224 CBC 15957 : #[derive(Serialize, Deserialize, Debug, Default)]
225 : pub struct TenantConfig {
226 : pub checkpoint_distance: Option<u64>,
227 : pub checkpoint_timeout: Option<String>,
228 : pub compaction_target_size: Option<u64>,
229 : pub compaction_period: Option<String>,
230 : pub compaction_threshold: Option<usize>,
231 : pub gc_horizon: Option<u64>,
232 : pub gc_period: Option<String>,
233 : pub image_creation_threshold: Option<usize>,
234 : pub pitr_interval: Option<String>,
235 : pub walreceiver_connect_timeout: Option<String>,
236 : pub lagging_wal_timeout: Option<String>,
237 : pub max_lsn_wal_lag: Option<NonZeroU64>,
238 : pub trace_read_requests: Option<bool>,
239 : // We defer the parsing of the eviction_policy field to the request handler.
240 : // Otherwise we'd have to move the types for eviction policy into this package.
241 : // We might do that once the eviction feature has stabilizied.
242 : // For now, this field is not even documented in the openapi_spec.yml.
243 : pub eviction_policy: Option<serde_json::Value>,
244 : pub min_resident_size_override: Option<u64>,
245 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
246 : pub gc_feedback: Option<bool>,
247 : }
248 :
249 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
250 : /// lists out all possible states (and the virtual "Detached" state)
251 : /// in a flat form rather than using rust-style enums.
252 UBC 0 : #[derive(Serialize, Deserialize, Debug)]
253 : pub enum LocationConfigMode {
254 : AttachedSingle,
255 : AttachedMulti,
256 : AttachedStale,
257 : Secondary,
258 : Detached,
259 : }
260 :
261 0 : #[derive(Serialize, Deserialize, Debug)]
262 : pub struct LocationConfigSecondary {
263 : pub warm: bool,
264 : }
265 :
266 : /// An alternative representation of `pageserver::tenant::LocationConf`,
267 : /// for use in external-facing APIs.
268 0 : #[derive(Serialize, Deserialize, Debug)]
269 : pub struct LocationConfig {
270 : pub mode: LocationConfigMode,
271 : /// If attaching, in what generation?
272 : #[serde(default)]
273 : pub generation: Option<Generation>,
274 : #[serde(default)]
275 : pub secondary_conf: Option<LocationConfigSecondary>,
276 :
277 : // If requesting mode `Secondary`, configuration for that.
278 : // Custom storage configuration for the tenant, if any
279 : pub tenant_conf: TenantConfig,
280 : }
281 :
282 : #[serde_as]
283 CBC 449 : #[derive(Serialize, Deserialize)]
284 : #[serde(transparent)]
285 : pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
286 :
287 564 : #[derive(Serialize)]
288 : pub struct StatusResponse {
289 : pub id: NodeId,
290 : }
291 :
292 : #[serde_as]
293 UBC 0 : #[derive(Serialize, Deserialize, Debug)]
294 : #[serde(deny_unknown_fields)]
295 : pub struct TenantLocationConfigRequest {
296 : #[serde_as(as = "DisplayFromStr")]
297 : pub tenant_id: TenantId,
298 : #[serde(flatten)]
299 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
300 : }
301 :
302 : #[serde_as]
303 CBC 420 : #[derive(Serialize, Deserialize, Debug)]
304 : #[serde(deny_unknown_fields)]
305 : pub struct TenantConfigRequest {
306 : #[serde_as(as = "DisplayFromStr")]
307 : pub tenant_id: TenantId,
308 : #[serde(flatten)]
309 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
310 : }
311 :
312 : impl std::ops::Deref for TenantConfigRequest {
313 : type Target = TenantConfig;
314 :
315 UBC 0 : fn deref(&self) -> &Self::Target {
316 0 : &self.config
317 0 : }
318 : }
319 :
320 : impl TenantConfigRequest {
321 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
322 0 : let config = TenantConfig {
323 0 : checkpoint_distance: None,
324 0 : checkpoint_timeout: None,
325 0 : compaction_target_size: None,
326 0 : compaction_period: None,
327 0 : compaction_threshold: None,
328 0 : gc_horizon: None,
329 0 : gc_period: None,
330 0 : image_creation_threshold: None,
331 0 : pitr_interval: None,
332 0 : walreceiver_connect_timeout: None,
333 0 : lagging_wal_timeout: None,
334 0 : max_lsn_wal_lag: None,
335 0 : trace_read_requests: None,
336 0 : eviction_policy: None,
337 0 : min_resident_size_override: None,
338 0 : evictions_low_residence_duration_metric_threshold: None,
339 0 : gc_feedback: None,
340 0 : };
341 0 : TenantConfigRequest { tenant_id, config }
342 0 : }
343 : }
344 :
345 CBC 195 : #[derive(Debug, Deserialize)]
346 : pub struct TenantAttachRequest {
347 : pub config: TenantAttachConfig,
348 : #[serde(default)]
349 : pub generation: Option<u32>,
350 : }
351 :
352 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
353 : /// its usage inside `TenantAttachRequest`.
354 112 : #[derive(Debug, Serialize, Deserialize)]
355 : #[serde(deny_unknown_fields)]
356 : pub struct TenantAttachConfig {
357 : #[serde(flatten)]
358 : allowing_unknown_fields: TenantConfig,
359 : }
360 :
361 : impl std::ops::Deref for TenantAttachConfig {
362 : type Target = TenantConfig;
363 :
364 55 : fn deref(&self) -> &Self::Target {
365 55 : &self.allowing_unknown_fields
366 55 : }
367 : }
368 :
369 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
370 653 : #[derive(Serialize, Deserialize, Clone)]
371 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
372 : pub enum TenantAttachmentStatus {
373 : Maybe,
374 : Attached,
375 : Failed { reason: String },
376 : }
377 :
378 : #[serde_as]
379 550 : #[derive(Serialize, Deserialize, Clone)]
380 : pub struct TenantInfo {
381 : #[serde_as(as = "DisplayFromStr")]
382 : pub id: TenantId,
383 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
384 : pub state: TenantState,
385 : /// Sum of the size of all layer files.
386 : /// If a layer is present in both local FS and S3, it counts only once.
387 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
388 : pub attachment_status: TenantAttachmentStatus,
389 : }
390 :
391 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
392 : #[serde_as]
393 43108 : #[derive(Debug, Serialize, Deserialize, Clone)]
394 : pub struct TimelineInfo {
395 : #[serde_as(as = "DisplayFromStr")]
396 : pub tenant_id: TenantId,
397 : #[serde_as(as = "DisplayFromStr")]
398 : pub timeline_id: TimelineId,
399 :
400 : #[serde_as(as = "Option<DisplayFromStr>")]
401 : pub ancestor_timeline_id: Option<TimelineId>,
402 : #[serde_as(as = "Option<DisplayFromStr>")]
403 : pub ancestor_lsn: Option<Lsn>,
404 : #[serde_as(as = "DisplayFromStr")]
405 : pub last_record_lsn: Lsn,
406 : #[serde_as(as = "Option<DisplayFromStr>")]
407 : pub prev_record_lsn: Option<Lsn>,
408 : #[serde_as(as = "DisplayFromStr")]
409 : pub latest_gc_cutoff_lsn: Lsn,
410 : #[serde_as(as = "DisplayFromStr")]
411 : pub disk_consistent_lsn: Lsn,
412 :
413 : /// The LSN that we have succesfully uploaded to remote storage
414 : #[serde_as(as = "DisplayFromStr")]
415 : pub remote_consistent_lsn: Lsn,
416 :
417 : /// The LSN that we are advertizing to safekeepers
418 : #[serde_as(as = "DisplayFromStr")]
419 : pub remote_consistent_lsn_visible: Lsn,
420 :
421 : pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
422 : /// Sum of the size of all layer files.
423 : /// If a layer is present in both local FS and S3, it counts only once.
424 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
425 : pub current_logical_size_non_incremental: Option<u64>,
426 :
427 : pub timeline_dir_layer_file_size_sum: Option<u64>,
428 :
429 : pub wal_source_connstr: Option<String>,
430 : #[serde_as(as = "Option<DisplayFromStr>")]
431 : pub last_received_msg_lsn: Option<Lsn>,
432 : /// the timestamp (in microseconds) of the last received message
433 : pub last_received_msg_ts: Option<u128>,
434 : pub pg_version: u32,
435 :
436 : pub state: TimelineState,
437 :
438 : pub walreceiver_status: String,
439 : }
440 :
441 85 : #[derive(Debug, Clone, Serialize)]
442 : pub struct LayerMapInfo {
443 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
444 : pub historic_layers: Vec<HistoricLayerInfo>,
445 : }
446 :
447 43821128 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
448 : #[repr(usize)]
449 : pub enum LayerAccessKind {
450 : GetValueReconstructData,
451 : Iter,
452 : KeyIter,
453 : Dump,
454 : }
455 :
456 8532 : #[derive(Debug, Clone, Serialize, Deserialize)]
457 : pub struct LayerAccessStatFullDetails {
458 : pub when_millis_since_epoch: u64,
459 : pub task_kind: &'static str,
460 : pub access_kind: LayerAccessKind,
461 : }
462 :
463 : /// An event that impacts the layer's residence status.
464 : #[serde_as]
465 8864 : #[derive(Debug, Clone, Serialize, Deserialize)]
466 : pub struct LayerResidenceEvent {
467 : /// The time when the event occurred.
468 : /// NB: this timestamp is captured while the residence status changes.
469 : /// So, it might be behind/ahead of the actual residence change by a short amount of time.
470 : ///
471 : #[serde(rename = "timestamp_millis_since_epoch")]
472 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
473 : pub timestamp: SystemTime,
474 : /// The new residence status of the layer.
475 : pub status: LayerResidenceStatus,
476 : /// The reason why we had to record this event.
477 : pub reason: LayerResidenceEventReason,
478 : }
479 :
480 : /// The reason for recording a given [`LayerResidenceEvent`].
481 8864 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
482 : pub enum LayerResidenceEventReason {
483 : /// The layer map is being populated, e.g. during timeline load or attach.
484 : /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
485 : /// We need to record such events because there is no persistent storage for the events.
486 : ///
487 : // https://github.com/rust-lang/rust/issues/74481
488 : /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
489 : /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
490 : LayerLoad,
491 : /// We just created the layer (e.g., freeze_and_flush or compaction).
492 : /// Such layers are always [`LayerResidenceStatus::Resident`].
493 : LayerCreate,
494 : /// We on-demand downloaded or evicted the given layer.
495 : ResidenceChange,
496 : }
497 :
498 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
499 8864 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
500 : pub enum LayerResidenceStatus {
501 : /// Residence status for a layer file that exists locally.
502 : /// It may also exist on the remote, we don't care here.
503 : Resident,
504 : /// Residence status for a layer file that only exists on the remote.
505 : Evicted,
506 : }
507 :
508 : impl LayerResidenceEvent {
509 57278 : pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
510 57278 : Self {
511 57278 : status,
512 57278 : reason,
513 57278 : timestamp: SystemTime::now(),
514 57278 : }
515 57278 : }
516 : }
517 :
518 1775 : #[derive(Debug, Clone, Serialize)]
519 : pub struct LayerAccessStats {
520 : pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
521 : pub task_kind_access_flag: Vec<&'static str>,
522 : pub first: Option<LayerAccessStatFullDetails>,
523 : pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
524 : pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
525 : }
526 :
527 : #[serde_as]
528 21 : #[derive(Debug, Clone, Serialize)]
529 : #[serde(tag = "kind")]
530 : pub enum InMemoryLayerInfo {
531 : Open {
532 : #[serde_as(as = "DisplayFromStr")]
533 : lsn_start: Lsn,
534 : },
535 : Frozen {
536 : #[serde_as(as = "DisplayFromStr")]
537 : lsn_start: Lsn,
538 : #[serde_as(as = "DisplayFromStr")]
539 : lsn_end: Lsn,
540 : },
541 : }
542 :
543 : #[serde_as]
544 1775 : #[derive(Debug, Clone, Serialize)]
545 : #[serde(tag = "kind")]
546 : pub enum HistoricLayerInfo {
547 : Delta {
548 : layer_file_name: String,
549 : layer_file_size: u64,
550 :
551 : #[serde_as(as = "DisplayFromStr")]
552 : lsn_start: Lsn,
553 : #[serde_as(as = "DisplayFromStr")]
554 : lsn_end: Lsn,
555 : remote: bool,
556 : access_stats: LayerAccessStats,
557 : },
558 : Image {
559 : layer_file_name: String,
560 : layer_file_size: u64,
561 :
562 : #[serde_as(as = "DisplayFromStr")]
563 : lsn_start: Lsn,
564 : remote: bool,
565 : access_stats: LayerAccessStats,
566 : },
567 : }
568 :
569 9 : #[derive(Debug, Serialize, Deserialize)]
570 : pub struct DownloadRemoteLayersTaskSpawnRequest {
571 : pub max_concurrent_downloads: NonZeroUsize,
572 : }
573 :
574 25 : #[derive(Debug, Serialize, Deserialize, Clone)]
575 : pub struct DownloadRemoteLayersTaskInfo {
576 : pub task_id: String,
577 : pub state: DownloadRemoteLayersTaskState,
578 : pub total_layer_count: u64, // stable once `completed`
579 : pub successful_download_count: u64, // stable once `completed`
580 : pub failed_download_count: u64, // stable once `completed`
581 : }
582 :
583 25 : #[derive(Debug, Serialize, Deserialize, Clone)]
584 : pub enum DownloadRemoteLayersTaskState {
585 : Running,
586 : Completed,
587 : ShutDown,
588 : }
589 :
590 : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
591 :
592 : /// Information for configuring a single fail point
593 1065 : #[derive(Debug, Serialize, Deserialize)]
594 : pub struct FailpointConfig {
595 : /// Name of the fail point
596 : pub name: String,
597 : /// List of actions to take, using the format described in `fail::cfg`
598 : ///
599 : /// We also support `actions = "exit"` to cause the fail point to immediately exit.
600 : pub actions: String,
601 : }
602 :
603 1113 : #[derive(Debug, Serialize, Deserialize)]
604 : pub struct TimelineGcRequest {
605 : pub gc_horizon: Option<u64>,
606 : }
607 :
608 : // Wrapped in libpq CopyData
609 4 : #[derive(PartialEq, Eq, Debug)]
610 : pub enum PagestreamFeMessage {
611 : Exists(PagestreamExistsRequest),
612 : Nblocks(PagestreamNblocksRequest),
613 : GetPage(PagestreamGetPageRequest),
614 : DbSize(PagestreamDbSizeRequest),
615 : }
616 :
617 : // Wrapped in libpq CopyData
618 : pub enum PagestreamBeMessage {
619 : Exists(PagestreamExistsResponse),
620 : Nblocks(PagestreamNblocksResponse),
621 : GetPage(PagestreamGetPageResponse),
622 : Error(PagestreamErrorResponse),
623 : DbSize(PagestreamDbSizeResponse),
624 : }
625 :
626 1 : #[derive(Debug, PartialEq, Eq)]
627 : pub struct PagestreamExistsRequest {
628 : pub latest: bool,
629 : pub lsn: Lsn,
630 : pub rel: RelTag,
631 : }
632 :
633 1 : #[derive(Debug, PartialEq, Eq)]
634 : pub struct PagestreamNblocksRequest {
635 : pub latest: bool,
636 : pub lsn: Lsn,
637 : pub rel: RelTag,
638 : }
639 :
640 1 : #[derive(Debug, PartialEq, Eq)]
641 : pub struct PagestreamGetPageRequest {
642 : pub latest: bool,
643 : pub lsn: Lsn,
644 : pub rel: RelTag,
645 : pub blkno: u32,
646 : }
647 :
648 1 : #[derive(Debug, PartialEq, Eq)]
649 : pub struct PagestreamDbSizeRequest {
650 : pub latest: bool,
651 : pub lsn: Lsn,
652 : pub dbnode: u32,
653 : }
654 :
/// Payload of `PagestreamBeMessage::Exists`.
#[derive(Debug)]
pub struct PagestreamExistsResponse {
    pub exists: bool,
}
659 :
/// Payload of `PagestreamBeMessage::Nblocks`.
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
    pub n_blocks: u32,
}
664 :
665 0 : #[derive(Debug)]
666 : pub struct PagestreamGetPageResponse {
667 : pub page: Bytes,
668 : }
669 :
/// Payload of `PagestreamBeMessage::Error`.
#[derive(Debug)]
pub struct PagestreamErrorResponse {
    pub message: String,
}
674 :
/// Payload of `PagestreamBeMessage::DbSize`.
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
    pub db_size: i64,
}
679 :
680 : impl PagestreamFeMessage {
681 CBC 4 : pub fn serialize(&self) -> Bytes {
682 4 : let mut bytes = BytesMut::new();
683 4 :
684 4 : match self {
685 1 : Self::Exists(req) => {
686 1 : bytes.put_u8(0);
687 1 : bytes.put_u8(u8::from(req.latest));
688 1 : bytes.put_u64(req.lsn.0);
689 1 : bytes.put_u32(req.rel.spcnode);
690 1 : bytes.put_u32(req.rel.dbnode);
691 1 : bytes.put_u32(req.rel.relnode);
692 1 : bytes.put_u8(req.rel.forknum);
693 1 : }
694 :
695 1 : Self::Nblocks(req) => {
696 1 : bytes.put_u8(1);
697 1 : bytes.put_u8(u8::from(req.latest));
698 1 : bytes.put_u64(req.lsn.0);
699 1 : bytes.put_u32(req.rel.spcnode);
700 1 : bytes.put_u32(req.rel.dbnode);
701 1 : bytes.put_u32(req.rel.relnode);
702 1 : bytes.put_u8(req.rel.forknum);
703 1 : }
704 :
705 1 : Self::GetPage(req) => {
706 1 : bytes.put_u8(2);
707 1 : bytes.put_u8(u8::from(req.latest));
708 1 : bytes.put_u64(req.lsn.0);
709 1 : bytes.put_u32(req.rel.spcnode);
710 1 : bytes.put_u32(req.rel.dbnode);
711 1 : bytes.put_u32(req.rel.relnode);
712 1 : bytes.put_u8(req.rel.forknum);
713 1 : bytes.put_u32(req.blkno);
714 1 : }
715 :
716 1 : Self::DbSize(req) => {
717 1 : bytes.put_u8(3);
718 1 : bytes.put_u8(u8::from(req.latest));
719 1 : bytes.put_u64(req.lsn.0);
720 1 : bytes.put_u32(req.dbnode);
721 1 : }
722 : }
723 :
724 4 : bytes.into()
725 4 : }
726 :
727 3779080 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
728 : // TODO these gets can fail
729 :
730 : // these correspond to the NeonMessageTag enum in pagestore_client.h
731 : //
732 : // TODO: consider using protobuf or serde bincode for less error prone
733 : // serialization.
734 3779080 : let msg_tag = body.read_u8()?;
735 3779080 : match msg_tag {
736 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
737 46664 : latest: body.read_u8()? != 0,
738 46664 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
739 : rel: RelTag {
740 46664 : spcnode: body.read_u32::<BigEndian>()?,
741 46664 : dbnode: body.read_u32::<BigEndian>()?,
742 46664 : relnode: body.read_u32::<BigEndian>()?,
743 46664 : forknum: body.read_u8()?,
744 : },
745 : })),
746 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
747 17068 : latest: body.read_u8()? != 0,
748 17068 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
749 : rel: RelTag {
750 17068 : spcnode: body.read_u32::<BigEndian>()?,
751 17068 : dbnode: body.read_u32::<BigEndian>()?,
752 17068 : relnode: body.read_u32::<BigEndian>()?,
753 17068 : forknum: body.read_u8()?,
754 : },
755 : })),
756 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
757 3715342 : latest: body.read_u8()? != 0,
758 3715342 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
759 : rel: RelTag {
760 3715342 : spcnode: body.read_u32::<BigEndian>()?,
761 3715342 : dbnode: body.read_u32::<BigEndian>()?,
762 3715342 : relnode: body.read_u32::<BigEndian>()?,
763 3715342 : forknum: body.read_u8()?,
764 : },
765 3715342 : blkno: body.read_u32::<BigEndian>()?,
766 : })),
767 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
768 6 : latest: body.read_u8()? != 0,
769 6 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
770 6 : dbnode: body.read_u32::<BigEndian>()?,
771 : })),
772 UBC 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
773 : }
774 CBC 3779080 : }
775 : }
776 :
777 : impl PagestreamBeMessage {
778 3779057 : pub fn serialize(&self) -> Bytes {
779 3779057 : let mut bytes = BytesMut::new();
780 3779057 :
781 3779057 : match self {
782 46663 : Self::Exists(resp) => {
783 46663 : bytes.put_u8(100); /* tag from pagestore_client.h */
784 46663 : bytes.put_u8(resp.exists as u8);
785 46663 : }
786 :
787 17067 : Self::Nblocks(resp) => {
788 17067 : bytes.put_u8(101); /* tag from pagestore_client.h */
789 17067 : bytes.put_u32(resp.n_blocks);
790 17067 : }
791 :
792 3715321 : Self::GetPage(resp) => {
793 3715321 : bytes.put_u8(102); /* tag from pagestore_client.h */
794 3715321 : bytes.put(&resp.page[..]);
795 3715321 : }
796 :
797 1 : Self::Error(resp) => {
798 1 : bytes.put_u8(103); /* tag from pagestore_client.h */
799 1 : bytes.put(resp.message.as_bytes());
800 1 : bytes.put_u8(0); // null terminator
801 1 : }
802 5 : Self::DbSize(resp) => {
803 5 : bytes.put_u8(104); /* tag from pagestore_client.h */
804 5 : bytes.put_i64(resp.db_size);
805 5 : }
806 : }
807 :
808 3779057 : bytes.into()
809 3779057 : }
810 : }
811 :
#[cfg(test)]
mod tests {
    use bytes::Buf;
    use serde_json::json;

    use super::*;

    /// Every frontend message must survive a serialize -> parse round trip unchanged.
    #[test]
    fn test_pagestream() {
        // The same relation tag is used by all relation-based requests.
        let rel = || RelTag {
            forknum: 1,
            spcnode: 2,
            dbnode: 3,
            relnode: 4,
        };
        let messages = vec![
            PagestreamFeMessage::Exists(PagestreamExistsRequest {
                latest: true,
                lsn: Lsn(4),
                rel: rel(),
            }),
            PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
                latest: false,
                lsn: Lsn(4),
                rel: rel(),
            }),
            PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
                latest: true,
                lsn: Lsn(4),
                rel: rel(),
                blkno: 7,
            }),
            PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
                latest: true,
                lsn: Lsn(4),
                dbnode: 7,
            }),
        ];

        for original in messages {
            let encoded = original.serialize();
            let decoded = PagestreamFeMessage::parse(&mut encoded.reader()).unwrap();
            assert!(original == decoded);
        }
    }

    /// `TenantInfo` serializes to the expected JSON, and the Debug output of a broken
    /// state includes both the reason and the backtrace.
    #[test]
    fn test_tenantinfo_serde() {
        let active = TenantInfo {
            id: TenantId::generate(),
            state: TenantState::Active,
            current_physical_size: Some(42),
            attachment_status: TenantAttachmentStatus::Attached,
        };
        let active_json = json!({
            "id": active.id.to_string(),
            "state": {
                "slug": "Active",
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            }
        });

        let broken = TenantInfo {
            id: TenantId::generate(),
            state: TenantState::Broken {
                reason: "reason".into(),
                backtrace: "backtrace info".into(),
            },
            current_physical_size: Some(42),
            attachment_status: TenantAttachmentStatus::Attached,
        };
        let broken_json = json!({
            "id": broken.id.to_string(),
            "state": {
                "slug": "Broken",
                "data": {
                    "backtrace": "backtrace info",
                    "reason": "reason",
                }
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            }
        });

        assert_eq!(serde_json::to_value(&active).unwrap(), active_json);
        assert_eq!(serde_json::to_value(&broken).unwrap(), broken_json);

        let broken_debug = || format!("{:?}", &broken.state);
        assert!(broken_debug().contains("reason"));
        assert!(broken_debug().contains("backtrace info"));
    }

    /// Deserializes `request` as `T` and checks that this fails because of
    /// `unknown_field`.
    fn assert_rejects_unknown_field<T>(request: serde_json::Value)
    where
        T: serde::de::DeserializeOwned + std::fmt::Debug,
    {
        let err = serde_json::from_value::<T>(request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }

    /// Requests that flatten a `TenantConfig` must reject unknown fields.
    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        assert_rejects_unknown_field::<TenantCreateRequest>(json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        }));

        let id = TenantId::generate();
        assert_rejects_unknown_field::<TenantConfigRequest>(json!({
            "tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        }));

        assert_rejects_unknown_field::<TenantAttachRequest>(json!({
            "config": {
                "unknown_field": "unknown_value".to_string(),
            },
        }));
    }

    /// `Activating` states serialize to the expected JSON and parse back to the same
    /// values.
    #[test]
    fn tenantstatus_activating_serde() {
        let states = [
            TenantState::Activating(ActivatingFrom::Loading),
            TenantState::Activating(ActivatingFrom::Attaching),
        ];
        let expected = r#"[{"slug":"Activating","data":"Loading"},{"slug":"Activating","data":"Attaching"}]"#;

        let actual = serde_json::to_string(&states).unwrap();
        assert_eq!(actual, expected);

        let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
        assert_eq!(states.as_slice(), &parsed);
    }

    /// The static string rendering of each state is pinned here because it is used for
    /// metrics.
    #[test]
    fn tenantstatus_activating_strum() {
        let examples = [
            (line!(), TenantState::Loading, "Loading"),
            (line!(), TenantState::Attaching, "Attaching"),
            (
                line!(),
                TenantState::Activating(ActivatingFrom::Loading),
                "Activating",
            ),
            (
                line!(),
                TenantState::Activating(ActivatingFrom::Attaching),
                "Activating",
            ),
            (line!(), TenantState::Active, "Active"),
            (
                line!(),
                TenantState::Stopping {
                    progress: utils::completion::Barrier::default(),
                },
                "Stopping",
            ),
            (
                line!(),
                TenantState::Broken {
                    reason: "Example".into(),
                    backtrace: "Looooong backtrace".into(),
                },
                "Broken",
            ),
        ];

        for (line, state, expected) in examples {
            let actual: &'static str = state.into();
            assert_eq!(actual, expected, "example on {line}");
        }
    }
}
|