Line data Source code
1 : use std::{
2 : collections::HashMap,
3 : num::{NonZeroU64, NonZeroUsize},
4 : time::SystemTime,
5 : };
6 :
7 : use byteorder::{BigEndian, ReadBytesExt};
8 : use serde::{Deserialize, Serialize};
9 : use serde_with::{serde_as, DisplayFromStr};
10 : use strum_macros;
11 : use utils::{
12 : completion,
13 : history_buffer::HistoryBufferWithDropCounter,
14 : id::{NodeId, TenantId, TimelineId},
15 : lsn::Lsn,
16 : };
17 :
18 : use crate::reltag::RelTag;
19 : use anyhow::bail;
20 : use bytes::{BufMut, Bytes, BytesMut};
21 :
22 : /// The state of a tenant in this pageserver.
23 : ///
24 : /// ```mermaid
25 : /// stateDiagram-v2
26 : ///
27 : /// [*] --> Loading: spawn_load()
28 : /// [*] --> Attaching: spawn_attach()
29 : ///
30 : /// Loading --> Activating: activate()
31 : /// Attaching --> Activating: activate()
32 : /// Activating --> Active: infallible
33 : ///
34 : /// Loading --> Broken: load() failure
35 : /// Attaching --> Broken: attach() failure
36 : ///
37 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
38 : /// Stopping --> Broken: late error in remove_tenant_from_memory
39 : ///
40 : /// Broken --> [*]: ignore / detach / shutdown
41 : /// Stopping --> [*]: remove_from_memory complete
42 : ///
43 : /// Active --> Broken: cfg(testing)-only tenant break point
44 : /// ```
45 : #[derive(
46 14838 : Clone,
47 7090 : PartialEq,
48 : Eq,
49 842 : serde::Serialize,
50 28 : serde::Deserialize,
51 18 : strum_macros::Display,
52 : strum_macros::EnumVariantNames,
53 0 : strum_macros::AsRefStr,
54 2950 : strum_macros::IntoStaticStr,
55 : )]
56 : #[serde(tag = "slug", content = "data")]
57 : pub enum TenantState {
58 : /// This tenant is being loaded from local disk.
59 : ///
60 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
61 : Loading,
62 : /// This tenant is being attached to the pageserver.
63 : ///
64 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
65 : Attaching,
66 : /// The tenant is transitioning from Loading/Attaching to Active.
67 : ///
68 : /// While in this state, the individual timelines are being activated.
69 : ///
70 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
71 : Activating(ActivatingFrom),
72 : /// The tenant has finished activating and is open for business.
73 : ///
74 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
75 : Active,
76 : /// The tenant is recognized by pageserver, but it is being detached or the
77 : /// system is being shut down.
78 : ///
79 : /// Transitions out of this state are possible through `set_broken()`.
80 : Stopping {
81 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
82 : // otherwise it will not be skipped during deserialization
83 : #[serde(skip)]
84 : progress: completion::Barrier,
85 : },
86 : /// The tenant is recognized by the pageserver, but can no longer be used for
87 : /// any operations.
88 : ///
89 : /// If the tenant fails to load or attach, it will transition to this state
90 : /// and it is guaranteed that no background tasks are running in its name.
91 : ///
92 : /// The other way to transition into this state is from `Stopping` state
93 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
94 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
95 : Broken { reason: String, backtrace: String },
96 : }
97 :
98 : impl TenantState {
99 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
100 : use TenantAttachmentStatus::*;
101 :
102 : // Below TenantState::Activating is used as "transient" or "transparent" state for
103 : // attachment_status determining.
104 0 : match self {
105 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
106 : // So, technically, we can return Attached here.
107 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
108 : // But, our attach task might still be fetching the remote timelines, etc.
109 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
110 23 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
111 : // tenant mgr startup distinguishes attaching from loading via marker file.
112 : // If it's loading, there is no attach marker file, i.e., attach had finished in the past.
113 57 : Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
114 : // We only reach Active after successful load / attach.
115 : // So, call atttachment status Attached.
116 181 : Self::Active => Attached,
117 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
118 : // However, it also becomes Broken if the regular load fails.
119 : // From Console's perspective there's no practical difference
120 : // because attachment_status is polled by console only during attach operation execution.
121 115 : Self::Broken { reason, .. } => Failed {
122 115 : reason: reason.to_owned(),
123 115 : },
124 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
125 : // we set the Stopping state irrespective of whether the tenant
126 : // has finished attaching or not.
127 173 : Self::Stopping { .. } => Maybe,
128 : }
129 549 : }
130 :
131 85 : pub fn broken_from_reason(reason: String) -> Self {
132 85 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
133 85 : Self::Broken {
134 85 : reason,
135 85 : backtrace: backtrace_str,
136 85 : }
137 85 : }
138 : }
139 :
140 : impl std::fmt::Debug for TenantState {
141 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 135 : match self {
143 135 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
144 135 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
145 : }
146 18 : _ => write!(f, "{self}"),
147 : }
148 153 : }
149 : }
150 :
151 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
152 247 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
153 : pub enum ActivatingFrom {
154 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
155 : Loading,
156 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
157 : Attaching,
158 : }
159 :
160 : /// A state of a timeline in pageserver's memory.
161 3085855 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
162 : pub enum TimelineState {
163 : /// The timeline is recognized by the pageserver but is not yet operational.
164 : /// In particular, the walreceiver connection loop is not running for this timeline.
165 : /// It will eventually transition to state Active or Broken.
166 : Loading,
167 : /// The timeline is fully operational.
168 : /// It can be queried, and the walreceiver connection loop is running.
169 : Active,
170 : /// The timeline was previously Loading or Active but is shutting down.
171 : /// It cannot transition back into any other state.
172 : Stopping,
173 : /// The timeline is broken and not operational (previous states: Loading or Active).
174 : Broken { reason: String, backtrace: String },
175 : }
176 :
177 : #[serde_as]
178 10884 : #[derive(Serialize, Deserialize)]
179 : pub struct TimelineCreateRequest {
180 : #[serde_as(as = "DisplayFromStr")]
181 : pub new_timeline_id: TimelineId,
182 : #[serde(default)]
183 : #[serde_as(as = "Option<DisplayFromStr>")]
184 : pub ancestor_timeline_id: Option<TimelineId>,
185 : #[serde(default)]
186 : #[serde_as(as = "Option<DisplayFromStr>")]
187 : pub ancestor_start_lsn: Option<Lsn>,
188 : pub pg_version: Option<u32>,
189 : }
190 :
191 : #[serde_as]
192 10417 : #[derive(Serialize, Deserialize, Debug)]
193 : #[serde(deny_unknown_fields)]
194 : pub struct TenantCreateRequest {
195 : #[serde_as(as = "DisplayFromStr")]
196 : pub new_tenant_id: TenantId,
197 : #[serde(flatten)]
198 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
199 : }
200 :
201 : impl std::ops::Deref for TenantCreateRequest {
202 : type Target = TenantConfig;
203 :
204 0 : fn deref(&self) -> &Self::Target {
205 0 : &self.config
206 0 : }
207 : }
208 :
209 17168 : #[derive(Serialize, Deserialize, Debug, Default)]
210 : pub struct TenantConfig {
211 : pub checkpoint_distance: Option<u64>,
212 : pub checkpoint_timeout: Option<String>,
213 : pub compaction_target_size: Option<u64>,
214 : pub compaction_period: Option<String>,
215 : pub compaction_threshold: Option<usize>,
216 : pub gc_horizon: Option<u64>,
217 : pub gc_period: Option<String>,
218 : pub image_creation_threshold: Option<usize>,
219 : pub pitr_interval: Option<String>,
220 : pub walreceiver_connect_timeout: Option<String>,
221 : pub lagging_wal_timeout: Option<String>,
222 : pub max_lsn_wal_lag: Option<NonZeroU64>,
223 : pub trace_read_requests: Option<bool>,
224 : // We defer the parsing of the eviction_policy field to the request handler.
225 : // Otherwise we'd have to move the types for eviction policy into this package.
226 : // We might do that once the eviction feature has stabilizied.
227 : // For now, this field is not even documented in the openapi_spec.yml.
228 : pub eviction_policy: Option<serde_json::Value>,
229 : pub min_resident_size_override: Option<u64>,
230 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
231 : pub gc_feedback: Option<bool>,
232 : }
233 :
234 : #[serde_as]
235 479 : #[derive(Serialize, Deserialize)]
236 : #[serde(transparent)]
237 : pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
238 :
239 579 : #[derive(Serialize)]
240 : pub struct StatusResponse {
241 : pub id: NodeId,
242 : }
243 :
244 : impl TenantCreateRequest {
245 0 : pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
246 0 : TenantCreateRequest {
247 0 : new_tenant_id,
248 0 : config: TenantConfig::default(),
249 0 : }
250 0 : }
251 : }
252 :
253 : #[serde_as]
254 420 : #[derive(Serialize, Deserialize, Debug)]
255 : #[serde(deny_unknown_fields)]
256 : pub struct TenantConfigRequest {
257 : #[serde_as(as = "DisplayFromStr")]
258 : pub tenant_id: TenantId,
259 : #[serde(flatten)]
260 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
261 : }
262 :
263 : impl std::ops::Deref for TenantConfigRequest {
264 : type Target = TenantConfig;
265 :
266 0 : fn deref(&self) -> &Self::Target {
267 0 : &self.config
268 0 : }
269 : }
270 :
271 : impl TenantConfigRequest {
272 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
273 0 : let config = TenantConfig {
274 0 : checkpoint_distance: None,
275 0 : checkpoint_timeout: None,
276 0 : compaction_target_size: None,
277 0 : compaction_period: None,
278 0 : compaction_threshold: None,
279 0 : gc_horizon: None,
280 0 : gc_period: None,
281 0 : image_creation_threshold: None,
282 0 : pitr_interval: None,
283 0 : walreceiver_connect_timeout: None,
284 0 : lagging_wal_timeout: None,
285 0 : max_lsn_wal_lag: None,
286 0 : trace_read_requests: None,
287 0 : eviction_policy: None,
288 0 : min_resident_size_override: None,
289 0 : evictions_low_residence_duration_metric_threshold: None,
290 0 : gc_feedback: None,
291 0 : };
292 0 : TenantConfigRequest { tenant_id, config }
293 0 : }
294 : }
295 :
296 150 : #[derive(Debug, Serialize, Deserialize)]
297 : pub struct TenantAttachRequest {
298 : pub config: TenantAttachConfig,
299 : }
300 :
301 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
302 : /// its usage inside `TenantAttachRequest`.
303 94 : #[derive(Debug, Serialize, Deserialize)]
304 : #[serde(deny_unknown_fields)]
305 : pub struct TenantAttachConfig {
306 : #[serde(flatten)]
307 : allowing_unknown_fields: TenantConfig,
308 : }
309 :
310 : impl std::ops::Deref for TenantAttachConfig {
311 : type Target = TenantConfig;
312 :
313 46 : fn deref(&self) -> &Self::Target {
314 46 : &self.allowing_unknown_fields
315 46 : }
316 : }
317 :
318 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
319 666 : #[derive(Serialize, Deserialize, Clone)]
320 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
321 : pub enum TenantAttachmentStatus {
322 : Maybe,
323 : Attached,
324 : Failed { reason: String },
325 : }
326 :
327 : #[serde_as]
328 551 : #[derive(Serialize, Deserialize, Clone)]
329 : pub struct TenantInfo {
330 : #[serde_as(as = "DisplayFromStr")]
331 : pub id: TenantId,
332 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
333 : pub state: TenantState,
334 : /// Sum of the size of all layer files.
335 : /// If a layer is present in both local FS and S3, it counts only once.
336 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
337 : pub attachment_status: TenantAttachmentStatus,
338 : }
339 :
340 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
341 : #[serde_as]
342 43005 : #[derive(Debug, Serialize, Deserialize, Clone)]
343 : pub struct TimelineInfo {
344 : #[serde_as(as = "DisplayFromStr")]
345 : pub tenant_id: TenantId,
346 : #[serde_as(as = "DisplayFromStr")]
347 : pub timeline_id: TimelineId,
348 :
349 : #[serde_as(as = "Option<DisplayFromStr>")]
350 : pub ancestor_timeline_id: Option<TimelineId>,
351 : #[serde_as(as = "Option<DisplayFromStr>")]
352 : pub ancestor_lsn: Option<Lsn>,
353 : #[serde_as(as = "DisplayFromStr")]
354 : pub last_record_lsn: Lsn,
355 : #[serde_as(as = "Option<DisplayFromStr>")]
356 : pub prev_record_lsn: Option<Lsn>,
357 : #[serde_as(as = "DisplayFromStr")]
358 : pub latest_gc_cutoff_lsn: Lsn,
359 : #[serde_as(as = "DisplayFromStr")]
360 : pub disk_consistent_lsn: Lsn,
361 : #[serde_as(as = "DisplayFromStr")]
362 : pub remote_consistent_lsn: Lsn,
363 : pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
364 : /// Sum of the size of all layer files.
365 : /// If a layer is present in both local FS and S3, it counts only once.
366 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
367 : pub current_logical_size_non_incremental: Option<u64>,
368 :
369 : pub timeline_dir_layer_file_size_sum: Option<u64>,
370 :
371 : pub wal_source_connstr: Option<String>,
372 : #[serde_as(as = "Option<DisplayFromStr>")]
373 : pub last_received_msg_lsn: Option<Lsn>,
374 : /// the timestamp (in microseconds) of the last received message
375 : pub last_received_msg_ts: Option<u128>,
376 : pub pg_version: u32,
377 :
378 : pub state: TimelineState,
379 : }
380 :
381 101 : #[derive(Debug, Clone, Serialize)]
382 : pub struct LayerMapInfo {
383 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
384 : pub historic_layers: Vec<HistoricLayerInfo>,
385 : }
386 :
387 87830160 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
388 : #[repr(usize)]
389 : pub enum LayerAccessKind {
390 : GetValueReconstructData,
391 : Iter,
392 : KeyIter,
393 : Dump,
394 : }
395 :
396 8585 : #[derive(Debug, Clone, Serialize, Deserialize)]
397 : pub struct LayerAccessStatFullDetails {
398 : pub when_millis_since_epoch: u64,
399 : pub task_kind: &'static str,
400 : pub access_kind: LayerAccessKind,
401 : }
402 :
403 : /// An event that impacts the layer's residence status.
404 : #[serde_as]
405 7922 : #[derive(Debug, Clone, Serialize, Deserialize)]
406 : pub struct LayerResidenceEvent {
407 : /// The time when the event occurred.
408 : /// NB: this timestamp is captured while the residence status changes.
409 : /// So, it might be behind/ahead of the actual residence change by a short amount of time.
410 : ///
411 : #[serde(rename = "timestamp_millis_since_epoch")]
412 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
413 : pub timestamp: SystemTime,
414 : /// The new residence status of the layer.
415 : pub status: LayerResidenceStatus,
416 : /// The reason why we had to record this event.
417 : pub reason: LayerResidenceEventReason,
418 : }
419 :
420 : /// The reason for recording a given [`LayerResidenceEvent`].
421 7922 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
422 : pub enum LayerResidenceEventReason {
423 : /// The layer map is being populated, e.g. during timeline load or attach.
424 : /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
425 : /// We need to record such events because there is no persistent storage for the events.
426 : ///
427 : // https://github.com/rust-lang/rust/issues/74481
428 : /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
429 : /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
430 : LayerLoad,
431 : /// We just created the layer (e.g., freeze_and_flush or compaction).
432 : /// Such layers are always [`LayerResidenceStatus::Resident`].
433 : LayerCreate,
434 : /// We on-demand downloaded or evicted the given layer.
435 : ResidenceChange,
436 : }
437 :
438 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
439 7922 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
440 : pub enum LayerResidenceStatus {
441 : /// Residence status for a layer file that exists locally.
442 : /// It may also exist on the remote, we don't care here.
443 : Resident,
444 : /// Residence status for a layer file that only exists on the remote.
445 : Evicted,
446 : }
447 :
448 : impl LayerResidenceEvent {
449 52372 : pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
450 52372 : Self {
451 52372 : status,
452 52372 : reason,
453 52372 : timestamp: SystemTime::now(),
454 52372 : }
455 52372 : }
456 : }
457 :
458 2273 : #[derive(Debug, Clone, Serialize)]
459 : pub struct LayerAccessStats {
460 : pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
461 : pub task_kind_access_flag: Vec<&'static str>,
462 : pub first: Option<LayerAccessStatFullDetails>,
463 : pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
464 : pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
465 : }
466 :
467 : #[serde_as]
468 23 : #[derive(Debug, Clone, Serialize)]
469 : #[serde(tag = "kind")]
470 : pub enum InMemoryLayerInfo {
471 : Open {
472 : #[serde_as(as = "DisplayFromStr")]
473 : lsn_start: Lsn,
474 : },
475 : Frozen {
476 : #[serde_as(as = "DisplayFromStr")]
477 : lsn_start: Lsn,
478 : #[serde_as(as = "DisplayFromStr")]
479 : lsn_end: Lsn,
480 : },
481 : }
482 :
483 : #[serde_as]
484 2273 : #[derive(Debug, Clone, Serialize)]
485 : #[serde(tag = "kind")]
486 : pub enum HistoricLayerInfo {
487 : Delta {
488 : layer_file_name: String,
489 : layer_file_size: u64,
490 :
491 : #[serde_as(as = "DisplayFromStr")]
492 : lsn_start: Lsn,
493 : #[serde_as(as = "DisplayFromStr")]
494 : lsn_end: Lsn,
495 : remote: bool,
496 : access_stats: LayerAccessStats,
497 : },
498 : Image {
499 : layer_file_name: String,
500 : layer_file_size: u64,
501 :
502 : #[serde_as(as = "DisplayFromStr")]
503 : lsn_start: Lsn,
504 : remote: bool,
505 : access_stats: LayerAccessStats,
506 : },
507 : }
508 :
509 9 : #[derive(Debug, Serialize, Deserialize)]
510 : pub struct DownloadRemoteLayersTaskSpawnRequest {
511 : pub max_concurrent_downloads: NonZeroUsize,
512 : }
513 :
514 24 : #[derive(Debug, Serialize, Deserialize, Clone)]
515 : pub struct DownloadRemoteLayersTaskInfo {
516 : pub task_id: String,
517 : pub state: DownloadRemoteLayersTaskState,
518 : pub total_layer_count: u64, // stable once `completed`
519 : pub successful_download_count: u64, // stable once `completed`
520 : pub failed_download_count: u64, // stable once `completed`
521 : }
522 :
523 24 : #[derive(Debug, Serialize, Deserialize, Clone)]
524 : pub enum DownloadRemoteLayersTaskState {
525 : Running,
526 : Completed,
527 : ShutDown,
528 : }
529 :
530 : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
531 :
532 : /// Information for configuring a single fail point
533 1255 : #[derive(Debug, Serialize, Deserialize)]
534 : pub struct FailpointConfig {
535 : /// Name of the fail point
536 : pub name: String,
537 : /// List of actions to take, using the format described in `fail::cfg`
538 : ///
539 : /// We also support `actions = "exit"` to cause the fail point to immediately exit.
540 : pub actions: String,
541 : }
542 :
543 1515 : #[derive(Debug, Serialize, Deserialize)]
544 : pub struct TimelineGcRequest {
545 : pub gc_horizon: Option<u64>,
546 : }
547 :
548 : // Wrapped in libpq CopyData
549 4 : #[derive(PartialEq, Eq, Debug)]
550 : pub enum PagestreamFeMessage {
551 : Exists(PagestreamExistsRequest),
552 : Nblocks(PagestreamNblocksRequest),
553 : GetPage(PagestreamGetPageRequest),
554 : DbSize(PagestreamDbSizeRequest),
555 : }
556 :
557 : // Wrapped in libpq CopyData
558 : pub enum PagestreamBeMessage {
559 : Exists(PagestreamExistsResponse),
560 : Nblocks(PagestreamNblocksResponse),
561 : GetPage(PagestreamGetPageResponse),
562 : Error(PagestreamErrorResponse),
563 : DbSize(PagestreamDbSizeResponse),
564 : }
565 :
566 1 : #[derive(Debug, PartialEq, Eq)]
567 : pub struct PagestreamExistsRequest {
568 : pub latest: bool,
569 : pub lsn: Lsn,
570 : pub rel: RelTag,
571 : }
572 :
573 1 : #[derive(Debug, PartialEq, Eq)]
574 : pub struct PagestreamNblocksRequest {
575 : pub latest: bool,
576 : pub lsn: Lsn,
577 : pub rel: RelTag,
578 : }
579 :
580 1 : #[derive(Debug, PartialEq, Eq)]
581 : pub struct PagestreamGetPageRequest {
582 : pub latest: bool,
583 : pub lsn: Lsn,
584 : pub rel: RelTag,
585 : pub blkno: u32,
586 : }
587 :
588 1 : #[derive(Debug, PartialEq, Eq)]
589 : pub struct PagestreamDbSizeRequest {
590 : pub latest: bool,
591 : pub lsn: Lsn,
592 : pub dbnode: u32,
593 : }
594 :
/// Payload of [`PagestreamBeMessage::Exists`].
#[derive(Debug)]
pub struct PagestreamExistsResponse {
    pub exists: bool,
}
599 :
/// Payload of [`PagestreamBeMessage::Nblocks`].
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
    pub n_blocks: u32,
}
604 :
605 0 : #[derive(Debug)]
606 : pub struct PagestreamGetPageResponse {
607 : pub page: Bytes,
608 : }
609 :
/// Payload of [`PagestreamBeMessage::Error`]: the error text.
#[derive(Debug)]
pub struct PagestreamErrorResponse {
    pub message: String,
}
614 :
/// Payload of [`PagestreamBeMessage::DbSize`].
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
    pub db_size: i64,
}
619 :
620 : impl PagestreamFeMessage {
621 4 : pub fn serialize(&self) -> Bytes {
622 4 : let mut bytes = BytesMut::new();
623 4 :
624 4 : match self {
625 1 : Self::Exists(req) => {
626 1 : bytes.put_u8(0);
627 1 : bytes.put_u8(u8::from(req.latest));
628 1 : bytes.put_u64(req.lsn.0);
629 1 : bytes.put_u32(req.rel.spcnode);
630 1 : bytes.put_u32(req.rel.dbnode);
631 1 : bytes.put_u32(req.rel.relnode);
632 1 : bytes.put_u8(req.rel.forknum);
633 1 : }
634 :
635 1 : Self::Nblocks(req) => {
636 1 : bytes.put_u8(1);
637 1 : bytes.put_u8(u8::from(req.latest));
638 1 : bytes.put_u64(req.lsn.0);
639 1 : bytes.put_u32(req.rel.spcnode);
640 1 : bytes.put_u32(req.rel.dbnode);
641 1 : bytes.put_u32(req.rel.relnode);
642 1 : bytes.put_u8(req.rel.forknum);
643 1 : }
644 :
645 1 : Self::GetPage(req) => {
646 1 : bytes.put_u8(2);
647 1 : bytes.put_u8(u8::from(req.latest));
648 1 : bytes.put_u64(req.lsn.0);
649 1 : bytes.put_u32(req.rel.spcnode);
650 1 : bytes.put_u32(req.rel.dbnode);
651 1 : bytes.put_u32(req.rel.relnode);
652 1 : bytes.put_u8(req.rel.forknum);
653 1 : bytes.put_u32(req.blkno);
654 1 : }
655 :
656 1 : Self::DbSize(req) => {
657 1 : bytes.put_u8(3);
658 1 : bytes.put_u8(u8::from(req.latest));
659 1 : bytes.put_u64(req.lsn.0);
660 1 : bytes.put_u32(req.dbnode);
661 1 : }
662 : }
663 :
664 4 : bytes.into()
665 4 : }
666 :
667 4599619 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
668 : // TODO these gets can fail
669 :
670 : // these correspond to the NeonMessageTag enum in pagestore_client.h
671 : //
672 : // TODO: consider using protobuf or serde bincode for less error prone
673 : // serialization.
674 4599619 : let msg_tag = body.read_u8()?;
675 4599619 : match msg_tag {
676 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
677 47956 : latest: body.read_u8()? != 0,
678 47956 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
679 : rel: RelTag {
680 47956 : spcnode: body.read_u32::<BigEndian>()?,
681 47956 : dbnode: body.read_u32::<BigEndian>()?,
682 47956 : relnode: body.read_u32::<BigEndian>()?,
683 47956 : forknum: body.read_u8()?,
684 : },
685 : })),
686 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
687 18196 : latest: body.read_u8()? != 0,
688 18196 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
689 : rel: RelTag {
690 18196 : spcnode: body.read_u32::<BigEndian>()?,
691 18196 : dbnode: body.read_u32::<BigEndian>()?,
692 18196 : relnode: body.read_u32::<BigEndian>()?,
693 18196 : forknum: body.read_u8()?,
694 : },
695 : })),
696 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
697 4533461 : latest: body.read_u8()? != 0,
698 4533461 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
699 : rel: RelTag {
700 4533461 : spcnode: body.read_u32::<BigEndian>()?,
701 4533461 : dbnode: body.read_u32::<BigEndian>()?,
702 4533461 : relnode: body.read_u32::<BigEndian>()?,
703 4533461 : forknum: body.read_u8()?,
704 : },
705 4533461 : blkno: body.read_u32::<BigEndian>()?,
706 : })),
707 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
708 6 : latest: body.read_u8()? != 0,
709 6 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
710 6 : dbnode: body.read_u32::<BigEndian>()?,
711 : })),
712 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
713 : }
714 4599619 : }
715 : }
716 :
717 : impl PagestreamBeMessage {
718 4599585 : pub fn serialize(&self) -> Bytes {
719 4599585 : let mut bytes = BytesMut::new();
720 4599585 :
721 4599585 : match self {
722 47955 : Self::Exists(resp) => {
723 47955 : bytes.put_u8(100); /* tag from pagestore_client.h */
724 47955 : bytes.put_u8(resp.exists as u8);
725 47955 : }
726 :
727 18195 : Self::Nblocks(resp) => {
728 18195 : bytes.put_u8(101); /* tag from pagestore_client.h */
729 18195 : bytes.put_u32(resp.n_blocks);
730 18195 : }
731 :
732 4533429 : Self::GetPage(resp) => {
733 4533429 : bytes.put_u8(102); /* tag from pagestore_client.h */
734 4533429 : bytes.put(&resp.page[..]);
735 4533429 : }
736 :
737 1 : Self::Error(resp) => {
738 1 : bytes.put_u8(103); /* tag from pagestore_client.h */
739 1 : bytes.put(resp.message.as_bytes());
740 1 : bytes.put_u8(0); // null terminator
741 1 : }
742 5 : Self::DbSize(resp) => {
743 5 : bytes.put_u8(104); /* tag from pagestore_client.h */
744 5 : bytes.put_i64(resp.db_size);
745 5 : }
746 : }
747 :
748 4599585 : bytes.into()
749 4599585 : }
750 : }
751 :
#[cfg(test)]
mod tests {
    use bytes::Buf;
    use serde_json::json;

