TLA Line data Source code
1 : pub mod partitioning;
2 :
3 : use std::{
4 : collections::HashMap,
5 : io::Read,
6 : num::{NonZeroU64, NonZeroUsize},
7 : time::SystemTime,
8 : };
9 :
10 : use byteorder::{BigEndian, ReadBytesExt};
11 : use serde::{Deserialize, Serialize};
12 : use serde_with::serde_as;
13 : use strum_macros;
14 : use utils::{
15 : completion,
16 : history_buffer::HistoryBufferWithDropCounter,
17 : id::{NodeId, TenantId, TimelineId},
18 : lsn::Lsn,
19 : };
20 :
21 : use crate::{reltag::RelTag, shard::TenantShardId};
22 : use anyhow::bail;
23 : use bytes::{Buf, BufMut, Bytes, BytesMut};
24 :
25 : /// The state of a tenant in this pageserver.
26 : ///
27 : /// ```mermaid
28 : /// stateDiagram-v2
29 : ///
30 : /// [*] --> Loading: spawn_load()
31 : /// [*] --> Attaching: spawn_attach()
32 : ///
33 : /// Loading --> Activating: activate()
34 : /// Attaching --> Activating: activate()
35 : /// Activating --> Active: infallible
36 : ///
37 : /// Loading --> Broken: load() failure
38 : /// Attaching --> Broken: attach() failure
39 : ///
40 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
41 : /// Stopping --> Broken: late error in remove_tenant_from_memory
42 : ///
43 : /// Broken --> [*]: ignore / detach / shutdown
44 : /// Stopping --> [*]: remove_from_memory complete
45 : ///
46 : /// Active --> Broken: cfg(testing)-only tenant break point
47 : /// ```
48 : #[derive(
49 CBC 23276 : Clone,
50 4534 : PartialEq,
51 : Eq,
52 855 : serde::Serialize,
53 28 : serde::Deserialize,
54 22 : strum_macros::Display,
55 : strum_macros::EnumVariantNames,
56 UBC 0 : strum_macros::AsRefStr,
57 CBC 2997 : strum_macros::IntoStaticStr,
58 : )]
59 : #[serde(tag = "slug", content = "data")]
60 : pub enum TenantState {
61 : /// This tenant is being loaded from local disk.
62 : ///
63 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
64 : Loading,
65 : /// This tenant is being attached to the pageserver.
66 : ///
67 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
68 : Attaching,
69 : /// The tenant is transitioning from Loading/Attaching to Active.
70 : ///
71 : /// While in this state, the individual timelines are being activated.
72 : ///
73 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
74 : Activating(ActivatingFrom),
75 : /// The tenant has finished activating and is open for business.
76 : ///
77 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
78 : Active,
79 : /// The tenant is recognized by pageserver, but it is being detached or the
80 : /// system is being shut down.
81 : ///
82 : /// Transitions out of this state are possible through `set_broken()`.
83 : Stopping {
84 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
85 : // otherwise it will not be skipped during deserialization
86 : #[serde(skip)]
87 : progress: completion::Barrier,
88 : },
89 : /// The tenant is recognized by the pageserver, but can no longer be used for
90 : /// any operations.
91 : ///
92 : /// If the tenant fails to load or attach, it will transition to this state
93 : /// and it is guaranteed that no background tasks are running in its name.
94 : ///
95 : /// The other way to transition into this state is from `Stopping` state
96 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
97 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
98 : Broken { reason: String, backtrace: String },
99 : }
100 :
101 : impl TenantState {
102 626 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
103 : use TenantAttachmentStatus::*;
104 :
105 : // Below TenantState::Activating is used as "transient" or "transparent" state for
106 : // attachment_status determining.
107 UBC 0 : match self {
108 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
109 : // So, technically, we can return Attached here.
110 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
111 : // But, our attach task might still be fetching the remote timelines, etc.
112 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
113 CBC 216 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
114 : // tenant mgr startup distinguishes attaching from loading via marker file.
115 UBC 0 : Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
116 : // We only reach Active after successful load / attach.
117 : // So, call atttachment status Attached.
118 CBC 186 : Self::Active => Attached,
119 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
120 : // However, it also becomes Broken if the regular load fails.
121 : // From Console's perspective there's no practical difference
122 : // because attachment_status is polled by console only during attach operation execution.
123 112 : Self::Broken { reason, .. } => Failed {
124 112 : reason: reason.to_owned(),
125 112 : },
126 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
127 : // we set the Stopping state irrespective of whether the tenant
128 : // has finished attaching or not.
129 112 : Self::Stopping { .. } => Maybe,
130 : }
131 626 : }
132 :
133 57 : pub fn broken_from_reason(reason: String) -> Self {
134 57 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
135 57 : Self::Broken {
136 57 : reason,
137 57 : backtrace: backtrace_str,
138 57 : }
139 57 : }
140 : }
141 :
142 : impl std::fmt::Debug for TenantState {
143 27 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 8 : match self {
145 8 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
146 8 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
147 : }
148 19 : _ => write!(f, "{self}"),
149 : }
150 27 : }
151 : }
152 :
153 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
154 662 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
155 : pub enum ActivatingFrom {
156 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
157 : Loading,
158 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
159 : Attaching,
160 : }
161 :
162 : /// A state of a timeline in pageserver's memory.
163 3158842 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
164 : pub enum TimelineState {
165 : /// The timeline is recognized by the pageserver but is not yet operational.
166 : /// In particular, the walreceiver connection loop is not running for this timeline.
167 : /// It will eventually transition to state Active or Broken.
168 : Loading,
169 : /// The timeline is fully operational.
170 : /// It can be queried, and the walreceiver connection loop is running.
171 : Active,
172 : /// The timeline was previously Loading or Active but is shutting down.
173 : /// It cannot transition back into any other state.
174 : Stopping,
175 : /// The timeline is broken and not operational (previous states: Loading or Active).
176 : Broken { reason: String, backtrace: String },
177 : }
178 :
179 8624 : #[derive(Serialize, Deserialize)]
180 : pub struct TimelineCreateRequest {
181 : pub new_timeline_id: TimelineId,
182 : #[serde(default)]
183 : pub ancestor_timeline_id: Option<TimelineId>,
184 : #[serde(default)]
185 : pub existing_initdb_timeline_id: Option<TimelineId>,
186 : #[serde(default)]
187 : pub ancestor_start_lsn: Option<Lsn>,
188 : pub pg_version: Option<u32>,
189 : }
190 :
191 9940 : #[derive(Serialize, Deserialize, Debug)]
192 : #[serde(deny_unknown_fields)]
193 : pub struct TenantCreateRequest {
194 : pub new_tenant_id: TenantShardId,
195 : #[serde(default)]
196 : #[serde(skip_serializing_if = "Option::is_none")]
197 : pub generation: Option<u32>,
198 : #[serde(flatten)]
199 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
200 : }
201 :
202 15 : #[derive(Deserialize, Debug)]
203 : #[serde(deny_unknown_fields)]
204 : pub struct TenantLoadRequest {
205 : #[serde(default)]
206 : #[serde(skip_serializing_if = "Option::is_none")]
207 : pub generation: Option<u32>,
208 : }
209 :
210 : impl std::ops::Deref for TenantCreateRequest {
211 : type Target = TenantConfig;
212 :
213 UBC 0 : fn deref(&self) -> &Self::Target {
214 0 : &self.config
215 0 : }
216 : }
217 :
218 : /// An alternative representation of `pageserver::tenant::TenantConf` with
219 : /// simpler types.
220 CBC 16038 : #[derive(Serialize, Deserialize, Debug, Default)]
221 : pub struct TenantConfig {
222 : pub checkpoint_distance: Option<u64>,
223 : pub checkpoint_timeout: Option<String>,
224 : pub compaction_target_size: Option<u64>,
225 : pub compaction_period: Option<String>,
226 : pub compaction_threshold: Option<usize>,
227 : pub gc_horizon: Option<u64>,
228 : pub gc_period: Option<String>,
229 : pub image_creation_threshold: Option<usize>,
230 : pub pitr_interval: Option<String>,
231 : pub walreceiver_connect_timeout: Option<String>,
232 : pub lagging_wal_timeout: Option<String>,
233 : pub max_lsn_wal_lag: Option<NonZeroU64>,
234 : pub trace_read_requests: Option<bool>,
235 : // We defer the parsing of the eviction_policy field to the request handler.
236 : // Otherwise we'd have to move the types for eviction policy into this package.
237 : // We might do that once the eviction feature has stabilizied.
238 : // For now, this field is not even documented in the openapi_spec.yml.
239 : pub eviction_policy: Option<serde_json::Value>,
240 : pub min_resident_size_override: Option<u64>,
241 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
242 : pub gc_feedback: Option<bool>,
243 : pub heatmap_period: Option<String>,
244 : }
245 :
246 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
247 : /// lists out all possible states (and the virtual "Detached" state)
248 : /// in a flat form rather than using rust-style enums.
249 324 : #[derive(Serialize, Deserialize, Debug)]
250 : pub enum LocationConfigMode {
251 : AttachedSingle,
252 : AttachedMulti,
253 : AttachedStale,
254 : Secondary,
255 : Detached,
256 : }
257 :
258 84 : #[derive(Serialize, Deserialize, Debug)]
259 : pub struct LocationConfigSecondary {
260 : pub warm: bool,
261 : }
262 :
263 : /// An alternative representation of `pageserver::tenant::LocationConf`,
264 : /// for use in external-facing APIs.
265 1432 : #[derive(Serialize, Deserialize, Debug)]
266 : pub struct LocationConfig {
267 : pub mode: LocationConfigMode,
268 : /// If attaching, in what generation?
269 : #[serde(default)]
270 : pub generation: Option<u32>,
271 : #[serde(default)]
272 : pub secondary_conf: Option<LocationConfigSecondary>,
273 :
274 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
275 : // must be set accurately.
276 : #[serde(default)]
277 : pub shard_number: u8,
278 : #[serde(default)]
279 : pub shard_count: u8,
280 : #[serde(default)]
281 : pub shard_stripe_size: u32,
282 :
283 : // If requesting mode `Secondary`, configuration for that.
284 : // Custom storage configuration for the tenant, if any
285 : pub tenant_conf: TenantConfig,
286 : }
287 :
288 430 : #[derive(Serialize, Deserialize)]
289 : #[serde(transparent)]
290 : pub struct TenantCreateResponse(pub TenantId);
291 :
292 563 : #[derive(Serialize)]
293 : pub struct StatusResponse {
294 : pub id: NodeId,
295 : }
296 :
297 1283 : #[derive(Serialize, Deserialize, Debug)]
298 : #[serde(deny_unknown_fields)]
299 : pub struct TenantLocationConfigRequest {
300 : pub tenant_id: TenantId,
301 : #[serde(flatten)]
302 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
303 : }
304 :
305 438 : #[derive(Serialize, Deserialize, Debug)]
306 : #[serde(deny_unknown_fields)]
307 : pub struct TenantConfigRequest {
308 : pub tenant_id: TenantId,
309 : #[serde(flatten)]
310 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
311 : }
312 :
313 : impl std::ops::Deref for TenantConfigRequest {
314 : type Target = TenantConfig;
315 :
316 UBC 0 : fn deref(&self) -> &Self::Target {
317 0 : &self.config
318 0 : }
319 : }
320 :
321 : impl TenantConfigRequest {
322 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
323 0 : let config = TenantConfig::default();
324 0 : TenantConfigRequest { tenant_id, config }
325 0 : }
326 : }
327 :
328 CBC 228 : #[derive(Debug, Deserialize)]
329 : pub struct TenantAttachRequest {
330 : #[serde(default)]
331 : pub config: TenantAttachConfig,
332 : #[serde(default)]
333 : pub generation: Option<u32>,
334 : }
335 :
336 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
337 : /// its usage inside `TenantAttachRequest`.
338 86 : #[derive(Debug, Serialize, Deserialize, Default)]
339 : #[serde(deny_unknown_fields)]
340 : pub struct TenantAttachConfig {
341 : #[serde(flatten)]
342 : allowing_unknown_fields: TenantConfig,
343 : }
344 :
345 : impl std::ops::Deref for TenantAttachConfig {
346 : type Target = TenantConfig;
347 :
348 44 : fn deref(&self) -> &Self::Target {
349 44 : &self.allowing_unknown_fields
350 44 : }
351 : }
352 :
353 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
354 740 : #[derive(Serialize, Deserialize, Clone)]
355 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
356 : pub enum TenantAttachmentStatus {
357 : Maybe,
358 : Attached,
359 : Failed { reason: String },
360 : }
361 :
362 628 : #[derive(Serialize, Deserialize, Clone)]
363 : pub struct TenantInfo {
364 : pub id: TenantShardId,
365 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
366 : pub state: TenantState,
367 : /// Sum of the size of all layer files.
368 : /// If a layer is present in both local FS and S3, it counts only once.
369 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
370 : pub attachment_status: TenantAttachmentStatus,
371 : }
372 :
373 466 : #[derive(Serialize, Deserialize, Clone)]
374 : pub struct TenantDetails {
375 : #[serde(flatten)]
376 : pub tenant_info: TenantInfo,
377 :
378 : pub timelines: Vec<TimelineId>,
379 : }
380 :
381 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
382 34740 : #[derive(Debug, Serialize, Deserialize, Clone)]
383 : pub struct TimelineInfo {
384 : pub tenant_id: TenantShardId,
385 : pub timeline_id: TimelineId,
386 :
387 : pub ancestor_timeline_id: Option<TimelineId>,
388 : pub ancestor_lsn: Option<Lsn>,
389 : pub last_record_lsn: Lsn,
390 : pub prev_record_lsn: Option<Lsn>,
391 : pub latest_gc_cutoff_lsn: Lsn,
392 : pub disk_consistent_lsn: Lsn,
393 :
394 : /// The LSN that we have succesfully uploaded to remote storage
395 : pub remote_consistent_lsn: Lsn,
396 :
397 : /// The LSN that we are advertizing to safekeepers
398 : pub remote_consistent_lsn_visible: Lsn,
399 :
400 : /// The LSN from the start of the root timeline (never changes)
401 : pub initdb_lsn: Lsn,
402 :
403 : pub current_logical_size: u64,
404 : pub current_logical_size_is_accurate: bool,
405 :
406 : /// Sum of the size of all layer files.
407 : /// If a layer is present in both local FS and S3, it counts only once.
408 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
409 : pub current_logical_size_non_incremental: Option<u64>,
410 :
411 : pub timeline_dir_layer_file_size_sum: Option<u64>,
412 :
413 : pub wal_source_connstr: Option<String>,
414 : pub last_received_msg_lsn: Option<Lsn>,
415 : /// the timestamp (in microseconds) of the last received message
416 : pub last_received_msg_ts: Option<u128>,
417 : pub pg_version: u32,
418 :
419 : pub state: TimelineState,
420 :
421 : pub walreceiver_status: String,
422 : }
423 :
424 83 : #[derive(Debug, Clone, Serialize)]
425 : pub struct LayerMapInfo {
426 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
427 : pub historic_layers: Vec<HistoricLayerInfo>,
428 : }
429 :
430 30918412 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
431 : #[repr(usize)]
432 : pub enum LayerAccessKind {
433 : GetValueReconstructData,
434 : Iter,
435 : KeyIter,
436 : Dump,
437 : }
438 :
439 10218 : #[derive(Debug, Clone, Serialize, Deserialize)]
440 : pub struct LayerAccessStatFullDetails {
441 : pub when_millis_since_epoch: u64,
442 : pub task_kind: &'static str,
443 : pub access_kind: LayerAccessKind,
444 : }
445 :
446 : /// An event that impacts the layer's residence status.
447 : #[serde_as]
448 6972 : #[derive(Debug, Clone, Serialize, Deserialize)]
449 : pub struct LayerResidenceEvent {
450 : /// The time when the event occurred.
451 : /// NB: this timestamp is captured while the residence status changes.
452 : /// So, it might be behind/ahead of the actual residence change by a short amount of time.
453 : ///
454 : #[serde(rename = "timestamp_millis_since_epoch")]
455 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
456 : pub timestamp: SystemTime,
457 : /// The new residence status of the layer.
458 : pub status: LayerResidenceStatus,
459 : /// The reason why we had to record this event.
460 : pub reason: LayerResidenceEventReason,
461 : }
462 :
463 : /// The reason for recording a given [`LayerResidenceEvent`].
464 6972 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
465 : pub enum LayerResidenceEventReason {
466 : /// The layer map is being populated, e.g. during timeline load or attach.
467 : /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
468 : /// We need to record such events because there is no persistent storage for the events.
469 : ///
470 : // https://github.com/rust-lang/rust/issues/74481
471 : /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
472 : /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
473 : LayerLoad,
474 : /// We just created the layer (e.g., freeze_and_flush or compaction).
475 : /// Such layers are always [`LayerResidenceStatus::Resident`].
476 : LayerCreate,
477 : /// We on-demand downloaded or evicted the given layer.
478 : ResidenceChange,
479 : }
480 :
481 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
482 6972 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
483 : pub enum LayerResidenceStatus {
484 : /// Residence status for a layer file that exists locally.
485 : /// It may also exist on the remote, we don't care here.
486 : Resident,
487 : /// Residence status for a layer file that only exists on the remote.
488 : Evicted,
489 : }
490 :
491 : impl LayerResidenceEvent {
492 180374 : pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
493 180374 : Self {
494 180374 : status,
495 180374 : reason,
496 180374 : timestamp: SystemTime::now(),
497 180374 : }
498 180374 : }
499 : }
500 :
501 2974 : #[derive(Debug, Clone, Serialize)]
502 : pub struct LayerAccessStats {
503 : pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
504 : pub task_kind_access_flag: Vec<&'static str>,
505 : pub first: Option<LayerAccessStatFullDetails>,
506 : pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
507 : pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
508 : }
509 :
510 8 : #[derive(Debug, Clone, Serialize)]
511 : #[serde(tag = "kind")]
512 : pub enum InMemoryLayerInfo {
513 : Open { lsn_start: Lsn },
514 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
515 : }
516 :
517 2974 : #[derive(Debug, Clone, Serialize)]
518 : #[serde(tag = "kind")]
519 : pub enum HistoricLayerInfo {
520 : Delta {
521 : layer_file_name: String,
522 : layer_file_size: u64,
523 :
524 : lsn_start: Lsn,
525 : lsn_end: Lsn,
526 : remote: bool,
527 : access_stats: LayerAccessStats,
528 : },
529 : Image {
530 : layer_file_name: String,
531 : layer_file_size: u64,
532 :
533 : lsn_start: Lsn,
534 : remote: bool,
535 : access_stats: LayerAccessStats,
536 : },
537 : }
538 :
539 6 : #[derive(Debug, Serialize, Deserialize)]
540 : pub struct DownloadRemoteLayersTaskSpawnRequest {
541 : pub max_concurrent_downloads: NonZeroUsize,
542 : }
543 :
544 59 : #[derive(Debug, Serialize, Deserialize, Clone)]
545 : pub struct DownloadRemoteLayersTaskInfo {
546 : pub task_id: String,
547 : pub state: DownloadRemoteLayersTaskState,
548 : pub total_layer_count: u64, // stable once `completed`
549 : pub successful_download_count: u64, // stable once `completed`
550 : pub failed_download_count: u64, // stable once `completed`
551 : }
552 :
553 59 : #[derive(Debug, Serialize, Deserialize, Clone)]
554 : pub enum DownloadRemoteLayersTaskState {
555 : Running,
556 : Completed,
557 : ShutDown,
558 : }
559 :
560 966 : #[derive(Debug, Serialize, Deserialize)]
561 : pub struct TimelineGcRequest {
562 : pub gc_horizon: Option<u64>,
563 : }
564 :
565 : // Wrapped in libpq CopyData
566 4 : #[derive(PartialEq, Eq, Debug)]
567 : pub enum PagestreamFeMessage {
568 : Exists(PagestreamExistsRequest),
569 : Nblocks(PagestreamNblocksRequest),
570 : GetPage(PagestreamGetPageRequest),
571 : DbSize(PagestreamDbSizeRequest),
572 : }
573 :
574 : // Wrapped in libpq CopyData
575 UBC 0 : #[derive(strum_macros::EnumProperty)]
576 : pub enum PagestreamBeMessage {
577 : Exists(PagestreamExistsResponse),
578 : Nblocks(PagestreamNblocksResponse),
579 : GetPage(PagestreamGetPageResponse),
580 : Error(PagestreamErrorResponse),
581 : DbSize(PagestreamDbSizeResponse),
582 : }
583 :
/// Wire tag (first byte) of a pagestream response message.
// Keep in sync with `pagestore_client.h`
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum PagestreamBeMessageTag {
    Exists = 100,
    Nblocks = 101,
    GetPage = 102,
    Error = 103,
    DbSize = 104,
}

