Line data Source code
1 : pub mod detach_ancestor;
2 : pub mod partitioning;
3 : pub mod utilization;
4 :
5 : #[cfg(feature = "testing")]
6 : use camino::Utf8PathBuf;
7 : pub use utilization::PageserverUtilization;
8 :
9 : use std::{
10 : collections::HashMap,
11 : fmt::Display,
12 : io::{BufRead, Read},
13 : num::{NonZeroU32, NonZeroU64, NonZeroUsize},
14 : str::FromStr,
15 : time::{Duration, SystemTime},
16 : };
17 :
18 : use byteorder::{BigEndian, ReadBytesExt};
19 : use postgres_ffi::BLCKSZ;
20 : use serde::{Deserialize, Serialize};
21 : use serde_with::serde_as;
22 : use utils::{
23 : completion,
24 : id::{NodeId, TenantId, TimelineId},
25 : lsn::Lsn,
26 : postgres_client::PostgresClientProtocol,
27 : serde_system_time,
28 : };
29 :
30 : use crate::{
31 : reltag::RelTag,
32 : shard::{ShardCount, ShardStripeSize, TenantShardId},
33 : };
34 : use anyhow::bail;
35 : use bytes::{Buf, BufMut, Bytes, BytesMut};
36 :
37 : /// The state of a tenant in this pageserver.
38 : ///
39 : /// ```mermaid
40 : /// stateDiagram-v2
41 : ///
42 : /// [*] --> Attaching: spawn_attach()
43 : ///
44 : /// Attaching --> Activating: activate()
45 : /// Activating --> Active: infallible
46 : ///
47 : /// Attaching --> Broken: attach() failure
48 : ///
49 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
50 : /// Stopping --> Broken: late error in remove_tenant_from_memory
51 : ///
52 : /// Broken --> [*]: ignore / detach / shutdown
53 : /// Stopping --> [*]: remove_from_memory complete
54 : ///
55 : /// Active --> Broken: cfg(testing)-only tenant break point
56 : /// ```
57 : #[derive(
58 : Clone,
59 : PartialEq,
60 : Eq,
61 1 : serde::Serialize,
62 3 : serde::Deserialize,
63 0 : strum_macros::Display,
64 : strum_macros::VariantNames,
65 0 : strum_macros::AsRefStr,
66 389 : strum_macros::IntoStaticStr,
67 : )]
68 : #[serde(tag = "slug", content = "data")]
69 : pub enum TenantState {
70 : /// This tenant is being attached to the pageserver.
71 : ///
72 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
73 : Attaching,
74 : /// The tenant is transitioning from Loading/Attaching to Active.
75 : ///
76 : /// While in this state, the individual timelines are being activated.
77 : ///
78 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
79 : Activating(ActivatingFrom),
80 : /// The tenant has finished activating and is open for business.
81 : ///
82 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
83 : Active,
84 : /// The tenant is recognized by pageserver, but it is being detached or the
85 : /// system is being shut down.
86 : ///
87 : /// Transitions out of this state are possible through `set_broken()`.
88 : Stopping {
89 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
90 : // otherwise it will not be skipped during deserialization
91 : #[serde(skip)]
92 : progress: completion::Barrier,
93 : },
94 : /// The tenant is recognized by the pageserver, but can no longer be used for
95 : /// any operations.
96 : ///
97 : /// If the tenant fails to load or attach, it will transition to this state
98 : /// and it is guaranteed that no background tasks are running in its name.
99 : ///
100 : /// The other way to transition into this state is from `Stopping` state
101 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
102 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
103 : Broken { reason: String, backtrace: String },
104 : }
105 :
106 : impl TenantState {
107 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
108 : use TenantAttachmentStatus::*;
109 :
110 : // Below TenantState::Activating is used as "transient" or "transparent" state for
111 : // attachment_status determining.
112 0 : match self {
113 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
114 : // So, technically, we can return Attached here.
115 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
116 : // But, our attach task might still be fetching the remote timelines, etc.
117 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
118 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
119 : // We only reach Active after successful load / attach.
120 : // So, call atttachment status Attached.
121 0 : Self::Active => Attached,
122 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
123 : // However, it also becomes Broken if the regular load fails.
124 : // From Console's perspective there's no practical difference
125 : // because attachment_status is polled by console only during attach operation execution.
126 0 : Self::Broken { reason, .. } => Failed {
127 0 : reason: reason.to_owned(),
128 0 : },
129 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
130 : // we set the Stopping state irrespective of whether the tenant
131 : // has finished attaching or not.
132 0 : Self::Stopping { .. } => Maybe,
133 : }
134 0 : }
135 :
136 0 : pub fn broken_from_reason(reason: String) -> Self {
137 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
138 0 : Self::Broken {
139 0 : reason,
140 0 : backtrace: backtrace_str,
141 0 : }
142 0 : }
143 : }
144 :
145 : impl std::fmt::Debug for TenantState {
146 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 2 : match self {
148 2 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
149 2 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
150 : }
151 0 : _ => write!(f, "{self}"),
152 : }
153 2 : }
154 : }
155 :
156 : /// A temporary lease to a specific lsn inside a timeline.
157 : /// Access to the lsn is guaranteed by the pageserver until the expiration indicated by `valid_until`.
158 : #[serde_as]
159 0 : #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
160 : pub struct LsnLease {
161 : #[serde_as(as = "SystemTimeAsRfc3339Millis")]
162 : pub valid_until: SystemTime,
163 : }
164 :
165 : serde_with::serde_conv!(
166 : SystemTimeAsRfc3339Millis,
167 : SystemTime,
168 0 : |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
169 0 : |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
170 : );
171 :
172 : impl LsnLease {
173 : /// The default length for an explicit LSN lease request (10 minutes).
174 : pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
175 :
176 : /// The default length for an implicit LSN lease granted during
177 : /// `get_lsn_by_timestamp` request (1 minutes).
178 : pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
179 :
180 : /// Checks whether the lease is expired.
181 6 : pub fn is_expired(&self, now: &SystemTime) -> bool {
182 6 : now > &self.valid_until
183 6 : }
184 : }
185 :
186 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
187 : ///
188 : /// XXX: We used to have more variants here, but now it's just one, which makes this rather
189 : /// useless. Remove, once we've checked that there's no client code left that looks at this.
190 2 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
191 : pub enum ActivatingFrom {
192 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
193 : Attaching,
194 : }
195 :
196 : /// A state of a timeline in pageserver's memory.
197 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
198 : pub enum TimelineState {
199 : /// The timeline is recognized by the pageserver but is not yet operational.
200 : /// In particular, the walreceiver connection loop is not running for this timeline.
201 : /// It will eventually transition to state Active or Broken.
202 : Loading,
203 : /// The timeline is fully operational.
204 : /// It can be queried, and the walreceiver connection loop is running.
205 : Active,
206 : /// The timeline was previously Loading or Active but is shutting down.
207 : /// It cannot transition back into any other state.
208 : Stopping,
209 : /// The timeline is broken and not operational (previous states: Loading or Active).
210 : Broken { reason: String, backtrace: String },
211 : }
212 :
213 0 : #[derive(Serialize, Deserialize, Clone)]
214 : pub struct TimelineCreateRequest {
215 : pub new_timeline_id: TimelineId,
216 : #[serde(flatten)]
217 : pub mode: TimelineCreateRequestMode,
218 : }
219 :
220 0 : #[derive(Serialize, Deserialize, Clone)]
221 : #[serde(untagged)]
222 : pub enum TimelineCreateRequestMode {
223 : Branch {
224 : ancestor_timeline_id: TimelineId,
225 : #[serde(default)]
226 : ancestor_start_lsn: Option<Lsn>,
227 : // TODO: cplane sets this, but, the branching code always
228 : // inherits the ancestor's pg_version. Earlier code wasn't
229 : // using a flattened enum, so, it was an accepted field, and
230 : // we continue to accept it by having it here.
231 : pg_version: Option<u32>,
232 : },
233 : ImportPgdata {
234 : import_pgdata: TimelineCreateRequestModeImportPgdata,
235 : },
236 : // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
237 : // (serde picks the first matching enum variant, in declaration order).
238 : Bootstrap {
239 : #[serde(default)]
240 : existing_initdb_timeline_id: Option<TimelineId>,
241 : pg_version: Option<u32>,
242 : },
243 : }
244 :
245 0 : #[derive(Serialize, Deserialize, Clone)]
246 : pub struct TimelineCreateRequestModeImportPgdata {
247 : pub location: ImportPgdataLocation,
248 : pub idempotency_key: ImportPgdataIdempotencyKey,
249 : }
250 :
251 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
252 : pub enum ImportPgdataLocation {
253 : #[cfg(feature = "testing")]
254 : LocalFs { path: Utf8PathBuf },
255 : AwsS3 {
256 : region: String,
257 : bucket: String,
258 : /// A better name for this would be `prefix`; changing requires coordination with cplane.
259 : /// See <https://github.com/neondatabase/cloud/issues/20646>.
260 : key: String,
261 : },
262 : }
263 :
264 0 : #[derive(Serialize, Deserialize, Clone)]
265 : #[serde(transparent)]
266 : pub struct ImportPgdataIdempotencyKey(pub String);
267 :
268 : impl ImportPgdataIdempotencyKey {
269 0 : pub fn random() -> Self {
270 : use rand::{distributions::Alphanumeric, Rng};
271 0 : Self(
272 0 : rand::thread_rng()
273 0 : .sample_iter(&Alphanumeric)
274 0 : .take(20)
275 0 : .map(char::from)
276 0 : .collect(),
277 0 : )
278 0 : }
279 : }
280 :
281 0 : #[derive(Serialize, Deserialize, Clone)]
282 : pub struct LsnLeaseRequest {
283 : pub lsn: Lsn,
284 : }
285 :
286 0 : #[derive(Serialize, Deserialize)]
287 : pub struct TenantShardSplitRequest {
288 : pub new_shard_count: u8,
289 :
290 : // A tenant's stripe size is only meaningful the first time their shard count goes
291 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
292 : //
293 : // If this is set while the stripe count is being increased from an already >1 value,
294 : // then the request will fail with 400.
295 : pub new_stripe_size: Option<ShardStripeSize>,
296 : }
297 :
298 0 : #[derive(Serialize, Deserialize)]
299 : pub struct TenantShardSplitResponse {
300 : pub new_shards: Vec<TenantShardId>,
301 : }
302 :
303 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
304 0 : #[derive(Serialize, Deserialize, Debug)]
305 : #[serde(deny_unknown_fields)]
306 : pub struct ShardParameters {
307 : pub count: ShardCount,
308 : pub stripe_size: ShardStripeSize,
309 : }
310 :
311 : impl ShardParameters {
312 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
313 :
314 0 : pub fn is_unsharded(&self) -> bool {
315 0 : self.count.is_unsharded()
316 0 : }
317 : }
318 :
319 : impl Default for ShardParameters {
320 193 : fn default() -> Self {
321 193 : Self {
322 193 : count: ShardCount::new(0),
323 193 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
324 193 : }
325 193 : }
326 : }
327 :
328 : /// An alternative representation of `pageserver::tenant::TenantConf` with
329 : /// simpler types.
330 2 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
331 : pub struct TenantConfig {
332 : pub checkpoint_distance: Option<u64>,
333 : pub checkpoint_timeout: Option<String>,
334 : pub compaction_target_size: Option<u64>,
335 : pub compaction_period: Option<String>,
336 : pub compaction_threshold: Option<usize>,
337 : // defer parsing compaction_algorithm, like eviction_policy
338 : pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
339 : pub gc_horizon: Option<u64>,
340 : pub gc_period: Option<String>,
341 : pub image_creation_threshold: Option<usize>,
342 : pub pitr_interval: Option<String>,
343 : pub walreceiver_connect_timeout: Option<String>,
344 : pub lagging_wal_timeout: Option<String>,
345 : pub max_lsn_wal_lag: Option<NonZeroU64>,
346 : pub eviction_policy: Option<EvictionPolicy>,
347 : pub min_resident_size_override: Option<u64>,
348 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
349 : pub heatmap_period: Option<String>,
350 : pub lazy_slru_download: Option<bool>,
351 : pub timeline_get_throttle: Option<ThrottleConfig>,
352 : pub image_layer_creation_check_threshold: Option<u8>,
353 : pub lsn_lease_length: Option<String>,
354 : pub lsn_lease_length_for_ts: Option<String>,
355 : pub timeline_offloading: Option<bool>,
356 : pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
357 : }
358 :
359 : /// The policy for the aux file storage.
360 : ///
361 : /// It can be switched through `switch_aux_file_policy` tenant config.
362 : /// When the first aux file written, the policy will be persisted in the
363 : /// `index_part.json` file and has a limited migration path.
364 : ///
365 : /// Currently, we only allow the following migration path:
366 : ///
367 : /// Unset -> V1
368 : /// -> V2
369 : /// -> CrossValidation -> V2
370 : #[derive(
371 : Eq,
372 : PartialEq,
373 : Debug,
374 : Copy,
375 : Clone,
376 2 : strum_macros::EnumString,
377 0 : strum_macros::Display,
378 0 : serde_with::DeserializeFromStr,
379 : serde_with::SerializeDisplay,
380 : )]
381 : #[strum(serialize_all = "kebab-case")]
382 : pub enum AuxFilePolicy {
383 : /// V1 aux file policy: store everything in AUX_FILE_KEY
384 : #[strum(ascii_case_insensitive)]
385 : V1,
386 : /// V2 aux file policy: store in the AUX_FILE keyspace
387 : #[strum(ascii_case_insensitive)]
388 : V2,
389 : /// Cross validation runs both formats on the write path and does validation
390 : /// on the read path.
391 : #[strum(ascii_case_insensitive)]
392 : CrossValidation,
393 : }
394 :
395 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
396 : #[serde(tag = "kind")]
397 : pub enum EvictionPolicy {
398 : NoEviction,
399 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
400 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
401 : }
402 :
403 : impl EvictionPolicy {
404 0 : pub fn discriminant_str(&self) -> &'static str {
405 0 : match self {
406 0 : EvictionPolicy::NoEviction => "NoEviction",
407 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
408 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
409 : }
410 0 : }
411 : }
412 :
413 : #[derive(
414 : Eq,
415 : PartialEq,
416 : Debug,
417 : Copy,
418 : Clone,
419 0 : strum_macros::EnumString,
420 0 : strum_macros::Display,
421 0 : serde_with::DeserializeFromStr,
422 : serde_with::SerializeDisplay,
423 : )]
424 : #[strum(serialize_all = "kebab-case")]
425 : pub enum CompactionAlgorithm {
426 : Legacy,
427 : Tiered,
428 : }
429 :
430 : #[derive(
431 0 : Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
432 : )]
433 : pub enum ImageCompressionAlgorithm {
434 : // Disabled for writes, support decompressing during read path
435 : Disabled,
436 : /// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
437 : /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
438 : Zstd {
439 : level: Option<i8>,
440 : },
441 : }
442 :
443 : impl FromStr for ImageCompressionAlgorithm {
444 : type Err = anyhow::Error;
445 8 : fn from_str(s: &str) -> Result<Self, Self::Err> {
446 8 : let mut components = s.split(['(', ')']);
447 8 : let first = components
448 8 : .next()
449 8 : .ok_or_else(|| anyhow::anyhow!("empty string"))?;
450 8 : match first {
451 8 : "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
452 6 : "zstd" => {
453 6 : let level = if let Some(v) = components.next() {
454 4 : let v: i8 = v.parse()?;
455 4 : Some(v)
456 : } else {
457 2 : None
458 : };
459 :
460 6 : Ok(ImageCompressionAlgorithm::Zstd { level })
461 : }
462 0 : _ => anyhow::bail!("invalid specifier '{first}'"),
463 : }
464 8 : }
465 : }
466 :
467 : impl Display for ImageCompressionAlgorithm {
468 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 12 : match self {
470 3 : ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
471 9 : ImageCompressionAlgorithm::Zstd { level } => {
472 9 : if let Some(level) = level {
473 6 : write!(f, "zstd({})", level)
474 : } else {
475 3 : write!(f, "zstd")
476 : }
477 : }
478 : }
479 12 : }
480 : }
481 :
482 0 : #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
483 : pub struct CompactionAlgorithmSettings {
484 : pub kind: CompactionAlgorithm,
485 : }
486 :
487 6 : #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
488 : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
489 : pub enum L0FlushConfig {
490 : #[serde(rename_all = "snake_case")]
491 : Direct { max_concurrency: NonZeroUsize },
492 : }
493 :
494 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
495 : pub struct EvictionPolicyLayerAccessThreshold {
496 : #[serde(with = "humantime_serde")]
497 : pub period: Duration,
498 : #[serde(with = "humantime_serde")]
499 : pub threshold: Duration,
500 : }
501 :
502 8 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
503 : pub struct ThrottleConfig {
504 : /// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
505 : #[serde(rename = "task_kinds")]
506 : pub enabled: ThrottleConfigTaskKinds,
507 : pub initial: u32,
508 : #[serde(with = "humantime_serde")]
509 : pub refill_interval: Duration,
510 : pub refill_amount: NonZeroU32,
511 : pub max: u32,
512 : }
513 :
514 : /// Before <https://github.com/neondatabase/neon/pull/9962>
515 : /// the throttle was a per `Timeline::get`/`Timeline::get_vectored` call.
516 : /// The `task_kinds` field controlled which Pageserver "Task Kind"s
517 : /// were subject to the throttle.
518 : ///
519 : /// After that PR, the throttle is applied at pagestream request level
520 : /// and the `task_kinds` field does not apply since the only task kind
521 : /// that us subject to the throttle is that of the page service.
522 : ///
523 : /// However, we don't want to make a breaking config change right now
524 : /// because it means we have to migrate all the tenant configs.
525 : /// This will be done in a future PR.
526 : ///
527 : /// In the meantime, we use emptiness / non-emptsiness of the `task_kinds`
528 : /// field to determine if the throttle is enabled or not.
529 1 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
530 : #[serde(transparent)]
531 : pub struct ThrottleConfigTaskKinds(Vec<String>);
532 :
533 : impl ThrottleConfigTaskKinds {
534 395 : pub fn disabled() -> Self {
535 395 : Self(vec![])
536 395 : }
537 194 : pub fn is_enabled(&self) -> bool {
538 194 : !self.0.is_empty()
539 194 : }
540 : }
541 :
542 : impl ThrottleConfig {
543 395 : pub fn disabled() -> Self {
544 395 : Self {
545 395 : enabled: ThrottleConfigTaskKinds::disabled(),
546 395 : // other values don't matter with emtpy `task_kinds`.
547 395 : initial: 0,
548 395 : refill_interval: Duration::from_millis(1),
549 395 : refill_amount: NonZeroU32::new(1).unwrap(),
550 395 : max: 1,
551 395 : }
552 395 : }
553 : /// The requests per second allowed by the given config.
554 0 : pub fn steady_rps(&self) -> f64 {
555 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
556 0 : }
557 : }
558 :
559 : #[cfg(test)]
560 : mod throttle_config_tests {
561 : use super::*;
562 :
563 : #[test]
564 1 : fn test_disabled_is_disabled() {
565 1 : let config = ThrottleConfig::disabled();
566 1 : assert!(!config.enabled.is_enabled());
567 1 : }
568 : #[test]
569 1 : fn test_enabled_backwards_compat() {
570 1 : let input = serde_json::json!({
571 1 : "task_kinds": ["PageRequestHandler"],
572 1 : "initial": 40000,
573 1 : "refill_interval": "50ms",
574 1 : "refill_amount": 1000,
575 1 : "max": 40000,
576 1 : "fair": true
577 1 : });
578 1 : let config: ThrottleConfig = serde_json::from_value(input).unwrap();
579 1 : assert!(config.enabled.is_enabled());
580 1 : }
581 : }
582 :
583 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
584 : /// lists out all possible states (and the virtual "Detached" state)
585 : /// in a flat form rather than using rust-style enums.
586 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
587 : pub enum LocationConfigMode {
588 : AttachedSingle,
589 : AttachedMulti,
590 : AttachedStale,
591 : Secondary,
592 : Detached,
593 : }
594 :
595 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
596 : pub struct LocationConfigSecondary {
597 : pub warm: bool,
598 : }
599 :
600 : /// An alternative representation of `pageserver::tenant::LocationConf`,
601 : /// for use in external-facing APIs.
602 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
603 : pub struct LocationConfig {
604 : pub mode: LocationConfigMode,
605 : /// If attaching, in what generation?
606 : #[serde(default)]
607 : pub generation: Option<u32>,
608 :
609 : // If requesting mode `Secondary`, configuration for that.
610 : #[serde(default)]
611 : pub secondary_conf: Option<LocationConfigSecondary>,
612 :
613 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
614 : // must be set accurately.
615 : #[serde(default)]
616 : pub shard_number: u8,
617 : #[serde(default)]
618 : pub shard_count: u8,
619 : #[serde(default)]
620 : pub shard_stripe_size: u32,
621 :
622 : // This configuration only affects attached mode, but should be provided irrespective
623 : // of the mode, as a secondary location might transition on startup if the response
624 : // to the `/re-attach` control plane API requests it.
625 : pub tenant_conf: TenantConfig,
626 : }
627 :
628 0 : #[derive(Serialize, Deserialize)]
629 : pub struct LocationConfigListResponse {
630 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
631 : }
632 :
633 : #[derive(Serialize)]
634 : pub struct StatusResponse {
635 : pub id: NodeId,
636 : }
637 :
638 0 : #[derive(Serialize, Deserialize, Debug)]
639 : #[serde(deny_unknown_fields)]
640 : pub struct TenantLocationConfigRequest {
641 : #[serde(flatten)]
642 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
643 : }
644 :
645 0 : #[derive(Serialize, Deserialize, Debug)]
646 : #[serde(deny_unknown_fields)]
647 : pub struct TenantTimeTravelRequest {
648 : pub shard_counts: Vec<ShardCount>,
649 : }
650 :
651 0 : #[derive(Serialize, Deserialize, Debug)]
652 : #[serde(deny_unknown_fields)]
653 : pub struct TenantShardLocation {
654 : pub shard_id: TenantShardId,
655 : pub node_id: NodeId,
656 : }
657 :
658 0 : #[derive(Serialize, Deserialize, Debug)]
659 : #[serde(deny_unknown_fields)]
660 : pub struct TenantLocationConfigResponse {
661 : pub shards: Vec<TenantShardLocation>,
662 : // If the shards' ShardCount count is >1, stripe_size will be set.
663 : pub stripe_size: Option<ShardStripeSize>,
664 : }
665 :
666 3 : #[derive(Serialize, Deserialize, Debug)]
667 : #[serde(deny_unknown_fields)]
668 : pub struct TenantConfigRequest {
669 : pub tenant_id: TenantId,
670 : #[serde(flatten)]
671 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
672 : }
673 :
674 : impl std::ops::Deref for TenantConfigRequest {
675 : type Target = TenantConfig;
676 :
677 0 : fn deref(&self) -> &Self::Target {
678 0 : &self.config
679 0 : }
680 : }
681 :
682 : impl TenantConfigRequest {
683 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
684 0 : let config = TenantConfig::default();
685 0 : TenantConfigRequest { tenant_id, config }
686 0 : }
687 : }
688 :
689 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
690 0 : #[derive(Serialize, Deserialize, Clone)]
691 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
692 : pub enum TenantAttachmentStatus {
693 : Maybe,
694 : Attached,
695 : Failed { reason: String },
696 : }
697 :
698 0 : #[derive(Serialize, Deserialize, Clone)]
699 : pub struct TenantInfo {
700 : pub id: TenantShardId,
701 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
702 : pub state: TenantState,
703 : /// Sum of the size of all layer files.
704 : /// If a layer is present in both local FS and S3, it counts only once.
705 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
706 : pub attachment_status: TenantAttachmentStatus,
707 : pub generation: u32,
708 :
709 : /// Opaque explanation if gc is being blocked.
710 : ///
711 : /// Only looked up for the individual tenant detail, not the listing. This is purely for
712 : /// debugging, not included in openapi.
713 : #[serde(skip_serializing_if = "Option::is_none")]
714 : pub gc_blocking: Option<String>,
715 : }
716 :
717 0 : #[derive(Serialize, Deserialize, Clone)]
718 : pub struct TenantDetails {
719 : #[serde(flatten)]
720 : pub tenant_info: TenantInfo,
721 :
722 : pub walredo: Option<WalRedoManagerStatus>,
723 :
724 : pub timelines: Vec<TimelineId>,
725 : }
726 :
727 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
728 : pub enum TimelineArchivalState {
729 : Archived,
730 : Unarchived,
731 : }
732 :
733 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
734 : pub struct TimelineArchivalConfigRequest {
735 : pub state: TimelineArchivalState,
736 : }
737 :
738 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
739 : pub struct TimelinesInfoAndOffloaded {
740 : pub timelines: Vec<TimelineInfo>,
741 : pub offloaded: Vec<OffloadedTimelineInfo>,
742 : }
743 :
744 : /// Analog of [`TimelineInfo`] for offloaded timelines.
745 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
746 : pub struct OffloadedTimelineInfo {
747 : pub tenant_id: TenantShardId,
748 : pub timeline_id: TimelineId,
749 : /// Whether the timeline has a parent it has been branched off from or not
750 : pub ancestor_timeline_id: Option<TimelineId>,
751 : /// Whether to retain the branch lsn at the ancestor or not
752 : pub ancestor_retain_lsn: Option<Lsn>,
753 : /// The time point when the timeline was archived
754 : pub archived_at: chrono::DateTime<chrono::Utc>,
755 : }
756 :
757 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
758 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
759 : pub struct TimelineInfo {
760 : pub tenant_id: TenantShardId,
761 : pub timeline_id: TimelineId,
762 :
763 : pub ancestor_timeline_id: Option<TimelineId>,
764 : pub ancestor_lsn: Option<Lsn>,
765 : pub last_record_lsn: Lsn,
766 : pub prev_record_lsn: Option<Lsn>,
767 : pub latest_gc_cutoff_lsn: Lsn,
768 : pub disk_consistent_lsn: Lsn,
769 :
770 : /// The LSN that we have succesfully uploaded to remote storage
771 : pub remote_consistent_lsn: Lsn,
772 :
773 : /// The LSN that we are advertizing to safekeepers
774 : pub remote_consistent_lsn_visible: Lsn,
775 :
776 : /// The LSN from the start of the root timeline (never changes)
777 : pub initdb_lsn: Lsn,
778 :
779 : pub current_logical_size: u64,
780 : pub current_logical_size_is_accurate: bool,
781 :
782 : pub directory_entries_counts: Vec<u64>,
783 :
784 : /// Sum of the size of all layer files.
785 : /// If a layer is present in both local FS and S3, it counts only once.
786 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
787 : pub current_logical_size_non_incremental: Option<u64>,
788 :
789 : /// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
790 : /// beyond the branch's branch point, we only count up to the branch point.
791 : pub pitr_history_size: u64,
792 :
793 : /// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
794 : /// ancestor data used by this branch would have been retained anyway). If this is false, then
795 : /// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
796 : /// otherwise be able to GC.
797 : pub within_ancestor_pitr: bool,
798 :
799 : pub timeline_dir_layer_file_size_sum: Option<u64>,
800 :
801 : pub wal_source_connstr: Option<String>,
802 : pub last_received_msg_lsn: Option<Lsn>,
803 : /// the timestamp (in microseconds) of the last received message
804 : pub last_received_msg_ts: Option<u128>,
805 : pub pg_version: u32,
806 :
807 : pub state: TimelineState,
808 :
809 : pub walreceiver_status: String,
810 :
811 : // ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
812 : // Backward compatibility: you will get a JSON not containing the newly-added field.
813 : // Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
814 : // not deny unknown fields by default so it's safe to set the field to some value, though it won't be
815 : // read.
816 : pub is_archived: Option<bool>,
817 : }
818 :
819 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
820 : pub struct LayerMapInfo {
821 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
822 : pub historic_layers: Vec<HistoricLayerInfo>,
823 : }
824 :
825 : /// The residence status of a layer
826 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
827 : pub enum LayerResidenceStatus {
828 : /// Residence status for a layer file that exists locally.
829 : /// It may also exist on the remote, we don't care here.
830 : Resident,
831 : /// Residence status for a layer file that only exists on the remote.
832 : Evicted,
833 : }
834 :
835 : #[serde_as]
836 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
837 : pub struct LayerAccessStats {
838 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
839 : pub access_time: SystemTime,
840 :
841 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
842 : pub residence_time: SystemTime,
843 :
844 : pub visible: bool,
845 : }
846 :
847 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
848 : #[serde(tag = "kind")]
849 : pub enum InMemoryLayerInfo {
850 : Open { lsn_start: Lsn },
851 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
852 : }
853 :
854 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
855 : #[serde(tag = "kind")]
856 : pub enum HistoricLayerInfo {
857 : Delta {
858 : layer_file_name: String,
859 : layer_file_size: u64,
860 :
861 : lsn_start: Lsn,
862 : lsn_end: Lsn,
863 : remote: bool,
864 : access_stats: LayerAccessStats,
865 :
866 : l0: bool,
867 : },
868 : Image {
869 : layer_file_name: String,
870 : layer_file_size: u64,
871 :
872 : lsn_start: Lsn,
873 : remote: bool,
874 : access_stats: LayerAccessStats,
875 : },
876 : }
877 :
878 : impl HistoricLayerInfo {
879 0 : pub fn layer_file_name(&self) -> &str {
880 0 : match self {
881 : HistoricLayerInfo::Delta {
882 0 : layer_file_name, ..
883 0 : } => layer_file_name,
884 : HistoricLayerInfo::Image {
885 0 : layer_file_name, ..
886 0 : } => layer_file_name,
887 : }
888 0 : }
889 0 : pub fn is_remote(&self) -> bool {
890 0 : match self {
891 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
892 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
893 : }
894 0 : }
895 0 : pub fn set_remote(&mut self, value: bool) {
896 0 : let field = match self {
897 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
898 0 : HistoricLayerInfo::Image { remote, .. } => remote,
899 : };
900 0 : *field = value;
901 0 : }
902 0 : pub fn layer_file_size(&self) -> u64 {
903 0 : match self {
904 : HistoricLayerInfo::Delta {
905 0 : layer_file_size, ..
906 0 : } => *layer_file_size,
907 : HistoricLayerInfo::Image {
908 0 : layer_file_size, ..
909 0 : } => *layer_file_size,
910 : }
911 0 : }
912 : }
913 :
914 0 : #[derive(Debug, Serialize, Deserialize)]
915 : pub struct DownloadRemoteLayersTaskSpawnRequest {
916 : pub max_concurrent_downloads: NonZeroUsize,
917 : }
918 :
919 0 : #[derive(Debug, Serialize, Deserialize)]
920 : pub struct IngestAuxFilesRequest {
921 : pub aux_files: HashMap<String, String>,
922 : }
923 :
924 0 : #[derive(Debug, Serialize, Deserialize)]
925 : pub struct ListAuxFilesRequest {
926 : pub lsn: Lsn,
927 : }
928 :
929 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
930 : pub struct DownloadRemoteLayersTaskInfo {
931 : pub task_id: String,
932 : pub state: DownloadRemoteLayersTaskState,
933 : pub total_layer_count: u64, // stable once `completed`
934 : pub successful_download_count: u64, // stable once `completed`
935 : pub failed_download_count: u64, // stable once `completed`
936 : }
937 :
938 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
939 : pub enum DownloadRemoteLayersTaskState {
940 : Running,
941 : Completed,
942 : ShutDown,
943 : }
944 :
945 0 : #[derive(Debug, Serialize, Deserialize)]
946 : pub struct TimelineGcRequest {
947 : pub gc_horizon: Option<u64>,
948 : }
949 :
950 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
951 : pub struct WalRedoManagerProcessStatus {
952 : pub pid: u32,
953 : }
954 :
955 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
956 : pub struct WalRedoManagerStatus {
957 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
958 : pub process: Option<WalRedoManagerProcessStatus>,
959 : }
960 :
961 : /// The progress of a secondary tenant.
962 : ///
963 : /// It is mostly useful when doing a long running download: e.g. initiating
964 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
965 : /// what's happening.
966 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
967 : pub struct SecondaryProgress {
968 : /// The remote storage LastModified time of the heatmap object we last downloaded.
969 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
970 :
971 : /// The number of layers currently on-disk
972 : pub layers_downloaded: usize,
973 : /// The number of layers in the most recently seen heatmap
974 : pub layers_total: usize,
975 :
976 : /// The number of layer bytes currently on-disk
977 : pub bytes_downloaded: u64,
978 : /// The number of layer bytes in the most recently seen heatmap
979 : pub bytes_total: u64,
980 : }
981 :
982 0 : #[derive(Serialize, Deserialize, Debug)]
983 : pub struct TenantScanRemoteStorageShard {
984 : pub tenant_shard_id: TenantShardId,
985 : pub generation: Option<u32>,
986 : }
987 :
988 0 : #[derive(Serialize, Deserialize, Debug, Default)]
989 : pub struct TenantScanRemoteStorageResponse {
990 : pub shards: Vec<TenantScanRemoteStorageShard>,
991 : }
992 :
993 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
994 : #[serde(rename_all = "snake_case")]
995 : pub enum TenantSorting {
996 : ResidentSize,
997 : MaxLogicalSize,
998 : }
999 :
1000 : impl Default for TenantSorting {
1001 0 : fn default() -> Self {
1002 0 : Self::ResidentSize
1003 0 : }
1004 : }
1005 :
1006 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1007 : pub struct TopTenantShardsRequest {
1008 : // How would you like to sort the tenants?
1009 : pub order_by: TenantSorting,
1010 :
1011 : // How many results?
1012 : pub limit: usize,
1013 :
1014 : // Omit tenants with more than this many shards (e.g. if this is the max number of shards
1015 : // that the caller would ever split to)
1016 : pub where_shards_lt: Option<ShardCount>,
1017 :
1018 : // Omit tenants where the ordering metric is less than this (this is an optimization to
1019 : // let us quickly exclude numerous tiny shards)
1020 : pub where_gt: Option<u64>,
1021 : }
1022 :
1023 0 : #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
1024 : pub struct TopTenantShardItem {
1025 : pub id: TenantShardId,
1026 :
1027 : /// Total size of layers on local disk for all timelines in this tenant
1028 : pub resident_size: u64,
1029 :
1030 : /// Total size of layers in remote storage for all timelines in this tenant
1031 : pub physical_size: u64,
1032 :
1033 : /// The largest logical size of a timeline within this tenant
1034 : pub max_logical_size: u64,
1035 : }
1036 :
1037 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1038 : pub struct TopTenantShardsResponse {
1039 : pub shards: Vec<TopTenantShardItem>,
1040 : }
1041 :
1042 : pub mod virtual_file {
1043 : #[derive(
1044 : Copy,
1045 : Clone,
1046 : PartialEq,
1047 : Eq,
1048 : Hash,
1049 204 : strum_macros::EnumString,
1050 0 : strum_macros::Display,
1051 0 : serde_with::DeserializeFromStr,
1052 : serde_with::SerializeDisplay,
1053 : Debug,
1054 : )]
1055 : #[strum(serialize_all = "kebab-case")]
1056 : pub enum IoEngineKind {
1057 : StdFs,
1058 : #[cfg(target_os = "linux")]
1059 : TokioEpollUring,
1060 : }
1061 :
1062 : /// Direct IO modes for a pageserver.
1063 : #[derive(
1064 : Copy,
1065 : Clone,
1066 : PartialEq,
1067 : Eq,
1068 : Hash,
1069 0 : strum_macros::EnumString,
1070 0 : strum_macros::Display,
1071 0 : serde_with::DeserializeFromStr,
1072 : serde_with::SerializeDisplay,
1073 : Debug,
1074 : )]
1075 : #[strum(serialize_all = "kebab-case")]
1076 : #[repr(u8)]
1077 : pub enum IoMode {
1078 : /// Uses buffered IO.
1079 : Buffered,
1080 : /// Uses direct IO, error out if the operation fails.
1081 : #[cfg(target_os = "linux")]
1082 : Direct,
1083 : }
1084 :
1085 : impl IoMode {
1086 210 : pub const fn preferred() -> Self {
1087 210 : Self::Buffered
1088 210 : }
1089 : }
1090 :
1091 : impl TryFrom<u8> for IoMode {
1092 : type Error = u8;
1093 :
1094 2466 : fn try_from(value: u8) -> Result<Self, Self::Error> {
1095 2466 : Ok(match value {
1096 2466 : v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
1097 : #[cfg(target_os = "linux")]
1098 0 : v if v == (IoMode::Direct as u8) => IoMode::Direct,
1099 0 : x => return Err(x),
1100 : })
1101 2466 : }
1102 : }
1103 : }
1104 :
1105 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1106 : pub struct ScanDisposableKeysResponse {
1107 : pub disposable_count: usize,
1108 : pub not_disposable_count: usize,
1109 : }
1110 :
1111 : // Wrapped in libpq CopyData
1112 : #[derive(PartialEq, Eq, Debug)]
1113 : pub enum PagestreamFeMessage {
1114 : Exists(PagestreamExistsRequest),
1115 : Nblocks(PagestreamNblocksRequest),
1116 : GetPage(PagestreamGetPageRequest),
1117 : DbSize(PagestreamDbSizeRequest),
1118 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
1119 : }
1120 :
1121 : // Wrapped in libpq CopyData
1122 0 : #[derive(strum_macros::EnumProperty)]
1123 : pub enum PagestreamBeMessage {
1124 : Exists(PagestreamExistsResponse),
1125 : Nblocks(PagestreamNblocksResponse),
1126 : GetPage(PagestreamGetPageResponse),
1127 : Error(PagestreamErrorResponse),
1128 : DbSize(PagestreamDbSizeResponse),
1129 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
1130 : }
1131 :
1132 : // Keep in sync with `pagestore_client.h`
1133 : #[repr(u8)]
1134 : enum PagestreamBeMessageTag {
1135 : Exists = 100,
1136 : Nblocks = 101,
1137 : GetPage = 102,
1138 : Error = 103,
1139 : DbSize = 104,
1140 : GetSlruSegment = 105,
1141 : }
1142 : impl TryFrom<u8> for PagestreamBeMessageTag {
1143 : type Error = u8;
1144 0 : fn try_from(value: u8) -> Result<Self, u8> {
1145 0 : match value {
1146 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
1147 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
1148 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
1149 0 : 103 => Ok(PagestreamBeMessageTag::Error),
1150 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
1151 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
1152 0 : _ => Err(value),
1153 : }
1154 0 : }
1155 : }
1156 :
1157 : // A GetPage request contains two LSN values:
1158 : //
1159 : // request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
1160 : // "get the latest version present". It's used by the primary server, which knows that no one else
1161 : // is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
1162 : // Lsn::Max. Standby servers use the current replay LSN as the request LSN.
1163 : //
1164 : // not_modified_since: Hint to the pageserver that the client knows that the page has not been
1165 : // modified between 'not_modified_since' and the request LSN. It's always correct to set
1166 : // 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
1167 : // passing an earlier LSN can speed up the request, by allowing the pageserver to process the
1168 : // request without waiting for 'request_lsn' to arrive.
1169 : //
1170 : // The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
1171 : // sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
1172 : // 'latest' was set to true. The V2 interface was added because there was no correct way for a
1173 : // standby to request a page at a particular non-latest LSN, and also include the
1174 : // 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
1175 : // request, if the standby knows that the page hasn't been modified since, and risk getting an error
1176 : // if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
1177 : // require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
1178 : // interface allows sending both LSNs, and let the pageserver do the right thing. There was no
1179 : // difference in the responses between V1 and V2.
1180 : //
1181 : #[derive(Clone, Copy)]
1182 : pub enum PagestreamProtocolVersion {
1183 : V2,
1184 : }
1185 :
1186 : #[derive(Debug, PartialEq, Eq)]
1187 : pub struct PagestreamExistsRequest {
1188 : pub request_lsn: Lsn,
1189 : pub not_modified_since: Lsn,
1190 : pub rel: RelTag,
1191 : }
1192 :
1193 : #[derive(Debug, PartialEq, Eq)]
1194 : pub struct PagestreamNblocksRequest {
1195 : pub request_lsn: Lsn,
1196 : pub not_modified_since: Lsn,
1197 : pub rel: RelTag,
1198 : }
1199 :
1200 : #[derive(Debug, PartialEq, Eq)]
1201 : pub struct PagestreamGetPageRequest {
1202 : pub request_lsn: Lsn,
1203 : pub not_modified_since: Lsn,
1204 : pub rel: RelTag,
1205 : pub blkno: u32,
1206 : }
1207 :
1208 : #[derive(Debug, PartialEq, Eq)]
1209 : pub struct PagestreamDbSizeRequest {
1210 : pub request_lsn: Lsn,
1211 : pub not_modified_since: Lsn,
1212 : pub dbnode: u32,
1213 : }
1214 :
1215 : #[derive(Debug, PartialEq, Eq)]
1216 : pub struct PagestreamGetSlruSegmentRequest {
1217 : pub request_lsn: Lsn,
1218 : pub not_modified_since: Lsn,
1219 : pub kind: u8,
1220 : pub segno: u32,
1221 : }
1222 :
1223 : #[derive(Debug)]
1224 : pub struct PagestreamExistsResponse {
1225 : pub exists: bool,
1226 : }
1227 :
1228 : #[derive(Debug)]
1229 : pub struct PagestreamNblocksResponse {
1230 : pub n_blocks: u32,
1231 : }
1232 :
1233 : #[derive(Debug)]
1234 : pub struct PagestreamGetPageResponse {
1235 : pub page: Bytes,
1236 : }
1237 :
1238 : #[derive(Debug)]
1239 : pub struct PagestreamGetSlruSegmentResponse {
1240 : pub segment: Bytes,
1241 : }
1242 :
1243 : #[derive(Debug)]
1244 : pub struct PagestreamErrorResponse {
1245 : pub message: String,
1246 : }
1247 :
1248 : #[derive(Debug)]
1249 : pub struct PagestreamDbSizeResponse {
1250 : pub db_size: i64,
1251 : }
1252 :
1253 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
1254 : // that require pageserver-internal types. It is sufficient to get the total size.
1255 0 : #[derive(Serialize, Deserialize, Debug)]
1256 : pub struct TenantHistorySize {
1257 : pub id: TenantId,
1258 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
1259 : ///
1260 : /// Will be none if `?inputs_only=true` was given.
1261 : pub size: Option<u64>,
1262 : }
1263 :
1264 : impl PagestreamFeMessage {
1265 : /// Serialize a compute -> pageserver message. This is currently only used in testing
1266 : /// tools. Always uses protocol version 2.
1267 4 : pub fn serialize(&self) -> Bytes {
1268 4 : let mut bytes = BytesMut::new();
1269 4 :
1270 4 : match self {
1271 1 : Self::Exists(req) => {
1272 1 : bytes.put_u8(0);
1273 1 : bytes.put_u64(req.request_lsn.0);
1274 1 : bytes.put_u64(req.not_modified_since.0);
1275 1 : bytes.put_u32(req.rel.spcnode);
1276 1 : bytes.put_u32(req.rel.dbnode);
1277 1 : bytes.put_u32(req.rel.relnode);
1278 1 : bytes.put_u8(req.rel.forknum);
1279 1 : }
1280 :
1281 1 : Self::Nblocks(req) => {
1282 1 : bytes.put_u8(1);
1283 1 : bytes.put_u64(req.request_lsn.0);
1284 1 : bytes.put_u64(req.not_modified_since.0);
1285 1 : bytes.put_u32(req.rel.spcnode);
1286 1 : bytes.put_u32(req.rel.dbnode);
1287 1 : bytes.put_u32(req.rel.relnode);
1288 1 : bytes.put_u8(req.rel.forknum);
1289 1 : }
1290 :
1291 1 : Self::GetPage(req) => {
1292 1 : bytes.put_u8(2);
1293 1 : bytes.put_u64(req.request_lsn.0);
1294 1 : bytes.put_u64(req.not_modified_since.0);
1295 1 : bytes.put_u32(req.rel.spcnode);
1296 1 : bytes.put_u32(req.rel.dbnode);
1297 1 : bytes.put_u32(req.rel.relnode);
1298 1 : bytes.put_u8(req.rel.forknum);
1299 1 : bytes.put_u32(req.blkno);
1300 1 : }
1301 :
1302 1 : Self::DbSize(req) => {
1303 1 : bytes.put_u8(3);
1304 1 : bytes.put_u64(req.request_lsn.0);
1305 1 : bytes.put_u64(req.not_modified_since.0);
1306 1 : bytes.put_u32(req.dbnode);
1307 1 : }
1308 :
1309 0 : Self::GetSlruSegment(req) => {
1310 0 : bytes.put_u8(4);
1311 0 : bytes.put_u64(req.request_lsn.0);
1312 0 : bytes.put_u64(req.not_modified_since.0);
1313 0 : bytes.put_u8(req.kind);
1314 0 : bytes.put_u32(req.segno);
1315 0 : }
1316 : }
1317 :
1318 4 : bytes.into()
1319 4 : }
1320 :
1321 4 : pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
1322 : // these correspond to the NeonMessageTag enum in pagestore_client.h
1323 : //
1324 : // TODO: consider using protobuf or serde bincode for less error prone
1325 : // serialization.
1326 4 : let msg_tag = body.read_u8()?;
1327 :
1328 : // these two fields are the same for every request type
1329 4 : let request_lsn = Lsn::from(body.read_u64::<BigEndian>()?);
1330 4 : let not_modified_since = Lsn::from(body.read_u64::<BigEndian>()?);
1331 :
1332 4 : match msg_tag {
1333 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
1334 1 : request_lsn,
1335 1 : not_modified_since,
1336 1 : rel: RelTag {
1337 1 : spcnode: body.read_u32::<BigEndian>()?,
1338 1 : dbnode: body.read_u32::<BigEndian>()?,
1339 1 : relnode: body.read_u32::<BigEndian>()?,
1340 1 : forknum: body.read_u8()?,
1341 : },
1342 : })),
1343 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1344 1 : request_lsn,
1345 1 : not_modified_since,
1346 1 : rel: RelTag {
1347 1 : spcnode: body.read_u32::<BigEndian>()?,
1348 1 : dbnode: body.read_u32::<BigEndian>()?,
1349 1 : relnode: body.read_u32::<BigEndian>()?,
1350 1 : forknum: body.read_u8()?,
1351 : },
1352 : })),
1353 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1354 1 : request_lsn,
1355 1 : not_modified_since,
1356 1 : rel: RelTag {
1357 1 : spcnode: body.read_u32::<BigEndian>()?,
1358 1 : dbnode: body.read_u32::<BigEndian>()?,
1359 1 : relnode: body.read_u32::<BigEndian>()?,
1360 1 : forknum: body.read_u8()?,
1361 : },
1362 1 : blkno: body.read_u32::<BigEndian>()?,
1363 : })),
1364 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1365 1 : request_lsn,
1366 1 : not_modified_since,
1367 1 : dbnode: body.read_u32::<BigEndian>()?,
1368 : })),
1369 : 4 => Ok(PagestreamFeMessage::GetSlruSegment(
1370 : PagestreamGetSlruSegmentRequest {
1371 0 : request_lsn,
1372 0 : not_modified_since,
1373 0 : kind: body.read_u8()?,
1374 0 : segno: body.read_u32::<BigEndian>()?,
1375 : },
1376 : )),
1377 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
1378 : }
1379 4 : }
1380 : }
1381 :
1382 : impl PagestreamBeMessage {
1383 0 : pub fn serialize(&self) -> Bytes {
1384 0 : let mut bytes = BytesMut::new();
1385 :
1386 : use PagestreamBeMessageTag as Tag;
1387 0 : match self {
1388 0 : Self::Exists(resp) => {
1389 0 : bytes.put_u8(Tag::Exists as u8);
1390 0 : bytes.put_u8(resp.exists as u8);
1391 0 : }
1392 :
1393 0 : Self::Nblocks(resp) => {
1394 0 : bytes.put_u8(Tag::Nblocks as u8);
1395 0 : bytes.put_u32(resp.n_blocks);
1396 0 : }
1397 :
1398 0 : Self::GetPage(resp) => {
1399 0 : bytes.put_u8(Tag::GetPage as u8);
1400 0 : bytes.put(&resp.page[..]);
1401 0 : }
1402 :
1403 0 : Self::Error(resp) => {
1404 0 : bytes.put_u8(Tag::Error as u8);
1405 0 : bytes.put(resp.message.as_bytes());
1406 0 : bytes.put_u8(0); // null terminator
1407 0 : }
1408 0 : Self::DbSize(resp) => {
1409 0 : bytes.put_u8(Tag::DbSize as u8);
1410 0 : bytes.put_i64(resp.db_size);
1411 0 : }
1412 :
1413 0 : Self::GetSlruSegment(resp) => {
1414 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1415 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1416 0 : bytes.put(&resp.segment[..]);
1417 0 : }
1418 : }
1419 :
1420 0 : bytes.into()
1421 0 : }
1422 :
1423 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
1424 0 : let mut buf = buf.reader();
1425 0 : let msg_tag = buf.read_u8()?;
1426 :
1427 : use PagestreamBeMessageTag as Tag;
1428 0 : let ok =
1429 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
1430 : Tag::Exists => {
1431 0 : let exists = buf.read_u8()?;
1432 0 : Self::Exists(PagestreamExistsResponse {
1433 0 : exists: exists != 0,
1434 0 : })
1435 : }
1436 : Tag::Nblocks => {
1437 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1438 0 : Self::Nblocks(PagestreamNblocksResponse { n_blocks })
1439 : }
1440 : Tag::GetPage => {
1441 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
1442 0 : buf.read_exact(&mut page)?;
1443 0 : PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
1444 : }
1445 : Tag::Error => {
1446 0 : let mut msg = Vec::new();
1447 0 : buf.read_until(0, &mut msg)?;
1448 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
1449 0 : let rust_str = cstring.to_str()?;
1450 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1451 0 : message: rust_str.to_owned(),
1452 0 : })
1453 : }
1454 : Tag::DbSize => {
1455 0 : let db_size = buf.read_i64::<BigEndian>()?;
1456 0 : Self::DbSize(PagestreamDbSizeResponse { db_size })
1457 : }
1458 : Tag::GetSlruSegment => {
1459 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1460 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
1461 0 : buf.read_exact(&mut segment)?;
1462 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
1463 0 : segment: segment.into(),
1464 0 : })
1465 : }
1466 : };
1467 0 : let remaining = buf.into_inner();
1468 0 : if !remaining.is_empty() {
1469 0 : anyhow::bail!(
1470 0 : "remaining bytes in msg with tag={msg_tag}: {}",
1471 0 : remaining.len()
1472 0 : );
1473 0 : }
1474 0 : Ok(ok)
1475 0 : }
1476 :
1477 0 : pub fn kind(&self) -> &'static str {
1478 0 : match self {
1479 0 : Self::Exists(_) => "Exists",
1480 0 : Self::Nblocks(_) => "Nblocks",
1481 0 : Self::GetPage(_) => "GetPage",
1482 0 : Self::Error(_) => "Error",
1483 0 : Self::DbSize(_) => "DbSize",
1484 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
1485 : }
1486 0 : }
1487 : }
1488 :
1489 : #[cfg(test)]
1490 : mod tests {
1491 : use serde_json::json;
1492 : use std::str::FromStr;
1493 :
1494 : use super::*;
1495 :
1496 : #[test]
1497 1 : fn test_pagestream() {
1498 1 : // Test serialization/deserialization of PagestreamFeMessage
1499 1 : let messages = vec![
1500 1 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
1501 1 : request_lsn: Lsn(4),
1502 1 : not_modified_since: Lsn(3),
1503 1 : rel: RelTag {
1504 1 : forknum: 1,
1505 1 : spcnode: 2,
1506 1 : dbnode: 3,
1507 1 : relnode: 4,
1508 1 : },
1509 1 : }),
1510 1 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1511 1 : request_lsn: Lsn(4),
1512 1 : not_modified_since: Lsn(4),
1513 1 : rel: RelTag {
1514 1 : forknum: 1,
1515 1 : spcnode: 2,
1516 1 : dbnode: 3,
1517 1 : relnode: 4,
1518 1 : },
1519 1 : }),
1520 1 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1521 1 : request_lsn: Lsn(4),
1522 1 : not_modified_since: Lsn(3),
1523 1 : rel: RelTag {
1524 1 : forknum: 1,
1525 1 : spcnode: 2,
1526 1 : dbnode: 3,
1527 1 : relnode: 4,
1528 1 : },
1529 1 : blkno: 7,
1530 1 : }),
1531 1 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1532 1 : request_lsn: Lsn(4),
1533 1 : not_modified_since: Lsn(3),
1534 1 : dbnode: 7,
1535 1 : }),
1536 1 : ];
1537 5 : for msg in messages {
1538 4 : let bytes = msg.serialize();
1539 4 : let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
1540 4 : assert!(msg == reconstructed);
1541 : }
1542 1 : }
1543 :
1544 : #[test]
1545 1 : fn test_tenantinfo_serde() {
1546 1 : // Test serialization/deserialization of TenantInfo
1547 1 : let original_active = TenantInfo {
1548 1 : id: TenantShardId::unsharded(TenantId::generate()),
1549 1 : state: TenantState::Active,
1550 1 : current_physical_size: Some(42),
1551 1 : attachment_status: TenantAttachmentStatus::Attached,
1552 1 : generation: 1,
1553 1 : gc_blocking: None,
1554 1 : };
1555 1 : let expected_active = json!({
1556 1 : "id": original_active.id.to_string(),
1557 1 : "state": {
1558 1 : "slug": "Active",
1559 1 : },
1560 1 : "current_physical_size": 42,
1561 1 : "attachment_status": {
1562 1 : "slug":"attached",
1563 1 : },
1564 1 : "generation" : 1
1565 1 : });
1566 1 :
1567 1 : let original_broken = TenantInfo {
1568 1 : id: TenantShardId::unsharded(TenantId::generate()),
1569 1 : state: TenantState::Broken {
1570 1 : reason: "reason".into(),
1571 1 : backtrace: "backtrace info".into(),
1572 1 : },
1573 1 : current_physical_size: Some(42),
1574 1 : attachment_status: TenantAttachmentStatus::Attached,
1575 1 : generation: 1,
1576 1 : gc_blocking: None,
1577 1 : };
1578 1 : let expected_broken = json!({
1579 1 : "id": original_broken.id.to_string(),
1580 1 : "state": {
1581 1 : "slug": "Broken",
1582 1 : "data": {
1583 1 : "backtrace": "backtrace info",
1584 1 : "reason": "reason",
1585 1 : }
1586 1 : },
1587 1 : "current_physical_size": 42,
1588 1 : "attachment_status": {
1589 1 : "slug":"attached",
1590 1 : },
1591 1 : "generation" : 1
1592 1 : });
1593 1 :
1594 1 : assert_eq!(
1595 1 : serde_json::to_value(&original_active).unwrap(),
1596 1 : expected_active
1597 1 : );
1598 :
1599 1 : assert_eq!(
1600 1 : serde_json::to_value(&original_broken).unwrap(),
1601 1 : expected_broken
1602 1 : );
1603 1 : assert!(format!("{:?}", &original_broken.state).contains("reason"));
1604 1 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
1605 1 : }
1606 :
1607 : #[test]
1608 1 : fn test_reject_unknown_field() {
1609 1 : let id = TenantId::generate();
1610 1 : let config_request = json!({
1611 1 : "tenant_id": id.to_string(),
1612 1 : "unknown_field": "unknown_value".to_string(),
1613 1 : });
1614 1 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
1615 1 : assert!(
1616 1 : err.to_string().contains("unknown field `unknown_field`"),
1617 0 : "expect unknown field `unknown_field` error, got: {}",
1618 : err
1619 : );
1620 1 : }
1621 :
1622 : #[test]
1623 1 : fn tenantstatus_activating_serde() {
1624 1 : let states = [TenantState::Activating(ActivatingFrom::Attaching)];
1625 1 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
1626 1 :
1627 1 : let actual = serde_json::to_string(&states).unwrap();
1628 1 :
1629 1 : assert_eq!(actual, expected);
1630 :
1631 1 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
1632 1 :
1633 1 : assert_eq!(states.as_slice(), &parsed);
1634 1 : }
1635 :
1636 : #[test]
1637 1 : fn tenantstatus_activating_strum() {
1638 1 : // tests added, because we use these for metrics
1639 1 : let examples = [
1640 1 : (line!(), TenantState::Attaching, "Attaching"),
1641 1 : (
1642 1 : line!(),
1643 1 : TenantState::Activating(ActivatingFrom::Attaching),
1644 1 : "Activating",
1645 1 : ),
1646 1 : (line!(), TenantState::Active, "Active"),
1647 1 : (
1648 1 : line!(),
1649 1 : TenantState::Stopping {
1650 1 : progress: utils::completion::Barrier::default(),
1651 1 : },
1652 1 : "Stopping",
1653 1 : ),
1654 1 : (
1655 1 : line!(),
1656 1 : TenantState::Broken {
1657 1 : reason: "Example".into(),
1658 1 : backtrace: "Looooong backtrace".into(),
1659 1 : },
1660 1 : "Broken",
1661 1 : ),
1662 1 : ];
1663 :
1664 6 : for (line, rendered, expected) in examples {
1665 5 : let actual: &'static str = rendered.into();
1666 5 : assert_eq!(actual, expected, "example on {line}");
1667 : }
1668 1 : }
1669 :
1670 : #[test]
1671 1 : fn test_image_compression_algorithm_parsing() {
1672 : use ImageCompressionAlgorithm::*;
1673 1 : let cases = [
1674 1 : ("disabled", Disabled),
1675 1 : ("zstd", Zstd { level: None }),
1676 1 : ("zstd(18)", Zstd { level: Some(18) }),
1677 1 : ("zstd(-3)", Zstd { level: Some(-3) }),
1678 1 : ];
1679 :
1680 5 : for (display, expected) in cases {
1681 4 : assert_eq!(
1682 4 : ImageCompressionAlgorithm::from_str(display).unwrap(),
1683 : expected,
1684 0 : "parsing works"
1685 : );
1686 4 : assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");
1687 :
1688 4 : let ser = serde_json::to_string(&expected).expect("serialization");
1689 4 : assert_eq!(
1690 4 : serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap(),
1691 : expected,
1692 0 : "serde roundtrip"
1693 : );
1694 :
1695 4 : assert_eq!(
1696 4 : serde_json::Value::String(display.to_string()),
1697 4 : serde_json::to_value(expected).unwrap(),
1698 0 : "Display is the serde serialization"
1699 : );
1700 : }
1701 1 : }
1702 : }
|