    use super::*;

    /// Every `PagestreamFeMessage` variant must survive a serialize/parse round trip.
    #[test]
    fn test_pagestream() {
        let messages = vec![
            PagestreamFeMessage::Exists(PagestreamExistsRequest {
                latest: true,
                lsn: Lsn(4),
                rel: RelTag {
                    forknum: 1,
                    spcnode: 2,
                    dbnode: 3,
                    relnode: 4,
                },
            }),
            PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
                latest: false,
                lsn: Lsn(4),
                rel: RelTag {
                    forknum: 1,
                    spcnode: 2,
                    dbnode: 3,
                    relnode: 4,
                },
            }),
            PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
                latest: true,
                lsn: Lsn(4),
                rel: RelTag {
                    forknum: 1,
                    spcnode: 2,
                    dbnode: 3,
                    relnode: 4,
                },
                blkno: 7,
            }),
            PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
                latest: true,
                lsn: Lsn(4),
                dbnode: 7,
            }),
        ];
        for msg in messages {
            let bytes = msg.serialize();
            let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
            // assert_eq! (rather than assert!(a == b)) prints both messages on failure.
            assert_eq!(msg, reconstructed);
        }
    }

    /// Pins the JSON shape of `TenantInfo` and the Debug rendering of a broken state.
    #[test]
    fn test_tenantinfo_serde() {
        let original_active = TenantInfo {
            id: TenantId::generate(),
            state: TenantState::Active,
            current_physical_size: Some(42),
            attachment_status: TenantAttachmentStatus::Attached,
        };
        let expected_active = json!({
            "id": original_active.id.to_string(),
            "state": {
                "slug": "Active",
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            }
        });

        let original_broken = TenantInfo {
            id: TenantId::generate(),
            state: TenantState::Broken {
                reason: "reason".into(),
                backtrace: "backtrace info".into(),
            },
            current_physical_size: Some(42),
            attachment_status: TenantAttachmentStatus::Attached,
        };
        let expected_broken = json!({
            "id": original_broken.id.to_string(),
            "state": {
                "slug": "Broken",
                "data": {
                    "backtrace": "backtrace info",
                    "reason": "reason",
                }
            },
            "current_physical_size": 42,
            "attachment_status": {
                "slug":"attached",
            }
        });

        assert_eq!(
            serde_json::to_value(&original_active).unwrap(),
            expected_active
        );

        assert_eq!(
            serde_json::to_value(&original_broken).unwrap(),
            expected_broken
        );
        assert!(format!("{:?}", &original_broken.state).contains("reason"));
        assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
    }

    /// All request types that embed a `TenantConfig` must reject unknown fields.
    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value",
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );

        let id = TenantId::generate();
        let config_request = json!({
            "tenant_id": id.to_string(),
            "unknown_field": "unknown_value",
        });
        let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );

        let attach_request = json!({
            "config": {
                "unknown_field": "unknown_value",
            },
        });
        let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }

    /// Pins the JSON form of `TenantState::Activating` and checks it round-trips.
    #[test]
    fn tenantstatus_activating_serde() {
        let states = [
            TenantState::Activating(ActivatingFrom::Loading),
            TenantState::Activating(ActivatingFrom::Attaching),
        ];
        let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";

        let actual = serde_json::to_string(&states).unwrap();

        assert_eq!(actual, expected);

        let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();

        assert_eq!(states.as_slice(), &parsed);
    }

    /// Pins the `&'static str` names of the states.
    #[test]
    fn tenantstatus_activating_strum() {
        // tests added, because we use these for metrics
        let examples = [
            (line!(), TenantState::Loading, "Loading"),
            (line!(), TenantState::Attaching, "Attaching"),
            (
                line!(),
                TenantState::Activating(ActivatingFrom::Loading),
                "Activating",
            ),
            (
                line!(),
                TenantState::Activating(ActivatingFrom::Attaching),
                "Activating",
            ),
            (line!(), TenantState::Active, "Active"),
            (
                line!(),
                TenantState::Stopping {
                    progress: utils::completion::Barrier::default(),
                },
                "Stopping",
            ),
            (
                line!(),
                TenantState::Broken {
                    reason: "Example".into(),
                    backtrace: "Looooong backtrace".into(),
                },
                "Broken",
            ),
        ];

        for (line, state, expected) in examples {
            // Goes through the strum-derived `Into<&'static str>` conversion.
            let actual: &'static str = state.into();
            assert_eq!(actual, expected, "example on {line}");
        }
    }
}
|