impl TryFrom<u8> for PagestreamBeMessageTag {
    type Error = u8;

    /// Maps a raw tag byte back to its tag.
    ///
    /// # Errors
    ///
    /// Returns the unrecognized byte itself when it matches no known tag.
    fn try_from(value: u8) -> Result<Self, u8> {
        // The numeric values live only in the enum definition above; the lookup
        // compares against the discriminants instead of repeating the numbers.
        const ALL: [PagestreamBeMessageTag; 5] = [
            PagestreamBeMessageTag::Exists,
            PagestreamBeMessageTag::Nblocks,
            PagestreamBeMessageTag::GetPage,
            PagestreamBeMessageTag::Error,
            PagestreamBeMessageTag::DbSize,
        ];
        ALL.iter()
            .copied()
            .find(|tag| *tag as u8 == value)
            .ok_or(value)
    }
}
606 :
607 CBC 1 : #[derive(Debug, PartialEq, Eq)]
608 : pub struct PagestreamExistsRequest {
609 : pub latest: bool,
610 : pub lsn: Lsn,
611 : pub rel: RelTag,
612 : }
613 :
614 1 : #[derive(Debug, PartialEq, Eq)]
615 : pub struct PagestreamNblocksRequest {
616 : pub latest: bool,
617 : pub lsn: Lsn,
618 : pub rel: RelTag,
619 : }
620 :
621 1 : #[derive(Debug, PartialEq, Eq)]
622 : pub struct PagestreamGetPageRequest {
623 : pub latest: bool,
624 : pub lsn: Lsn,
625 : pub rel: RelTag,
626 : pub blkno: u32,
627 : }
628 :
629 1 : #[derive(Debug, PartialEq, Eq)]
630 : pub struct PagestreamDbSizeRequest {
631 : pub latest: bool,
632 : pub lsn: Lsn,
633 : pub dbnode: u32,
634 : }
635 :
/// Payload of [`PagestreamBeMessage::Exists`].
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug`, in line with
/// the request types, so that responses can be compared in round-trip tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagestreamExistsResponse {
    pub exists: bool,
}
640 :
/// Payload of [`PagestreamBeMessage::Nblocks`].
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug`, in line with
/// the request types, so that responses can be compared in round-trip tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagestreamNblocksResponse {
    pub n_blocks: u32,
}
645 :
646 0 : #[derive(Debug)]
647 : pub struct PagestreamGetPageResponse {
648 : pub page: Bytes,
649 : }
650 :
/// Payload of [`PagestreamBeMessage::Error`]: the error message text.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug`, in line with
/// the request types, so that responses can be compared in round-trip tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagestreamErrorResponse {
    pub message: String,
}
655 :
/// Payload of [`PagestreamBeMessage::DbSize`].
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug`, in line with
/// the request types, so that responses can be compared in round-trip tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagestreamDbSizeResponse {
    pub db_size: i64,
}
660 :
661 : impl PagestreamFeMessage {
662 CBC 4 : pub fn serialize(&self) -> Bytes {
663 4 : let mut bytes = BytesMut::new();
664 4 :
665 4 : match self {
666 1 : Self::Exists(req) => {
667 1 : bytes.put_u8(0);
668 1 : bytes.put_u8(u8::from(req.latest));
669 1 : bytes.put_u64(req.lsn.0);
670 1 : bytes.put_u32(req.rel.spcnode);
671 1 : bytes.put_u32(req.rel.dbnode);
672 1 : bytes.put_u32(req.rel.relnode);
673 1 : bytes.put_u8(req.rel.forknum);
674 1 : }
675 :
676 1 : Self::Nblocks(req) => {
677 1 : bytes.put_u8(1);
678 1 : bytes.put_u8(u8::from(req.latest));
679 1 : bytes.put_u64(req.lsn.0);
680 1 : bytes.put_u32(req.rel.spcnode);
681 1 : bytes.put_u32(req.rel.dbnode);
682 1 : bytes.put_u32(req.rel.relnode);
683 1 : bytes.put_u8(req.rel.forknum);
684 1 : }
685 :
686 1 : Self::GetPage(req) => {
687 1 : bytes.put_u8(2);
688 1 : bytes.put_u8(u8::from(req.latest));
689 1 : bytes.put_u64(req.lsn.0);
690 1 : bytes.put_u32(req.rel.spcnode);
691 1 : bytes.put_u32(req.rel.dbnode);
692 1 : bytes.put_u32(req.rel.relnode);
693 1 : bytes.put_u8(req.rel.forknum);
694 1 : bytes.put_u32(req.blkno);
695 1 : }
696 :
697 1 : Self::DbSize(req) => {
698 1 : bytes.put_u8(3);
699 1 : bytes.put_u8(u8::from(req.latest));
700 1 : bytes.put_u64(req.lsn.0);
701 1 : bytes.put_u32(req.dbnode);
702 1 : }
703 : }
704 :
705 4 : bytes.into()
706 4 : }
707 :
708 3641540 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
709 : // TODO these gets can fail
710 :
711 : // these correspond to the NeonMessageTag enum in pagestore_client.h
712 : //
713 : // TODO: consider using protobuf or serde bincode for less error prone
714 : // serialization.
715 3641540 : let msg_tag = body.read_u8()?;
716 3641540 : match msg_tag {
717 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
718 42615 : latest: body.read_u8()? != 0,
719 42615 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
720 : rel: RelTag {
721 42615 : spcnode: body.read_u32::<BigEndian>()?,
722 42615 : dbnode: body.read_u32::<BigEndian>()?,
723 42615 : relnode: body.read_u32::<BigEndian>()?,
724 42615 : forknum: body.read_u8()?,
725 : },
726 : })),
727 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
728 15893 : latest: body.read_u8()? != 0,
729 15893 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
730 : rel: RelTag {
731 15893 : spcnode: body.read_u32::<BigEndian>()?,
732 15893 : dbnode: body.read_u32::<BigEndian>()?,
733 15893 : relnode: body.read_u32::<BigEndian>()?,
734 15893 : forknum: body.read_u8()?,
735 : },
736 : })),
737 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
738 3583026 : latest: body.read_u8()? != 0,
739 3583026 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
740 : rel: RelTag {
741 3583026 : spcnode: body.read_u32::<BigEndian>()?,
742 3583026 : dbnode: body.read_u32::<BigEndian>()?,
743 3583026 : relnode: body.read_u32::<BigEndian>()?,
744 3583026 : forknum: body.read_u8()?,
745 : },
746 3583026 : blkno: body.read_u32::<BigEndian>()?,
747 : })),
748 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
749 6 : latest: body.read_u8()? != 0,
750 6 : lsn: Lsn::from(body.read_u64::<BigEndian>()?),
751 6 : dbnode: body.read_u32::<BigEndian>()?,
752 : })),
753 UBC 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
754 : }
755 CBC 3641540 : }
756 : }
757 :
758 : impl PagestreamBeMessage {
759 3641507 : pub fn serialize(&self) -> Bytes {
760 3641507 : let mut bytes = BytesMut::new();
761 3641507 :
762 3641507 : use PagestreamBeMessageTag as Tag;
763 3641507 : match self {
764 42614 : Self::Exists(resp) => {
765 42614 : bytes.put_u8(Tag::Exists as u8);
766 42614 : bytes.put_u8(resp.exists as u8);
767 42614 : }
768 :
769 15892 : Self::Nblocks(resp) => {
770 15892 : bytes.put_u8(Tag::Nblocks as u8);
771 15892 : bytes.put_u32(resp.n_blocks);
772 15892 : }
773 :
774 3582995 : Self::GetPage(resp) => {
775 3582995 : bytes.put_u8(Tag::GetPage as u8);
776 3582995 : bytes.put(&resp.page[..]);
777 3582995 : }
778 :
779 1 : Self::Error(resp) => {
780 1 : bytes.put_u8(Tag::Error as u8);
781 1 : bytes.put(resp.message.as_bytes());
782 1 : bytes.put_u8(0); // null terminator
783 1 : }
784 5 : Self::DbSize(resp) => {
785 5 : bytes.put_u8(Tag::DbSize as u8);
786 5 : bytes.put_i64(resp.db_size);
787 5 : }
788 : }
789 :
790 3641507 : bytes.into()
791 3641507 : }
792 :
793 UBC 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
794 0 : let mut buf = buf.reader();
795 0 : let msg_tag = buf.read_u8()?;
796 :
797 : use PagestreamBeMessageTag as Tag;
798 0 : let ok =
799 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
800 : Tag::Exists => {
801 0 : let exists = buf.read_u8()?;
802 0 : Self::Exists(PagestreamExistsResponse {
803 0 : exists: exists != 0,
804 0 : })
805 : }
806 : Tag::Nblocks => {
807 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
808 0 : Self::Nblocks(PagestreamNblocksResponse { n_blocks })
809 : }
810 : Tag::GetPage => {
811 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
812 0 : buf.read_exact(&mut page)?;
813 0 : PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
814 : }
815 : Tag::Error => {
816 0 : let buf = buf.get_ref();
817 0 : let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
818 0 : let rust_str = cstr.to_str()?;
819 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
820 0 : message: rust_str.to_owned(),
821 0 : })
822 : }
823 : Tag::DbSize => {
824 0 : let db_size = buf.read_i64::<BigEndian>()?;
825 0 : Self::DbSize(PagestreamDbSizeResponse { db_size })
826 : }
827 : };
828 0 : let remaining = buf.into_inner();
829 0 : if !remaining.is_empty() {
830 0 : anyhow::bail!(
831 0 : "remaining bytes in msg with tag={msg_tag}: {}",
832 0 : remaining.len()
833 0 : );
834 0 : }
835 0 : Ok(ok)
836 0 : }
837 :
838 0 : pub fn kind(&self) -> &'static str {
839 0 : match self {
840 0 : Self::Exists(_) => "Exists",
841 0 : Self::Nblocks(_) => "Nblocks",
842 0 : Self::GetPage(_) => "GetPage",
843 0 : Self::Error(_) => "Error",
844 0 : Self::DbSize(_) => "DbSize",
845 : }
846 0 : }
847 : }
848 :
849 : #[cfg(test)]
850 : mod tests {
851 : use bytes::Buf;
852 : use serde_json::json;
853 :
854 : use super::*;
855 :
856 CBC 1 : #[test]
857 1 : fn test_pagestream() {
858 1 : // Test serialization/deserialization of PagestreamFeMessage
859 1 : let messages = vec![
860 1 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
861 1 : latest: true,
862 1 : lsn: Lsn(4),
863 1 : rel: RelTag {
864 1 : forknum: 1,
865 1 : spcnode: 2,
866 1 : dbnode: 3,
867 1 : relnode: 4,
868 1 : },
869 1 : }),
870 1 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
871 1 : latest: false,
872 1 : lsn: Lsn(4),
873 1 : rel: RelTag {
874 1 : forknum: 1,
875 1 : spcnode: 2,
876 1 : dbnode: 3,
877 1 : relnode: 4,
878 1 : },
879 1 : }),
880 1 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
881 1 : latest: true,
882 1 : lsn: Lsn(4),
883 1 : rel: RelTag {
884 1 : forknum: 1,
885 1 : spcnode: 2,
886 1 : dbnode: 3,
887 1 : relnode: 4,
888 1 : },
889 1 : blkno: 7,
890 1 : }),
891 1 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
892 1 : latest: true,
893 1 : lsn: Lsn(4),
894 1 : dbnode: 7,
895 1 : }),
896 1 : ];
897 5 : for msg in messages {
898 4 : let bytes = msg.serialize();
899 4 : let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
900 4 : assert!(msg == reconstructed);
901 : }
902 1 : }
903 :
904 1 : #[test]
905 1 : fn test_tenantinfo_serde() {
906 1 : // Pins the JSON produced by serializing TenantInfo for an Active and a Broken state; only serialization (to_value) and Debug output are checked, nothing is deserialized here.
907 1 : let original_active = TenantInfo {
908 1 : id: TenantShardId::unsharded(TenantId::generate()),
909 1 : state: TenantState::Active,
910 1 : current_physical_size: Some(42),
911 1 : attachment_status: TenantAttachmentStatus::Attached,
912 1 : };
913 1 : let expected_active = json!({
914 1 : "id": original_active.id.to_string(),
915 1 : "state": {
916 1 : "slug": "Active", // unit variant: expected JSON has the tag only, no "data" key
917 1 : },
918 1 : "current_physical_size": 42,
919 1 : "attachment_status": {
920 1 : "slug":"attached",
921 1 : }
922 1 : });
923 1 :
924 1 : let original_broken = TenantInfo {
925 1 : id: TenantShardId::unsharded(TenantId::generate()),
926 1 : state: TenantState::Broken {
927 1 : reason: "reason".into(),
928 1 : backtrace: "backtrace info".into(),
929 1 : },
930 1 : current_physical_size: Some(42),
931 1 : attachment_status: TenantAttachmentStatus::Attached,
932 1 : };
933 1 : let expected_broken = json!({
934 1 : "id": original_broken.id.to_string(),
935 1 : "state": {
936 1 : "slug": "Broken",
937 1 : "data": { // the variant's fields are expected under "data", next to the "slug" tag
938 1 : "backtrace": "backtrace info",
939 1 : "reason": "reason",
940 1 : }
941 1 : },
942 1 : "current_physical_size": 42,
943 1 : "attachment_status": {
944 1 : "slug":"attached",
945 1 : }
946 1 : });
947 1 :
948 1 : assert_eq!(
949 1 : serde_json::to_value(&original_active).unwrap(),
950 1 : expected_active
951 1 : );
952 :
953 1 : assert_eq!(
954 1 : serde_json::to_value(&original_broken).unwrap(),
955 1 : expected_broken
956 1 : );
957 1 : assert!(format!("{:?}", &original_broken.state).contains("reason")); // the Debug rendering of the Broken state must mention both field values
958 1 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
959 1 : }
960 :
961 1 : #[test]
962 1 : fn test_reject_unknown_field() { // TenantCreateRequest, TenantConfigRequest and TenantAttachRequest must each fail to deserialize JSON that carries an unrecognized key.
963 1 : let id = TenantId::generate();
964 1 : let create_request = json!({
965 1 : "new_tenant_id": id.to_string(),
966 1 : "unknown_field": "unknown_value".to_string(),
967 1 : });
968 1 : let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err(); // unwrap_err panics (failing the test) if deserialization unexpectedly succeeds
969 1 : assert!(
970 1 : err.to_string().contains("unknown field `unknown_field`"),
971 UBC 0 : "expect unknown field `unknown_field` error, got: {}",
972 : err
973 : );
974 :
975 CBC 1 : let id = TenantId::generate();
976 1 : let config_request = json!({
977 1 : "tenant_id": id.to_string(),
978 1 : "unknown_field": "unknown_value".to_string(),
979 1 : });
980 1 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
981 1 : assert!(
982 1 : err.to_string().contains("unknown field `unknown_field`"),
983 UBC 0 : "expect unknown field `unknown_field` error, got: {}",
984 : err
985 : );
986 :
987 CBC 1 : let attach_request = json!({
988 1 : "config": { // in this case the unknown key sits inside the nested "config" object, not at the top level
989 1 : "unknown_field": "unknown_value".to_string(),
990 1 : },
991 1 : });
992 1 : let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
993 1 : assert!(
994 1 : err.to_string().contains("unknown field `unknown_field`"),
995 UBC 0 : "expect unknown field `unknown_field` error, got: {}",
996 : err
997 : );
998 CBC 1 : }
999 :
1000 1 : #[test]
1001 1 : fn tenantstatus_activating_serde() { // Pins the exact JSON string for both Activating variants and checks that it parses back to equal values.
1002 1 : let states = [
1003 1 : TenantState::Activating(ActivatingFrom::Loading),
1004 1 : TenantState::Activating(ActivatingFrom::Attaching),
1005 1 : ];
1006 1 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]"; // variant name under "slug", its payload under "data"
1007 1 :
1008 1 : let actual = serde_json::to_string(&states).unwrap();
1009 1 :
1010 1 : assert_eq!(actual, expected);
1011 :
1012 1 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap(); // round-trip: deserialize the string that was just produced
1013 1 :
1014 1 : assert_eq!(states.as_slice(), &parsed);
1015 1 : }
1016 :
1017 1 : #[test]
1018 1 : fn tenantstatus_activating_strum() {
1019 1 : // Pins the &'static str that each TenantState value converts into, because these strings are used for metrics.
1020 1 : let examples = [
1021 1 : (line!(), TenantState::Loading, "Loading"), // line!() tags each example so a failed assertion can report which one it was
1022 1 : (line!(), TenantState::Attaching, "Attaching"),
1023 1 : (
1024 1 : line!(),
1025 1 : TenantState::Activating(ActivatingFrom::Loading),
1026 1 : "Activating", // the ActivatingFrom payload is not part of the expected string
1027 1 : ),
1028 1 : (
1029 1 : line!(),
1030 1 : TenantState::Activating(ActivatingFrom::Attaching),
1031 1 : "Activating",
1032 1 : ),
1033 1 : (line!(), TenantState::Active, "Active"),
1034 1 : (
1035 1 : line!(),
1036 1 : TenantState::Stopping {
1037 1 : progress: utils::completion::Barrier::default(),
1038 1 : },
1039 1 : "Stopping",
1040 1 : ),
1041 1 : (
1042 1 : line!(),
1043 1 : TenantState::Broken {
1044 1 : reason: "Example".into(),
1045 1 : backtrace: "Looooong backtrace".into(),
1046 1 : },
1047 1 : "Broken",
1048 1 : ),
1049 1 : ];
1050 :
1051 8 : for (line, rendered, expected) in examples {
1052 7 : let actual: &'static str = rendered.into(); // conversion under test: TenantState into &'static str
1053 7 : assert_eq!(actual, expected, "example on {line}");
1054 : }
1055 1 : }
1056 : }
|