Line data Source code
1 : pub mod detach_ancestor;
2 : pub mod partitioning;
3 : pub mod utilization;
4 :
5 : #[cfg(feature = "testing")]
6 : use camino::Utf8PathBuf;
7 : pub use utilization::PageserverUtilization;
8 :
9 : use core::ops::Range;
10 : use std::{
11 : collections::HashMap,
12 : fmt::Display,
13 : io::{BufRead, Read},
14 : num::{NonZeroU32, NonZeroU64, NonZeroUsize},
15 : str::FromStr,
16 : time::{Duration, SystemTime},
17 : };
18 :
19 : use byteorder::{BigEndian, ReadBytesExt};
20 : use postgres_ffi::BLCKSZ;
21 : use serde::{Deserialize, Deserializer, Serialize, Serializer};
22 : use serde_with::serde_as;
23 : use utils::{
24 : completion,
25 : id::{NodeId, TenantId, TimelineId},
26 : lsn::Lsn,
27 : postgres_client::PostgresClientProtocol,
28 : serde_system_time,
29 : };
30 :
31 : use crate::{
32 : key::{CompactKey, Key},
33 : reltag::RelTag,
34 : shard::{ShardCount, ShardStripeSize, TenantShardId},
35 : };
36 : use anyhow::bail;
37 : use bytes::{Buf, BufMut, Bytes, BytesMut};
38 :
39 : /// The state of a tenant in this pageserver.
40 : ///
41 : /// ```mermaid
42 : /// stateDiagram-v2
43 : ///
44 : /// [*] --> Attaching: spawn_attach()
45 : ///
46 : /// Attaching --> Activating: activate()
47 : /// Activating --> Active: infallible
48 : ///
49 : /// Attaching --> Broken: attach() failure
50 : ///
51 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
52 : /// Stopping --> Broken: late error in remove_tenant_from_memory
53 : ///
54 : /// Broken --> [*]: ignore / detach / shutdown
55 : /// Stopping --> [*]: remove_from_memory complete
56 : ///
57 : /// Active --> Broken: cfg(testing)-only tenant break point
58 : /// ```
59 : #[derive(
60 : Clone,
61 : PartialEq,
62 : Eq,
63 0 : serde::Serialize,
64 1 : serde::Deserialize,
65 : strum_macros::Display,
66 : strum_macros::VariantNames,
67 : strum_macros::AsRefStr,
68 : strum_macros::IntoStaticStr,
69 : )]
70 : #[serde(tag = "slug", content = "data")]
71 : pub enum TenantState {
72 : /// This tenant is being attached to the pageserver.
73 : ///
74 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
75 : Attaching,
76 : /// The tenant is transitioning from Loading/Attaching to Active.
77 : ///
78 : /// While in this state, the individual timelines are being activated.
79 : ///
80 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
81 : Activating(ActivatingFrom),
82 : /// The tenant has finished activating and is open for business.
83 : ///
84 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
85 : Active,
86 : /// The tenant is recognized by pageserver, but it is being detached or the
87 : /// system is being shut down.
88 : ///
89 : /// Transitions out of this state are possible through `set_broken()`.
90 : Stopping {
91 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
92 : // otherwise it will not be skipped during deserialization
93 : #[serde(skip)]
94 : progress: completion::Barrier,
95 : },
96 : /// The tenant is recognized by the pageserver, but can no longer be used for
97 : /// any operations.
98 : ///
99 : /// If the tenant fails to load or attach, it will transition to this state
100 : /// and it is guaranteed that no background tasks are running in its name.
101 : ///
102 : /// The other way to transition into this state is from `Stopping` state
103 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
104 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
105 : Broken { reason: String, backtrace: String },
106 : }
107 :
108 : impl TenantState {
109 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
110 : use TenantAttachmentStatus::*;
111 :
112 : // Below TenantState::Activating is used as "transient" or "transparent" state for
113 : // attachment_status determining.
114 0 : match self {
115 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
116 : // So, technically, we can return Attached here.
117 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
118 : // But, our attach task might still be fetching the remote timelines, etc.
119 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
120 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
121 : // We only reach Active after successful load / attach.
122 : // So, call atttachment status Attached.
123 0 : Self::Active => Attached,
124 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
125 : // However, it also becomes Broken if the regular load fails.
126 : // From Console's perspective there's no practical difference
127 : // because attachment_status is polled by console only during attach operation execution.
128 0 : Self::Broken { reason, .. } => Failed {
129 0 : reason: reason.to_owned(),
130 0 : },
131 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
132 : // we set the Stopping state irrespective of whether the tenant
133 : // has finished attaching or not.
134 0 : Self::Stopping { .. } => Maybe,
135 : }
136 0 : }
137 :
138 0 : pub fn broken_from_reason(reason: String) -> Self {
139 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
140 0 : Self::Broken {
141 0 : reason,
142 0 : backtrace: backtrace_str,
143 0 : }
144 0 : }
145 : }
146 :
147 : impl std::fmt::Debug for TenantState {
148 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 2 : match self {
150 2 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
151 2 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
152 : }
153 0 : _ => write!(f, "{self}"),
154 : }
155 2 : }
156 : }
157 :
158 : /// A temporary lease to a specific lsn inside a timeline.
159 : /// Access to the lsn is guaranteed by the pageserver until the expiration indicated by `valid_until`.
160 : #[serde_as]
161 0 : #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
162 : pub struct LsnLease {
163 : #[serde_as(as = "SystemTimeAsRfc3339Millis")]
164 : pub valid_until: SystemTime,
165 : }
166 :
167 : serde_with::serde_conv!(
168 : SystemTimeAsRfc3339Millis,
169 : SystemTime,
170 0 : |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
171 0 : |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
172 : );
173 :
174 : impl LsnLease {
175 : /// The default length for an explicit LSN lease request (10 minutes).
176 : pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
177 :
178 : /// The default length for an implicit LSN lease granted during
179 : /// `get_lsn_by_timestamp` request (1 minutes).
180 : pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
181 :
182 : /// Checks whether the lease is expired.
183 6 : pub fn is_expired(&self, now: &SystemTime) -> bool {
184 6 : now > &self.valid_until
185 6 : }
186 : }
187 :
188 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
189 : ///
190 : /// XXX: We used to have more variants here, but now it's just one, which makes this rather
191 : /// useless. Remove, once we've checked that there's no client code left that looks at this.
192 1 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
193 : pub enum ActivatingFrom {
194 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
195 : Attaching,
196 : }
197 :
198 : /// A state of a timeline in pageserver's memory.
199 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
200 : pub enum TimelineState {
201 : /// The timeline is recognized by the pageserver but is not yet operational.
202 : /// In particular, the walreceiver connection loop is not running for this timeline.
203 : /// It will eventually transition to state Active or Broken.
204 : Loading,
205 : /// The timeline is fully operational.
206 : /// It can be queried, and the walreceiver connection loop is running.
207 : Active,
208 : /// The timeline was previously Loading or Active but is shutting down.
209 : /// It cannot transition back into any other state.
210 : Stopping,
211 : /// The timeline is broken and not operational (previous states: Loading or Active).
212 : Broken { reason: String, backtrace: String },
213 : }
214 :
215 : #[serde_with::serde_as]
216 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
217 : pub struct CompactLsnRange {
218 : pub start: Lsn,
219 : pub end: Lsn,
220 : }
221 :
222 : #[serde_with::serde_as]
223 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
224 : pub struct CompactKeyRange {
225 : #[serde_as(as = "serde_with::DisplayFromStr")]
226 : pub start: Key,
227 : #[serde_as(as = "serde_with::DisplayFromStr")]
228 : pub end: Key,
229 : }
230 :
231 : impl From<Range<Lsn>> for CompactLsnRange {
232 6 : fn from(range: Range<Lsn>) -> Self {
233 6 : Self {
234 6 : start: range.start,
235 6 : end: range.end,
236 6 : }
237 6 : }
238 : }
239 :
240 : impl From<Range<Key>> for CompactKeyRange {
241 16 : fn from(range: Range<Key>) -> Self {
242 16 : Self {
243 16 : start: range.start,
244 16 : end: range.end,
245 16 : }
246 16 : }
247 : }
248 :
249 : impl From<CompactLsnRange> for Range<Lsn> {
250 10 : fn from(range: CompactLsnRange) -> Self {
251 10 : range.start..range.end
252 10 : }
253 : }
254 :
255 : impl From<CompactKeyRange> for Range<Key> {
256 16 : fn from(range: CompactKeyRange) -> Self {
257 16 : range.start..range.end
258 16 : }
259 : }
260 :
261 : impl CompactLsnRange {
262 4 : pub fn above(lsn: Lsn) -> Self {
263 4 : Self {
264 4 : start: lsn,
265 4 : end: Lsn::MAX,
266 4 : }
267 4 : }
268 : }
269 :
270 : #[derive(Debug, Clone, Serialize)]
271 : pub struct CompactInfoResponse {
272 : pub compact_key_range: Option<CompactKeyRange>,
273 : pub compact_lsn_range: Option<CompactLsnRange>,
274 : pub sub_compaction: bool,
275 : pub running: bool,
276 : pub job_id: usize,
277 : }
278 :
279 0 : #[derive(Serialize, Deserialize, Clone)]
280 : pub struct TimelineCreateRequest {
281 : pub new_timeline_id: TimelineId,
282 : #[serde(flatten)]
283 : pub mode: TimelineCreateRequestMode,
284 : }
285 :
286 0 : #[derive(Serialize, Deserialize, Clone)]
287 : #[serde(untagged)]
288 : pub enum TimelineCreateRequestMode {
289 : Branch {
290 : ancestor_timeline_id: TimelineId,
291 : #[serde(default)]
292 : ancestor_start_lsn: Option<Lsn>,
293 : // TODO: cplane sets this, but, the branching code always
294 : // inherits the ancestor's pg_version. Earlier code wasn't
295 : // using a flattened enum, so, it was an accepted field, and
296 : // we continue to accept it by having it here.
297 : pg_version: Option<u32>,
298 : },
299 : ImportPgdata {
300 : import_pgdata: TimelineCreateRequestModeImportPgdata,
301 : },
302 : // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
303 : // (serde picks the first matching enum variant, in declaration order).
304 : Bootstrap {
305 : #[serde(default)]
306 : existing_initdb_timeline_id: Option<TimelineId>,
307 : pg_version: Option<u32>,
308 : },
309 : }
310 :
311 0 : #[derive(Serialize, Deserialize, Clone)]
312 : pub struct TimelineCreateRequestModeImportPgdata {
313 : pub location: ImportPgdataLocation,
314 : pub idempotency_key: ImportPgdataIdempotencyKey,
315 : }
316 :
317 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
318 : pub enum ImportPgdataLocation {
319 : #[cfg(feature = "testing")]
320 : LocalFs { path: Utf8PathBuf },
321 : AwsS3 {
322 : region: String,
323 : bucket: String,
324 : /// A better name for this would be `prefix`; changing requires coordination with cplane.
325 : /// See <https://github.com/neondatabase/cloud/issues/20646>.
326 : key: String,
327 : },
328 : }
329 :
330 0 : #[derive(Serialize, Deserialize, Clone)]
331 : #[serde(transparent)]
332 : pub struct ImportPgdataIdempotencyKey(pub String);
333 :
334 : impl ImportPgdataIdempotencyKey {
335 0 : pub fn random() -> Self {
336 : use rand::{distributions::Alphanumeric, Rng};
337 0 : Self(
338 0 : rand::thread_rng()
339 0 : .sample_iter(&Alphanumeric)
340 0 : .take(20)
341 0 : .map(char::from)
342 0 : .collect(),
343 0 : )
344 0 : }
345 : }
346 :
347 0 : #[derive(Serialize, Deserialize, Clone)]
348 : pub struct LsnLeaseRequest {
349 : pub lsn: Lsn,
350 : }
351 :
352 0 : #[derive(Serialize, Deserialize)]
353 : pub struct TenantShardSplitRequest {
354 : pub new_shard_count: u8,
355 :
356 : // A tenant's stripe size is only meaningful the first time their shard count goes
357 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
358 : //
359 : // If this is set while the stripe count is being increased from an already >1 value,
360 : // then the request will fail with 400.
361 : pub new_stripe_size: Option<ShardStripeSize>,
362 : }
363 :
364 0 : #[derive(Serialize, Deserialize)]
365 : pub struct TenantShardSplitResponse {
366 : pub new_shards: Vec<TenantShardId>,
367 : }
368 :
369 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
370 0 : #[derive(Serialize, Deserialize, Debug)]
371 : #[serde(deny_unknown_fields)]
372 : pub struct ShardParameters {
373 : pub count: ShardCount,
374 : pub stripe_size: ShardStripeSize,
375 : }
376 :
377 : impl ShardParameters {
378 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
379 :
380 0 : pub fn is_unsharded(&self) -> bool {
381 0 : self.count.is_unsharded()
382 0 : }
383 : }
384 :
385 : impl Default for ShardParameters {
386 221 : fn default() -> Self {
387 221 : Self {
388 221 : count: ShardCount::new(0),
389 221 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
390 221 : }
391 221 : }
392 : }
393 :
394 : #[derive(Debug, Default, Clone, Eq, PartialEq)]
395 : pub enum FieldPatch<T> {
396 : Upsert(T),
397 : Remove,
398 : #[default]
399 : Noop,
400 : }
401 :
402 : impl<T> FieldPatch<T> {
403 48 : fn is_noop(&self) -> bool {
404 48 : matches!(self, FieldPatch::Noop)
405 48 : }
406 :
407 24 : pub fn apply(self, target: &mut Option<T>) {
408 24 : match self {
409 1 : Self::Upsert(v) => *target = Some(v),
410 1 : Self::Remove => *target = None,
411 22 : Self::Noop => {}
412 : }
413 24 : }
414 :
415 0 : pub fn map<U, E, F: FnOnce(T) -> Result<U, E>>(self, map: F) -> Result<FieldPatch<U>, E> {
416 0 : match self {
417 0 : Self::Upsert(v) => Ok(FieldPatch::<U>::Upsert(map(v)?)),
418 0 : Self::Remove => Ok(FieldPatch::<U>::Remove),
419 0 : Self::Noop => Ok(FieldPatch::<U>::Noop),
420 : }
421 0 : }
422 : }
423 :
424 : impl<'de, T: Deserialize<'de>> Deserialize<'de> for FieldPatch<T> {
425 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
426 2 : where
427 2 : D: Deserializer<'de>,
428 2 : {
429 2 : Option::deserialize(deserializer).map(|opt| match opt {
430 1 : None => FieldPatch::Remove,
431 1 : Some(val) => FieldPatch::Upsert(val),
432 2 : })
433 2 : }
434 : }
435 :
436 : impl<T: Serialize> Serialize for FieldPatch<T> {
437 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
438 2 : where
439 2 : S: Serializer,
440 2 : {
441 2 : match self {
442 1 : FieldPatch::Upsert(val) => serializer.serialize_some(val),
443 1 : FieldPatch::Remove => serializer.serialize_none(),
444 0 : FieldPatch::Noop => unreachable!(),
445 : }
446 2 : }
447 : }
448 :
449 2 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
450 : #[serde(default)]
451 : pub struct TenantConfigPatch {
452 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
453 : pub checkpoint_distance: FieldPatch<u64>,
454 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
455 : pub checkpoint_timeout: FieldPatch<String>,
456 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
457 : pub compaction_target_size: FieldPatch<u64>,
458 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
459 : pub compaction_period: FieldPatch<String>,
460 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
461 : pub compaction_threshold: FieldPatch<usize>,
462 : // defer parsing compaction_algorithm, like eviction_policy
463 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
464 : pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
465 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
466 : pub gc_horizon: FieldPatch<u64>,
467 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
468 : pub gc_period: FieldPatch<String>,
469 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
470 : pub image_creation_threshold: FieldPatch<usize>,
471 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
472 : pub pitr_interval: FieldPatch<String>,
473 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
474 : pub walreceiver_connect_timeout: FieldPatch<String>,
475 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
476 : pub lagging_wal_timeout: FieldPatch<String>,
477 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
478 : pub max_lsn_wal_lag: FieldPatch<NonZeroU64>,
479 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
480 : pub eviction_policy: FieldPatch<EvictionPolicy>,
481 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
482 : pub min_resident_size_override: FieldPatch<u64>,
483 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
484 : pub evictions_low_residence_duration_metric_threshold: FieldPatch<String>,
485 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
486 : pub heatmap_period: FieldPatch<String>,
487 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
488 : pub lazy_slru_download: FieldPatch<bool>,
489 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
490 : pub timeline_get_throttle: FieldPatch<ThrottleConfig>,
491 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
492 : pub image_layer_creation_check_threshold: FieldPatch<u8>,
493 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
494 : pub lsn_lease_length: FieldPatch<String>,
495 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
496 : pub lsn_lease_length_for_ts: FieldPatch<String>,
497 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
498 : pub timeline_offloading: FieldPatch<bool>,
499 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
500 : pub wal_receiver_protocol_override: FieldPatch<PostgresClientProtocol>,
501 : }
502 :
503 : /// An alternative representation of `pageserver::tenant::TenantConf` with
504 : /// simpler types.
505 0 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
506 : pub struct TenantConfig {
507 : pub checkpoint_distance: Option<u64>,
508 : pub checkpoint_timeout: Option<String>,
509 : pub compaction_target_size: Option<u64>,
510 : pub compaction_period: Option<String>,
511 : pub compaction_threshold: Option<usize>,
512 : // defer parsing compaction_algorithm, like eviction_policy
513 : pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
514 : pub gc_horizon: Option<u64>,
515 : pub gc_period: Option<String>,
516 : pub image_creation_threshold: Option<usize>,
517 : pub pitr_interval: Option<String>,
518 : pub walreceiver_connect_timeout: Option<String>,
519 : pub lagging_wal_timeout: Option<String>,
520 : pub max_lsn_wal_lag: Option<NonZeroU64>,
521 : pub eviction_policy: Option<EvictionPolicy>,
522 : pub min_resident_size_override: Option<u64>,
523 : pub evictions_low_residence_duration_metric_threshold: Option<String>,
524 : pub heatmap_period: Option<String>,
525 : pub lazy_slru_download: Option<bool>,
526 : pub timeline_get_throttle: Option<ThrottleConfig>,
527 : pub image_layer_creation_check_threshold: Option<u8>,
528 : pub lsn_lease_length: Option<String>,
529 : pub lsn_lease_length_for_ts: Option<String>,
530 : pub timeline_offloading: Option<bool>,
531 : pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
532 : }
533 :
534 : impl TenantConfig {
535 1 : pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
536 1 : let Self {
537 1 : mut checkpoint_distance,
538 1 : mut checkpoint_timeout,
539 1 : mut compaction_target_size,
540 1 : mut compaction_period,
541 1 : mut compaction_threshold,
542 1 : mut compaction_algorithm,
543 1 : mut gc_horizon,
544 1 : mut gc_period,
545 1 : mut image_creation_threshold,
546 1 : mut pitr_interval,
547 1 : mut walreceiver_connect_timeout,
548 1 : mut lagging_wal_timeout,
549 1 : mut max_lsn_wal_lag,
550 1 : mut eviction_policy,
551 1 : mut min_resident_size_override,
552 1 : mut evictions_low_residence_duration_metric_threshold,
553 1 : mut heatmap_period,
554 1 : mut lazy_slru_download,
555 1 : mut timeline_get_throttle,
556 1 : mut image_layer_creation_check_threshold,
557 1 : mut lsn_lease_length,
558 1 : mut lsn_lease_length_for_ts,
559 1 : mut timeline_offloading,
560 1 : mut wal_receiver_protocol_override,
561 1 : } = self;
562 1 :
563 1 : patch.checkpoint_distance.apply(&mut checkpoint_distance);
564 1 : patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
565 1 : patch
566 1 : .compaction_target_size
567 1 : .apply(&mut compaction_target_size);
568 1 : patch.compaction_period.apply(&mut compaction_period);
569 1 : patch.compaction_threshold.apply(&mut compaction_threshold);
570 1 : patch.compaction_algorithm.apply(&mut compaction_algorithm);
571 1 : patch.gc_horizon.apply(&mut gc_horizon);
572 1 : patch.gc_period.apply(&mut gc_period);
573 1 : patch
574 1 : .image_creation_threshold
575 1 : .apply(&mut image_creation_threshold);
576 1 : patch.pitr_interval.apply(&mut pitr_interval);
577 1 : patch
578 1 : .walreceiver_connect_timeout
579 1 : .apply(&mut walreceiver_connect_timeout);
580 1 : patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
581 1 : patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
582 1 : patch.eviction_policy.apply(&mut eviction_policy);
583 1 : patch
584 1 : .min_resident_size_override
585 1 : .apply(&mut min_resident_size_override);
586 1 : patch
587 1 : .evictions_low_residence_duration_metric_threshold
588 1 : .apply(&mut evictions_low_residence_duration_metric_threshold);
589 1 : patch.heatmap_period.apply(&mut heatmap_period);
590 1 : patch.lazy_slru_download.apply(&mut lazy_slru_download);
591 1 : patch
592 1 : .timeline_get_throttle
593 1 : .apply(&mut timeline_get_throttle);
594 1 : patch
595 1 : .image_layer_creation_check_threshold
596 1 : .apply(&mut image_layer_creation_check_threshold);
597 1 : patch.lsn_lease_length.apply(&mut lsn_lease_length);
598 1 : patch
599 1 : .lsn_lease_length_for_ts
600 1 : .apply(&mut lsn_lease_length_for_ts);
601 1 : patch.timeline_offloading.apply(&mut timeline_offloading);
602 1 : patch
603 1 : .wal_receiver_protocol_override
604 1 : .apply(&mut wal_receiver_protocol_override);
605 1 :
606 1 : Self {
607 1 : checkpoint_distance,
608 1 : checkpoint_timeout,
609 1 : compaction_target_size,
610 1 : compaction_period,
611 1 : compaction_threshold,
612 1 : compaction_algorithm,
613 1 : gc_horizon,
614 1 : gc_period,
615 1 : image_creation_threshold,
616 1 : pitr_interval,
617 1 : walreceiver_connect_timeout,
618 1 : lagging_wal_timeout,
619 1 : max_lsn_wal_lag,
620 1 : eviction_policy,
621 1 : min_resident_size_override,
622 1 : evictions_low_residence_duration_metric_threshold,
623 1 : heatmap_period,
624 1 : lazy_slru_download,
625 1 : timeline_get_throttle,
626 1 : image_layer_creation_check_threshold,
627 1 : lsn_lease_length,
628 1 : lsn_lease_length_for_ts,
629 1 : timeline_offloading,
630 1 : wal_receiver_protocol_override,
631 1 : }
632 1 : }
633 : }
634 :
635 : /// The policy for the aux file storage.
636 : ///
637 : /// It can be switched through `switch_aux_file_policy` tenant config.
638 : /// When the first aux file written, the policy will be persisted in the
639 : /// `index_part.json` file and has a limited migration path.
640 : ///
641 : /// Currently, we only allow the following migration path:
642 : ///
643 : /// Unset -> V1
644 : /// -> V2
645 : /// -> CrossValidation -> V2
646 : #[derive(
647 : Eq,
648 : PartialEq,
649 : Debug,
650 : Copy,
651 : Clone,
652 0 : strum_macros::EnumString,
653 : strum_macros::Display,
654 2 : serde_with::DeserializeFromStr,
655 : serde_with::SerializeDisplay,
656 : )]
657 : #[strum(serialize_all = "kebab-case")]
658 : pub enum AuxFilePolicy {
659 : /// V1 aux file policy: store everything in AUX_FILE_KEY
660 : #[strum(ascii_case_insensitive)]
661 : V1,
662 : /// V2 aux file policy: store in the AUX_FILE keyspace
663 : #[strum(ascii_case_insensitive)]
664 : V2,
665 : /// Cross validation runs both formats on the write path and does validation
666 : /// on the read path.
667 : #[strum(ascii_case_insensitive)]
668 : CrossValidation,
669 : }
670 :
671 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
672 : #[serde(tag = "kind")]
673 : pub enum EvictionPolicy {
674 : NoEviction,
675 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
676 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
677 : }
678 :
679 : impl EvictionPolicy {
680 0 : pub fn discriminant_str(&self) -> &'static str {
681 0 : match self {
682 0 : EvictionPolicy::NoEviction => "NoEviction",
683 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
684 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
685 : }
686 0 : }
687 : }
688 :
689 : #[derive(
690 : Eq,
691 : PartialEq,
692 : Debug,
693 : Copy,
694 : Clone,
695 0 : strum_macros::EnumString,
696 : strum_macros::Display,
697 0 : serde_with::DeserializeFromStr,
698 : serde_with::SerializeDisplay,
699 : )]
700 : #[strum(serialize_all = "kebab-case")]
701 : pub enum CompactionAlgorithm {
702 : Legacy,
703 : Tiered,
704 : }
705 :
706 : #[derive(
707 4 : Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
708 : )]
709 : pub enum ImageCompressionAlgorithm {
710 : // Disabled for writes, support decompressing during read path
711 : Disabled,
712 : /// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
713 : /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
714 : Zstd {
715 : level: Option<i8>,
716 : },
717 : }
718 :
719 : impl FromStr for ImageCompressionAlgorithm {
720 : type Err = anyhow::Error;
721 8 : fn from_str(s: &str) -> Result<Self, Self::Err> {
722 8 : let mut components = s.split(['(', ')']);
723 8 : let first = components
724 8 : .next()
725 8 : .ok_or_else(|| anyhow::anyhow!("empty string"))?;
726 8 : match first {
727 8 : "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
728 6 : "zstd" => {
729 6 : let level = if let Some(v) = components.next() {
730 4 : let v: i8 = v.parse()?;
731 4 : Some(v)
732 : } else {
733 2 : None
734 : };
735 :
736 6 : Ok(ImageCompressionAlgorithm::Zstd { level })
737 : }
738 0 : _ => anyhow::bail!("invalid specifier '{first}'"),
739 : }
740 8 : }
741 : }
742 :
743 : impl Display for ImageCompressionAlgorithm {
744 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
745 12 : match self {
746 3 : ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
747 9 : ImageCompressionAlgorithm::Zstd { level } => {
748 9 : if let Some(level) = level {
749 6 : write!(f, "zstd({})", level)
750 : } else {
751 3 : write!(f, "zstd")
752 : }
753 : }
754 : }
755 12 : }
756 : }
757 :
758 0 : #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
759 : pub struct CompactionAlgorithmSettings {
760 : pub kind: CompactionAlgorithm,
761 : }
762 :
763 4 : #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
764 : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
765 : pub enum L0FlushConfig {
766 : #[serde(rename_all = "snake_case")]
767 : Direct { max_concurrency: NonZeroUsize },
768 : }
769 :
770 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
771 : pub struct EvictionPolicyLayerAccessThreshold {
772 : #[serde(with = "humantime_serde")]
773 : pub period: Duration,
774 : #[serde(with = "humantime_serde")]
775 : pub threshold: Duration,
776 : }
777 :
778 6 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
779 : pub struct ThrottleConfig {
780 : /// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
781 : #[serde(rename = "task_kinds")]
782 : pub enabled: ThrottleConfigTaskKinds,
783 : pub initial: u32,
784 : #[serde(with = "humantime_serde")]
785 : pub refill_interval: Duration,
786 : pub refill_amount: NonZeroU32,
787 : pub max: u32,
788 : }
789 :
790 : /// Before <https://github.com/neondatabase/neon/pull/9962>
791 : /// the throttle was a per `Timeline::get`/`Timeline::get_vectored` call.
792 : /// The `task_kinds` field controlled which Pageserver "Task Kind"s
793 : /// were subject to the throttle.
794 : ///
795 : /// After that PR, the throttle is applied at pagestream request level
796 : /// and the `task_kinds` field does not apply since the only task kind
797 : /// that us subject to the throttle is that of the page service.
798 : ///
799 : /// However, we don't want to make a breaking config change right now
800 : /// because it means we have to migrate all the tenant configs.
801 : /// This will be done in a future PR.
802 : ///
803 : /// In the meantime, we use emptiness / non-emptsiness of the `task_kinds`
804 : /// field to determine if the throttle is enabled or not.
805 1 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
806 : #[serde(transparent)]
807 : pub struct ThrottleConfigTaskKinds(Vec<String>);
808 :
809 : impl ThrottleConfigTaskKinds {
810 451 : pub fn disabled() -> Self {
811 451 : Self(vec![])
812 451 : }
813 222 : pub fn is_enabled(&self) -> bool {
814 222 : !self.0.is_empty()
815 222 : }
816 : }
817 :
818 : impl ThrottleConfig {
819 451 : pub fn disabled() -> Self {
820 451 : Self {
821 451 : enabled: ThrottleConfigTaskKinds::disabled(),
822 451 : // other values don't matter with emtpy `task_kinds`.
823 451 : initial: 0,
824 451 : refill_interval: Duration::from_millis(1),
825 451 : refill_amount: NonZeroU32::new(1).unwrap(),
826 451 : max: 1,
827 451 : }
828 451 : }
829 : /// The requests per second allowed by the given config.
830 0 : pub fn steady_rps(&self) -> f64 {
831 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
832 0 : }
833 : }
834 :
835 : #[cfg(test)]
836 : mod throttle_config_tests {
837 : use super::*;
838 :
839 : #[test]
840 1 : fn test_disabled_is_disabled() {
841 1 : let config = ThrottleConfig::disabled();
842 1 : assert!(!config.enabled.is_enabled());
843 1 : }
844 : #[test]
845 1 : fn test_enabled_backwards_compat() {
846 1 : let input = serde_json::json!({
847 1 : "task_kinds": ["PageRequestHandler"],
848 1 : "initial": 40000,
849 1 : "refill_interval": "50ms",
850 1 : "refill_amount": 1000,
851 1 : "max": 40000,
852 1 : "fair": true
853 1 : });
854 1 : let config: ThrottleConfig = serde_json::from_value(input).unwrap();
855 1 : assert!(config.enabled.is_enabled());
856 1 : }
857 : }
858 :
859 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
860 : /// lists out all possible states (and the virtual "Detached" state)
861 : /// in a flat form rather than using rust-style enums.
862 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
863 : pub enum LocationConfigMode {
864 : AttachedSingle,
865 : AttachedMulti,
866 : AttachedStale,
867 : Secondary,
868 : Detached,
869 : }
870 :
871 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
872 : pub struct LocationConfigSecondary {
873 : pub warm: bool,
874 : }
875 :
876 : /// An alternative representation of `pageserver::tenant::LocationConf`,
877 : /// for use in external-facing APIs.
878 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
879 : pub struct LocationConfig {
880 : pub mode: LocationConfigMode,
881 : /// If attaching, in what generation?
882 : #[serde(default)]
883 : pub generation: Option<u32>,
884 :
885 : // If requesting mode `Secondary`, configuration for that.
886 : #[serde(default)]
887 : pub secondary_conf: Option<LocationConfigSecondary>,
888 :
889 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
890 : // must be set accurately.
891 : #[serde(default)]
892 : pub shard_number: u8,
893 : #[serde(default)]
894 : pub shard_count: u8,
895 : #[serde(default)]
896 : pub shard_stripe_size: u32,
897 :
898 : // This configuration only affects attached mode, but should be provided irrespective
899 : // of the mode, as a secondary location might transition on startup if the response
900 : // to the `/re-attach` control plane API requests it.
901 : pub tenant_conf: TenantConfig,
902 : }
903 :
904 0 : #[derive(Serialize, Deserialize)]
905 : pub struct LocationConfigListResponse {
906 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
907 : }
908 :
909 : #[derive(Serialize)]
910 : pub struct StatusResponse {
911 : pub id: NodeId,
912 : }
913 :
914 0 : #[derive(Serialize, Deserialize, Debug)]
915 : #[serde(deny_unknown_fields)]
916 : pub struct TenantLocationConfigRequest {
917 : #[serde(flatten)]
918 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
919 : }
920 :
921 0 : #[derive(Serialize, Deserialize, Debug)]
922 : #[serde(deny_unknown_fields)]
923 : pub struct TenantTimeTravelRequest {
924 : pub shard_counts: Vec<ShardCount>,
925 : }
926 :
927 0 : #[derive(Serialize, Deserialize, Debug)]
928 : #[serde(deny_unknown_fields)]
929 : pub struct TenantShardLocation {
930 : pub shard_id: TenantShardId,
931 : pub node_id: NodeId,
932 : }
933 :
934 0 : #[derive(Serialize, Deserialize, Debug)]
935 : #[serde(deny_unknown_fields)]
936 : pub struct TenantLocationConfigResponse {
937 : pub shards: Vec<TenantShardLocation>,
938 : // If the shards' ShardCount count is >1, stripe_size will be set.
939 : pub stripe_size: Option<ShardStripeSize>,
940 : }
941 :
942 2 : #[derive(Serialize, Deserialize, Debug)]
943 : #[serde(deny_unknown_fields)]
944 : pub struct TenantConfigRequest {
945 : pub tenant_id: TenantId,
946 : #[serde(flatten)]
947 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
948 : }
949 :
950 : impl std::ops::Deref for TenantConfigRequest {
951 : type Target = TenantConfig;
952 :
953 0 : fn deref(&self) -> &Self::Target {
954 0 : &self.config
955 0 : }
956 : }
957 :
958 : impl TenantConfigRequest {
959 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
960 0 : let config = TenantConfig::default();
961 0 : TenantConfigRequest { tenant_id, config }
962 0 : }
963 : }
964 :
965 3 : #[derive(Serialize, Deserialize, Debug)]
966 : #[serde(deny_unknown_fields)]
967 : pub struct TenantConfigPatchRequest {
968 : pub tenant_id: TenantId,
969 : #[serde(flatten)]
970 : pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
971 : }
972 :
973 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
974 0 : #[derive(Serialize, Deserialize, Clone)]
975 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
976 : pub enum TenantAttachmentStatus {
977 : Maybe,
978 : Attached,
979 : Failed { reason: String },
980 : }
981 :
982 0 : #[derive(Serialize, Deserialize, Clone)]
983 : pub struct TenantInfo {
984 : pub id: TenantShardId,
985 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
986 : pub state: TenantState,
987 : /// Sum of the size of all layer files.
988 : /// If a layer is present in both local FS and S3, it counts only once.
989 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
990 : pub attachment_status: TenantAttachmentStatus,
991 : pub generation: u32,
992 :
993 : /// Opaque explanation if gc is being blocked.
994 : ///
995 : /// Only looked up for the individual tenant detail, not the listing. This is purely for
996 : /// debugging, not included in openapi.
997 : #[serde(skip_serializing_if = "Option::is_none")]
998 : pub gc_blocking: Option<String>,
999 : }
1000 :
1001 0 : #[derive(Serialize, Deserialize, Clone)]
1002 : pub struct TenantDetails {
1003 : #[serde(flatten)]
1004 : pub tenant_info: TenantInfo,
1005 :
1006 : pub walredo: Option<WalRedoManagerStatus>,
1007 :
1008 : pub timelines: Vec<TimelineId>,
1009 : }
1010 :
1011 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
1012 : pub enum TimelineArchivalState {
1013 : Archived,
1014 : Unarchived,
1015 : }
1016 :
1017 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1018 : pub struct TimelineArchivalConfigRequest {
1019 : pub state: TimelineArchivalState,
1020 : }
1021 :
1022 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1023 : pub struct TimelinesInfoAndOffloaded {
1024 : pub timelines: Vec<TimelineInfo>,
1025 : pub offloaded: Vec<OffloadedTimelineInfo>,
1026 : }
1027 :
1028 : /// Analog of [`TimelineInfo`] for offloaded timelines.
1029 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1030 : pub struct OffloadedTimelineInfo {
1031 : pub tenant_id: TenantShardId,
1032 : pub timeline_id: TimelineId,
1033 : /// Whether the timeline has a parent it has been branched off from or not
1034 : pub ancestor_timeline_id: Option<TimelineId>,
1035 : /// Whether to retain the branch lsn at the ancestor or not
1036 : pub ancestor_retain_lsn: Option<Lsn>,
1037 : /// The time point when the timeline was archived
1038 : pub archived_at: chrono::DateTime<chrono::Utc>,
1039 : }
1040 :
1041 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
1042 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1043 : pub struct TimelineInfo {
1044 : pub tenant_id: TenantShardId,
1045 : pub timeline_id: TimelineId,
1046 :
1047 : pub ancestor_timeline_id: Option<TimelineId>,
1048 : pub ancestor_lsn: Option<Lsn>,
1049 : pub last_record_lsn: Lsn,
1050 : pub prev_record_lsn: Option<Lsn>,
1051 : pub latest_gc_cutoff_lsn: Lsn,
1052 : pub disk_consistent_lsn: Lsn,
1053 :
1054 : /// The LSN that we have succesfully uploaded to remote storage
1055 : pub remote_consistent_lsn: Lsn,
1056 :
1057 : /// The LSN that we are advertizing to safekeepers
1058 : pub remote_consistent_lsn_visible: Lsn,
1059 :
1060 : /// The LSN from the start of the root timeline (never changes)
1061 : pub initdb_lsn: Lsn,
1062 :
1063 : pub current_logical_size: u64,
1064 : pub current_logical_size_is_accurate: bool,
1065 :
1066 : pub directory_entries_counts: Vec<u64>,
1067 :
1068 : /// Sum of the size of all layer files.
1069 : /// If a layer is present in both local FS and S3, it counts only once.
1070 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
1071 : pub current_logical_size_non_incremental: Option<u64>,
1072 :
1073 : /// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
1074 : /// beyond the branch's branch point, we only count up to the branch point.
1075 : pub pitr_history_size: u64,
1076 :
1077 : /// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
1078 : /// ancestor data used by this branch would have been retained anyway). If this is false, then
1079 : /// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
1080 : /// otherwise be able to GC.
1081 : pub within_ancestor_pitr: bool,
1082 :
1083 : pub timeline_dir_layer_file_size_sum: Option<u64>,
1084 :
1085 : pub wal_source_connstr: Option<String>,
1086 : pub last_received_msg_lsn: Option<Lsn>,
1087 : /// the timestamp (in microseconds) of the last received message
1088 : pub last_received_msg_ts: Option<u128>,
1089 : pub pg_version: u32,
1090 :
1091 : pub state: TimelineState,
1092 :
1093 : pub walreceiver_status: String,
1094 :
1095 : // ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
1096 : // Backward compatibility: you will get a JSON not containing the newly-added field.
1097 : // Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
1098 : // not deny unknown fields by default so it's safe to set the field to some value, though it won't be
1099 : // read.
1100 : pub is_archived: Option<bool>,
1101 : }
1102 :
1103 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1104 : pub struct LayerMapInfo {
1105 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
1106 : pub historic_layers: Vec<HistoricLayerInfo>,
1107 : }
1108 :
1109 : /// The residence status of a layer
1110 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1111 : pub enum LayerResidenceStatus {
1112 : /// Residence status for a layer file that exists locally.
1113 : /// It may also exist on the remote, we don't care here.
1114 : Resident,
1115 : /// Residence status for a layer file that only exists on the remote.
1116 : Evicted,
1117 : }
1118 :
1119 : #[serde_as]
1120 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1121 : pub struct LayerAccessStats {
1122 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1123 : pub access_time: SystemTime,
1124 :
1125 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1126 : pub residence_time: SystemTime,
1127 :
1128 : pub visible: bool,
1129 : }
1130 :
1131 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1132 : #[serde(tag = "kind")]
1133 : pub enum InMemoryLayerInfo {
1134 : Open { lsn_start: Lsn },
1135 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
1136 : }
1137 :
1138 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1139 : #[serde(tag = "kind")]
1140 : pub enum HistoricLayerInfo {
1141 : Delta {
1142 : layer_file_name: String,
1143 : layer_file_size: u64,
1144 :
1145 : lsn_start: Lsn,
1146 : lsn_end: Lsn,
1147 : remote: bool,
1148 : access_stats: LayerAccessStats,
1149 :
1150 : l0: bool,
1151 : },
1152 : Image {
1153 : layer_file_name: String,
1154 : layer_file_size: u64,
1155 :
1156 : lsn_start: Lsn,
1157 : remote: bool,
1158 : access_stats: LayerAccessStats,
1159 : },
1160 : }
1161 :
1162 : impl HistoricLayerInfo {
1163 0 : pub fn layer_file_name(&self) -> &str {
1164 0 : match self {
1165 : HistoricLayerInfo::Delta {
1166 0 : layer_file_name, ..
1167 0 : } => layer_file_name,
1168 : HistoricLayerInfo::Image {
1169 0 : layer_file_name, ..
1170 0 : } => layer_file_name,
1171 : }
1172 0 : }
1173 0 : pub fn is_remote(&self) -> bool {
1174 0 : match self {
1175 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
1176 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
1177 : }
1178 0 : }
1179 0 : pub fn set_remote(&mut self, value: bool) {
1180 0 : let field = match self {
1181 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
1182 0 : HistoricLayerInfo::Image { remote, .. } => remote,
1183 : };
1184 0 : *field = value;
1185 0 : }
1186 0 : pub fn layer_file_size(&self) -> u64 {
1187 0 : match self {
1188 : HistoricLayerInfo::Delta {
1189 0 : layer_file_size, ..
1190 0 : } => *layer_file_size,
1191 : HistoricLayerInfo::Image {
1192 0 : layer_file_size, ..
1193 0 : } => *layer_file_size,
1194 : }
1195 0 : }
1196 : }
1197 :
1198 0 : #[derive(Debug, Serialize, Deserialize)]
1199 : pub struct DownloadRemoteLayersTaskSpawnRequest {
1200 : pub max_concurrent_downloads: NonZeroUsize,
1201 : }
1202 :
1203 0 : #[derive(Debug, Serialize, Deserialize)]
1204 : pub struct IngestAuxFilesRequest {
1205 : pub aux_files: HashMap<String, String>,
1206 : }
1207 :
1208 0 : #[derive(Debug, Serialize, Deserialize)]
1209 : pub struct ListAuxFilesRequest {
1210 : pub lsn: Lsn,
1211 : }
1212 :
1213 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1214 : pub struct DownloadRemoteLayersTaskInfo {
1215 : pub task_id: String,
1216 : pub state: DownloadRemoteLayersTaskState,
1217 : pub total_layer_count: u64, // stable once `completed`
1218 : pub successful_download_count: u64, // stable once `completed`
1219 : pub failed_download_count: u64, // stable once `completed`
1220 : }
1221 :
1222 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1223 : pub enum DownloadRemoteLayersTaskState {
1224 : Running,
1225 : Completed,
1226 : ShutDown,
1227 : }
1228 :
1229 0 : #[derive(Debug, Serialize, Deserialize)]
1230 : pub struct TimelineGcRequest {
1231 : pub gc_horizon: Option<u64>,
1232 : }
1233 :
1234 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1235 : pub struct WalRedoManagerProcessStatus {
1236 : pub pid: u32,
1237 : }
1238 :
1239 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1240 : pub struct WalRedoManagerStatus {
1241 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
1242 : pub process: Option<WalRedoManagerProcessStatus>,
1243 : }
1244 :
1245 : /// The progress of a secondary tenant.
1246 : ///
1247 : /// It is mostly useful when doing a long running download: e.g. initiating
1248 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
1249 : /// what's happening.
1250 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
1251 : pub struct SecondaryProgress {
1252 : /// The remote storage LastModified time of the heatmap object we last downloaded.
1253 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
1254 :
1255 : /// The number of layers currently on-disk
1256 : pub layers_downloaded: usize,
1257 : /// The number of layers in the most recently seen heatmap
1258 : pub layers_total: usize,
1259 :
1260 : /// The number of layer bytes currently on-disk
1261 : pub bytes_downloaded: u64,
1262 : /// The number of layer bytes in the most recently seen heatmap
1263 : pub bytes_total: u64,
1264 : }
1265 :
1266 0 : #[derive(Serialize, Deserialize, Debug)]
1267 : pub struct TenantScanRemoteStorageShard {
1268 : pub tenant_shard_id: TenantShardId,
1269 : pub generation: Option<u32>,
1270 : }
1271 :
1272 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1273 : pub struct TenantScanRemoteStorageResponse {
1274 : pub shards: Vec<TenantScanRemoteStorageShard>,
1275 : }
1276 :
1277 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1278 : #[serde(rename_all = "snake_case")]
1279 : pub enum TenantSorting {
1280 : ResidentSize,
1281 : MaxLogicalSize,
1282 : }
1283 :
1284 : impl Default for TenantSorting {
1285 0 : fn default() -> Self {
1286 0 : Self::ResidentSize
1287 0 : }
1288 : }
1289 :
1290 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1291 : pub struct TopTenantShardsRequest {
1292 : // How would you like to sort the tenants?
1293 : pub order_by: TenantSorting,
1294 :
1295 : // How many results?
1296 : pub limit: usize,
1297 :
1298 : // Omit tenants with more than this many shards (e.g. if this is the max number of shards
1299 : // that the caller would ever split to)
1300 : pub where_shards_lt: Option<ShardCount>,
1301 :
1302 : // Omit tenants where the ordering metric is less than this (this is an optimization to
1303 : // let us quickly exclude numerous tiny shards)
1304 : pub where_gt: Option<u64>,
1305 : }
1306 :
1307 0 : #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
1308 : pub struct TopTenantShardItem {
1309 : pub id: TenantShardId,
1310 :
1311 : /// Total size of layers on local disk for all timelines in this tenant
1312 : pub resident_size: u64,
1313 :
1314 : /// Total size of layers in remote storage for all timelines in this tenant
1315 : pub physical_size: u64,
1316 :
1317 : /// The largest logical size of a timeline within this tenant
1318 : pub max_logical_size: u64,
1319 : }
1320 :
1321 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1322 : pub struct TopTenantShardsResponse {
1323 : pub shards: Vec<TopTenantShardItem>,
1324 : }
1325 :
1326 : pub mod virtual_file {
1327 : #[derive(
1328 : Copy,
1329 : Clone,
1330 : PartialEq,
1331 : Eq,
1332 : Hash,
1333 0 : strum_macros::EnumString,
1334 : strum_macros::Display,
1335 0 : serde_with::DeserializeFromStr,
1336 : serde_with::SerializeDisplay,
1337 : Debug,
1338 : )]
1339 : #[strum(serialize_all = "kebab-case")]
1340 : pub enum IoEngineKind {
1341 : StdFs,
1342 : #[cfg(target_os = "linux")]
1343 : TokioEpollUring,
1344 : }
1345 :
1346 : /// Direct IO modes for a pageserver.
1347 : #[derive(
1348 : Copy,
1349 : Clone,
1350 : PartialEq,
1351 : Eq,
1352 : Hash,
1353 0 : strum_macros::EnumString,
1354 : strum_macros::Display,
1355 0 : serde_with::DeserializeFromStr,
1356 : serde_with::SerializeDisplay,
1357 : Debug,
1358 : )]
1359 : #[strum(serialize_all = "kebab-case")]
1360 : #[repr(u8)]
1361 : pub enum IoMode {
1362 : /// Uses buffered IO.
1363 : Buffered,
1364 : /// Uses direct IO, error out if the operation fails.
1365 : #[cfg(target_os = "linux")]
1366 : Direct,
1367 : }
1368 :
1369 : impl IoMode {
1370 238 : pub const fn preferred() -> Self {
1371 238 : Self::Buffered
1372 238 : }
1373 : }
1374 :
1375 : impl TryFrom<u8> for IoMode {
1376 : type Error = u8;
1377 :
1378 2532 : fn try_from(value: u8) -> Result<Self, Self::Error> {
1379 2532 : Ok(match value {
1380 2532 : v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
1381 : #[cfg(target_os = "linux")]
1382 0 : v if v == (IoMode::Direct as u8) => IoMode::Direct,
1383 0 : x => return Err(x),
1384 : })
1385 2532 : }
1386 : }
1387 : }
1388 :
1389 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1390 : pub struct ScanDisposableKeysResponse {
1391 : pub disposable_count: usize,
1392 : pub not_disposable_count: usize,
1393 : }
1394 :
1395 : // Wrapped in libpq CopyData
1396 : #[derive(PartialEq, Eq, Debug)]
1397 : pub enum PagestreamFeMessage {
1398 : Exists(PagestreamExistsRequest),
1399 : Nblocks(PagestreamNblocksRequest),
1400 : GetPage(PagestreamGetPageRequest),
1401 : DbSize(PagestreamDbSizeRequest),
1402 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
1403 : }
1404 :
1405 : // Wrapped in libpq CopyData
1406 : #[derive(strum_macros::EnumProperty)]
1407 : pub enum PagestreamBeMessage {
1408 : Exists(PagestreamExistsResponse),
1409 : Nblocks(PagestreamNblocksResponse),
1410 : GetPage(PagestreamGetPageResponse),
1411 : Error(PagestreamErrorResponse),
1412 : DbSize(PagestreamDbSizeResponse),
1413 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
1414 : }
1415 :
1416 : // Keep in sync with `pagestore_client.h`
1417 : #[repr(u8)]
1418 : enum PagestreamBeMessageTag {
1419 : Exists = 100,
1420 : Nblocks = 101,
1421 : GetPage = 102,
1422 : Error = 103,
1423 : DbSize = 104,
1424 : GetSlruSegment = 105,
1425 : }
1426 : impl TryFrom<u8> for PagestreamBeMessageTag {
1427 : type Error = u8;
1428 0 : fn try_from(value: u8) -> Result<Self, u8> {
1429 0 : match value {
1430 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
1431 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
1432 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
1433 0 : 103 => Ok(PagestreamBeMessageTag::Error),
1434 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
1435 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
1436 0 : _ => Err(value),
1437 : }
1438 0 : }
1439 : }
1440 :
1441 : // A GetPage request contains two LSN values:
1442 : //
1443 : // request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
1444 : // "get the latest version present". It's used by the primary server, which knows that no one else
1445 : // is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
1446 : // Lsn::Max. Standby servers use the current replay LSN as the request LSN.
1447 : //
1448 : // not_modified_since: Hint to the pageserver that the client knows that the page has not been
1449 : // modified between 'not_modified_since' and the request LSN. It's always correct to set
1450 : // 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
1451 : // passing an earlier LSN can speed up the request, by allowing the pageserver to process the
1452 : // request without waiting for 'request_lsn' to arrive.
1453 : //
1454 : // The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
1455 : // sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
1456 : // 'latest' was set to true. The V2 interface was added because there was no correct way for a
1457 : // standby to request a page at a particular non-latest LSN, and also include the
1458 : // 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
1459 : // request, if the standby knows that the page hasn't been modified since, and risk getting an error
1460 : // if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
1461 : // require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
1462 : // interface allows sending both LSNs, and let the pageserver do the right thing. There was no
1463 : // difference in the responses between V1 and V2.
1464 : //
1465 : // V3 version of protocol adds request ID to all requests. This request ID is also included in response
1466 : // as well as other fields from requests, which allows to verify that we receive response for our request.
1467 : // We copy fields from request to response to make checking more reliable: request ID is formed from process ID
1468 : // and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
1469 : //
1470 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1471 : pub enum PagestreamProtocolVersion {
1472 : V2,
1473 : V3,
1474 : }
1475 :
1476 : pub type RequestId = u64;
1477 :
1478 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1479 : pub struct PagestreamRequest {
1480 : pub reqid: RequestId,
1481 : pub request_lsn: Lsn,
1482 : pub not_modified_since: Lsn,
1483 : }
1484 :
1485 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1486 : pub struct PagestreamExistsRequest {
1487 : pub hdr: PagestreamRequest,
1488 : pub rel: RelTag,
1489 : }
1490 :
1491 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1492 : pub struct PagestreamNblocksRequest {
1493 : pub hdr: PagestreamRequest,
1494 : pub rel: RelTag,
1495 : }
1496 :
1497 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1498 : pub struct PagestreamGetPageRequest {
1499 : pub hdr: PagestreamRequest,
1500 : pub rel: RelTag,
1501 : pub blkno: u32,
1502 : }
1503 :
1504 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1505 : pub struct PagestreamDbSizeRequest {
1506 : pub hdr: PagestreamRequest,
1507 : pub dbnode: u32,
1508 : }
1509 :
1510 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1511 : pub struct PagestreamGetSlruSegmentRequest {
1512 : pub hdr: PagestreamRequest,
1513 : pub kind: u8,
1514 : pub segno: u32,
1515 : }
1516 :
1517 : #[derive(Debug)]
1518 : pub struct PagestreamExistsResponse {
1519 : pub req: PagestreamExistsRequest,
1520 : pub exists: bool,
1521 : }
1522 :
1523 : #[derive(Debug)]
1524 : pub struct PagestreamNblocksResponse {
1525 : pub req: PagestreamNblocksRequest,
1526 : pub n_blocks: u32,
1527 : }
1528 :
1529 : #[derive(Debug)]
1530 : pub struct PagestreamGetPageResponse {
1531 : pub req: PagestreamGetPageRequest,
1532 : pub page: Bytes,
1533 : }
1534 :
1535 : #[derive(Debug)]
1536 : pub struct PagestreamGetSlruSegmentResponse {
1537 : pub req: PagestreamGetSlruSegmentRequest,
1538 : pub segment: Bytes,
1539 : }
1540 :
1541 : #[derive(Debug)]
1542 : pub struct PagestreamErrorResponse {
1543 : pub req: PagestreamRequest,
1544 : pub message: String,
1545 : }
1546 :
1547 : #[derive(Debug)]
1548 : pub struct PagestreamDbSizeResponse {
1549 : pub req: PagestreamDbSizeRequest,
1550 : pub db_size: i64,
1551 : }
1552 :
1553 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
1554 : // that require pageserver-internal types. It is sufficient to get the total size.
1555 0 : #[derive(Serialize, Deserialize, Debug)]
1556 : pub struct TenantHistorySize {
1557 : pub id: TenantId,
1558 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
1559 : ///
1560 : /// Will be none if `?inputs_only=true` was given.
1561 : pub size: Option<u64>,
1562 : }
1563 :
1564 : impl PagestreamFeMessage {
1565 : /// Serialize a compute -> pageserver message. This is currently only used in testing
1566 : /// tools. Always uses protocol version 3.
1567 4 : pub fn serialize(&self) -> Bytes {
1568 4 : let mut bytes = BytesMut::new();
1569 4 :
1570 4 : match self {
1571 1 : Self::Exists(req) => {
1572 1 : bytes.put_u8(0);
1573 1 : bytes.put_u64(req.hdr.reqid);
1574 1 : bytes.put_u64(req.hdr.request_lsn.0);
1575 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1576 1 : bytes.put_u32(req.rel.spcnode);
1577 1 : bytes.put_u32(req.rel.dbnode);
1578 1 : bytes.put_u32(req.rel.relnode);
1579 1 : bytes.put_u8(req.rel.forknum);
1580 1 : }
1581 :
1582 1 : Self::Nblocks(req) => {
1583 1 : bytes.put_u8(1);
1584 1 : bytes.put_u64(req.hdr.reqid);
1585 1 : bytes.put_u64(req.hdr.request_lsn.0);
1586 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1587 1 : bytes.put_u32(req.rel.spcnode);
1588 1 : bytes.put_u32(req.rel.dbnode);
1589 1 : bytes.put_u32(req.rel.relnode);
1590 1 : bytes.put_u8(req.rel.forknum);
1591 1 : }
1592 :
1593 1 : Self::GetPage(req) => {
1594 1 : bytes.put_u8(2);
1595 1 : bytes.put_u64(req.hdr.reqid);
1596 1 : bytes.put_u64(req.hdr.request_lsn.0);
1597 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1598 1 : bytes.put_u32(req.rel.spcnode);
1599 1 : bytes.put_u32(req.rel.dbnode);
1600 1 : bytes.put_u32(req.rel.relnode);
1601 1 : bytes.put_u8(req.rel.forknum);
1602 1 : bytes.put_u32(req.blkno);
1603 1 : }
1604 :
1605 1 : Self::DbSize(req) => {
1606 1 : bytes.put_u8(3);
1607 1 : bytes.put_u64(req.hdr.reqid);
1608 1 : bytes.put_u64(req.hdr.request_lsn.0);
1609 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1610 1 : bytes.put_u32(req.dbnode);
1611 1 : }
1612 :
1613 0 : Self::GetSlruSegment(req) => {
1614 0 : bytes.put_u8(4);
1615 0 : bytes.put_u64(req.hdr.reqid);
1616 0 : bytes.put_u64(req.hdr.request_lsn.0);
1617 0 : bytes.put_u64(req.hdr.not_modified_since.0);
1618 0 : bytes.put_u8(req.kind);
1619 0 : bytes.put_u32(req.segno);
1620 0 : }
1621 : }
1622 :
1623 4 : bytes.into()
1624 4 : }
1625 :
1626 4 : pub fn parse<R: std::io::Read>(
1627 4 : body: &mut R,
1628 4 : protocol_version: PagestreamProtocolVersion,
1629 4 : ) -> anyhow::Result<PagestreamFeMessage> {
1630 : // these correspond to the NeonMessageTag enum in pagestore_client.h
1631 : //
1632 : // TODO: consider using protobuf or serde bincode for less error prone
1633 : // serialization.
1634 4 : let msg_tag = body.read_u8()?;
1635 4 : let (reqid, request_lsn, not_modified_since) = match protocol_version {
1636 : PagestreamProtocolVersion::V2 => (
1637 : 0,
1638 0 : Lsn::from(body.read_u64::<BigEndian>()?),
1639 0 : Lsn::from(body.read_u64::<BigEndian>()?),
1640 : ),
1641 : PagestreamProtocolVersion::V3 => (
1642 4 : body.read_u64::<BigEndian>()?,
1643 4 : Lsn::from(body.read_u64::<BigEndian>()?),
1644 4 : Lsn::from(body.read_u64::<BigEndian>()?),
1645 : ),
1646 : };
1647 :
1648 4 : match msg_tag {
1649 : 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
1650 1 : hdr: PagestreamRequest {
1651 1 : reqid,
1652 1 : request_lsn,
1653 1 : not_modified_since,
1654 1 : },
1655 1 : rel: RelTag {
1656 1 : spcnode: body.read_u32::<BigEndian>()?,
1657 1 : dbnode: body.read_u32::<BigEndian>()?,
1658 1 : relnode: body.read_u32::<BigEndian>()?,
1659 1 : forknum: body.read_u8()?,
1660 : },
1661 : })),
1662 : 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1663 1 : hdr: PagestreamRequest {
1664 1 : reqid,
1665 1 : request_lsn,
1666 1 : not_modified_since,
1667 1 : },
1668 1 : rel: RelTag {
1669 1 : spcnode: body.read_u32::<BigEndian>()?,
1670 1 : dbnode: body.read_u32::<BigEndian>()?,
1671 1 : relnode: body.read_u32::<BigEndian>()?,
1672 1 : forknum: body.read_u8()?,
1673 : },
1674 : })),
1675 : 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1676 1 : hdr: PagestreamRequest {
1677 1 : reqid,
1678 1 : request_lsn,
1679 1 : not_modified_since,
1680 1 : },
1681 1 : rel: RelTag {
1682 1 : spcnode: body.read_u32::<BigEndian>()?,
1683 1 : dbnode: body.read_u32::<BigEndian>()?,
1684 1 : relnode: body.read_u32::<BigEndian>()?,
1685 1 : forknum: body.read_u8()?,
1686 : },
1687 1 : blkno: body.read_u32::<BigEndian>()?,
1688 : })),
1689 : 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1690 1 : hdr: PagestreamRequest {
1691 1 : reqid,
1692 1 : request_lsn,
1693 1 : not_modified_since,
1694 1 : },
1695 1 : dbnode: body.read_u32::<BigEndian>()?,
1696 : })),
1697 : 4 => Ok(PagestreamFeMessage::GetSlruSegment(
1698 : PagestreamGetSlruSegmentRequest {
1699 0 : hdr: PagestreamRequest {
1700 0 : reqid,
1701 0 : request_lsn,
1702 0 : not_modified_since,
1703 0 : },
1704 0 : kind: body.read_u8()?,
1705 0 : segno: body.read_u32::<BigEndian>()?,
1706 : },
1707 : )),
1708 0 : _ => bail!("unknown smgr message tag: {:?}", msg_tag),
1709 : }
1710 4 : }
1711 : }
1712 :
1713 : impl PagestreamBeMessage {
1714 0 : pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
1715 0 : let mut bytes = BytesMut::new();
1716 :
1717 : use PagestreamBeMessageTag as Tag;
1718 0 : match protocol_version {
1719 : PagestreamProtocolVersion::V2 => {
1720 0 : match self {
1721 0 : Self::Exists(resp) => {
1722 0 : bytes.put_u8(Tag::Exists as u8);
1723 0 : bytes.put_u8(resp.exists as u8);
1724 0 : }
1725 :
1726 0 : Self::Nblocks(resp) => {
1727 0 : bytes.put_u8(Tag::Nblocks as u8);
1728 0 : bytes.put_u32(resp.n_blocks);
1729 0 : }
1730 :
1731 0 : Self::GetPage(resp) => {
1732 0 : bytes.put_u8(Tag::GetPage as u8);
1733 0 : bytes.put(&resp.page[..])
1734 : }
1735 :
1736 0 : Self::Error(resp) => {
1737 0 : bytes.put_u8(Tag::Error as u8);
1738 0 : bytes.put(resp.message.as_bytes());
1739 0 : bytes.put_u8(0); // null terminator
1740 0 : }
1741 0 : Self::DbSize(resp) => {
1742 0 : bytes.put_u8(Tag::DbSize as u8);
1743 0 : bytes.put_i64(resp.db_size);
1744 0 : }
1745 :
1746 0 : Self::GetSlruSegment(resp) => {
1747 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1748 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1749 0 : bytes.put(&resp.segment[..]);
1750 0 : }
1751 : }
1752 : }
1753 : PagestreamProtocolVersion::V3 => {
1754 0 : match self {
1755 0 : Self::Exists(resp) => {
1756 0 : bytes.put_u8(Tag::Exists as u8);
1757 0 : bytes.put_u64(resp.req.hdr.reqid);
1758 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1759 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1760 0 : bytes.put_u32(resp.req.rel.spcnode);
1761 0 : bytes.put_u32(resp.req.rel.dbnode);
1762 0 : bytes.put_u32(resp.req.rel.relnode);
1763 0 : bytes.put_u8(resp.req.rel.forknum);
1764 0 : bytes.put_u8(resp.exists as u8);
1765 0 : }
1766 :
1767 0 : Self::Nblocks(resp) => {
1768 0 : bytes.put_u8(Tag::Nblocks as u8);
1769 0 : bytes.put_u64(resp.req.hdr.reqid);
1770 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1771 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1772 0 : bytes.put_u32(resp.req.rel.spcnode);
1773 0 : bytes.put_u32(resp.req.rel.dbnode);
1774 0 : bytes.put_u32(resp.req.rel.relnode);
1775 0 : bytes.put_u8(resp.req.rel.forknum);
1776 0 : bytes.put_u32(resp.n_blocks);
1777 0 : }
1778 :
1779 0 : Self::GetPage(resp) => {
1780 0 : bytes.put_u8(Tag::GetPage as u8);
1781 0 : bytes.put_u64(resp.req.hdr.reqid);
1782 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1783 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1784 0 : bytes.put_u32(resp.req.rel.spcnode);
1785 0 : bytes.put_u32(resp.req.rel.dbnode);
1786 0 : bytes.put_u32(resp.req.rel.relnode);
1787 0 : bytes.put_u8(resp.req.rel.forknum);
1788 0 : bytes.put_u32(resp.req.blkno);
1789 0 : bytes.put(&resp.page[..])
1790 : }
1791 :
1792 0 : Self::Error(resp) => {
1793 0 : bytes.put_u8(Tag::Error as u8);
1794 0 : bytes.put_u64(resp.req.reqid);
1795 0 : bytes.put_u64(resp.req.request_lsn.0);
1796 0 : bytes.put_u64(resp.req.not_modified_since.0);
1797 0 : bytes.put(resp.message.as_bytes());
1798 0 : bytes.put_u8(0); // null terminator
1799 0 : }
1800 0 : Self::DbSize(resp) => {
1801 0 : bytes.put_u8(Tag::DbSize as u8);
1802 0 : bytes.put_u64(resp.req.hdr.reqid);
1803 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1804 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1805 0 : bytes.put_u32(resp.req.dbnode);
1806 0 : bytes.put_i64(resp.db_size);
1807 0 : }
1808 :
1809 0 : Self::GetSlruSegment(resp) => {
1810 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1811 0 : bytes.put_u64(resp.req.hdr.reqid);
1812 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
1813 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
1814 0 : bytes.put_u8(resp.req.kind);
1815 0 : bytes.put_u32(resp.req.segno);
1816 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1817 0 : bytes.put(&resp.segment[..]);
1818 0 : }
1819 : }
1820 : }
1821 : }
1822 0 : bytes.into()
1823 0 : }
1824 :
1825 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
1826 0 : let mut buf = buf.reader();
1827 0 : let msg_tag = buf.read_u8()?;
1828 :
1829 : use PagestreamBeMessageTag as Tag;
1830 0 : let ok =
1831 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
1832 : Tag::Exists => {
1833 0 : let reqid = buf.read_u64::<BigEndian>()?;
1834 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
1835 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
1836 0 : let rel = RelTag {
1837 0 : spcnode: buf.read_u32::<BigEndian>()?,
1838 0 : dbnode: buf.read_u32::<BigEndian>()?,
1839 0 : relnode: buf.read_u32::<BigEndian>()?,
1840 0 : forknum: buf.read_u8()?,
1841 : };
1842 0 : let exists = buf.read_u8()? != 0;
1843 0 : Self::Exists(PagestreamExistsResponse {
1844 0 : req: PagestreamExistsRequest {
1845 0 : hdr: PagestreamRequest {
1846 0 : reqid,
1847 0 : request_lsn,
1848 0 : not_modified_since,
1849 0 : },
1850 0 : rel,
1851 0 : },
1852 0 : exists,
1853 0 : })
1854 : }
1855 : Tag::Nblocks => {
1856 0 : let reqid = buf.read_u64::<BigEndian>()?;
1857 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
1858 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
1859 0 : let rel = RelTag {
1860 0 : spcnode: buf.read_u32::<BigEndian>()?,
1861 0 : dbnode: buf.read_u32::<BigEndian>()?,
1862 0 : relnode: buf.read_u32::<BigEndian>()?,
1863 0 : forknum: buf.read_u8()?,
1864 : };
1865 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1866 0 : Self::Nblocks(PagestreamNblocksResponse {
1867 0 : req: PagestreamNblocksRequest {
1868 0 : hdr: PagestreamRequest {
1869 0 : reqid,
1870 0 : request_lsn,
1871 0 : not_modified_since,
1872 0 : },
1873 0 : rel,
1874 0 : },
1875 0 : n_blocks,
1876 0 : })
1877 : }
1878 : Tag::GetPage => {
1879 0 : let reqid = buf.read_u64::<BigEndian>()?;
1880 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
1881 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
1882 0 : let rel = RelTag {
1883 0 : spcnode: buf.read_u32::<BigEndian>()?,
1884 0 : dbnode: buf.read_u32::<BigEndian>()?,
1885 0 : relnode: buf.read_u32::<BigEndian>()?,
1886 0 : forknum: buf.read_u8()?,
1887 : };
1888 0 : let blkno = buf.read_u32::<BigEndian>()?;
1889 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
1890 0 : buf.read_exact(&mut page)?;
1891 0 : Self::GetPage(PagestreamGetPageResponse {
1892 0 : req: PagestreamGetPageRequest {
1893 0 : hdr: PagestreamRequest {
1894 0 : reqid,
1895 0 : request_lsn,
1896 0 : not_modified_since,
1897 0 : },
1898 0 : rel,
1899 0 : blkno,
1900 0 : },
1901 0 : page: page.into(),
1902 0 : })
1903 : }
1904 : Tag::Error => {
1905 0 : let reqid = buf.read_u64::<BigEndian>()?;
1906 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
1907 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
1908 0 : let mut msg = Vec::new();
1909 0 : buf.read_until(0, &mut msg)?;
1910 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
1911 0 : let rust_str = cstring.to_str()?;
1912 0 : Self::Error(PagestreamErrorResponse {
1913 0 : req: PagestreamRequest {
1914 0 : reqid,
1915 0 : request_lsn,
1916 0 : not_modified_since,
1917 0 : },
1918 0 : message: rust_str.to_owned(),
1919 0 : })
1920 : }
1921 : Tag::DbSize => {
1922 0 : let reqid = buf.read_u64::<BigEndian>()?;
1923 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
1924 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
1925 0 : let dbnode = buf.read_u32::<BigEndian>()?;
1926 0 : let db_size = buf.read_i64::<BigEndian>()?;
1927 0 : Self::DbSize(PagestreamDbSizeResponse {
1928 0 : req: PagestreamDbSizeRequest {
1929 0 : hdr: PagestreamRequest {
1930 0 : reqid,
1931 0 : request_lsn,
1932 0 : not_modified_since,
1933 0 : },
1934 0 : dbnode,
1935 0 : },
1936 0 : db_size,
1937 0 : })
1938 : }
1939 : Tag::GetSlruSegment => {
1940 0 : let reqid = buf.read_u64::<BigEndian>()?;
1941 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
1942 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
1943 0 : let kind = buf.read_u8()?;
1944 0 : let segno = buf.read_u32::<BigEndian>()?;
1945 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
1946 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
1947 0 : buf.read_exact(&mut segment)?;
1948 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
1949 0 : req: PagestreamGetSlruSegmentRequest {
1950 0 : hdr: PagestreamRequest {
1951 0 : reqid,
1952 0 : request_lsn,
1953 0 : not_modified_since,
1954 0 : },
1955 0 : kind,
1956 0 : segno,
1957 0 : },
1958 0 : segment: segment.into(),
1959 0 : })
1960 : }
1961 : };
1962 0 : let remaining = buf.into_inner();
1963 0 : if !remaining.is_empty() {
1964 0 : anyhow::bail!(
1965 0 : "remaining bytes in msg with tag={msg_tag}: {}",
1966 0 : remaining.len()
1967 0 : );
1968 0 : }
1969 0 : Ok(ok)
1970 0 : }
1971 :
1972 0 : pub fn kind(&self) -> &'static str {
1973 0 : match self {
1974 0 : Self::Exists(_) => "Exists",
1975 0 : Self::Nblocks(_) => "Nblocks",
1976 0 : Self::GetPage(_) => "GetPage",
1977 0 : Self::Error(_) => "Error",
1978 0 : Self::DbSize(_) => "DbSize",
1979 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
1980 : }
1981 0 : }
1982 : }
1983 :
1984 0 : #[derive(Debug, Serialize, Deserialize)]
1985 : pub struct PageTraceEvent {
1986 : pub key: CompactKey,
1987 : pub effective_lsn: Lsn,
1988 : pub time: SystemTime,
1989 : }
1990 :
1991 : impl Default for PageTraceEvent {
1992 0 : fn default() -> Self {
1993 0 : Self {
1994 0 : key: Default::default(),
1995 0 : effective_lsn: Default::default(),
1996 0 : time: std::time::UNIX_EPOCH,
1997 0 : }
1998 0 : }
1999 : }
2000 :
2001 : #[cfg(test)]
2002 : mod tests {
2003 : use serde_json::json;
2004 : use std::str::FromStr;
2005 :
2006 : use super::*;
2007 :
2008 : #[test]
2009 1 : fn test_pagestream() {
2010 1 : // Test serialization/deserialization of PagestreamFeMessage
2011 1 : let messages = vec![
2012 1 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
2013 1 : hdr: PagestreamRequest {
2014 1 : reqid: 0,
2015 1 : request_lsn: Lsn(4),
2016 1 : not_modified_since: Lsn(3),
2017 1 : },
2018 1 : rel: RelTag {
2019 1 : forknum: 1,
2020 1 : spcnode: 2,
2021 1 : dbnode: 3,
2022 1 : relnode: 4,
2023 1 : },
2024 1 : }),
2025 1 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
2026 1 : hdr: PagestreamRequest {
2027 1 : reqid: 0,
2028 1 : request_lsn: Lsn(4),
2029 1 : not_modified_since: Lsn(4),
2030 1 : },
2031 1 : rel: RelTag {
2032 1 : forknum: 1,
2033 1 : spcnode: 2,
2034 1 : dbnode: 3,
2035 1 : relnode: 4,
2036 1 : },
2037 1 : }),
2038 1 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2039 1 : hdr: PagestreamRequest {
2040 1 : reqid: 0,
2041 1 : request_lsn: Lsn(4),
2042 1 : not_modified_since: Lsn(3),
2043 1 : },
2044 1 : rel: RelTag {
2045 1 : forknum: 1,
2046 1 : spcnode: 2,
2047 1 : dbnode: 3,
2048 1 : relnode: 4,
2049 1 : },
2050 1 : blkno: 7,
2051 1 : }),
2052 1 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
2053 1 : hdr: PagestreamRequest {
2054 1 : reqid: 0,
2055 1 : request_lsn: Lsn(4),
2056 1 : not_modified_since: Lsn(3),
2057 1 : },
2058 1 : dbnode: 7,
2059 1 : }),
2060 1 : ];
2061 5 : for msg in messages {
2062 4 : let bytes = msg.serialize();
2063 4 : let reconstructed =
2064 4 : PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
2065 4 : .unwrap();
2066 4 : assert!(msg == reconstructed);
2067 : }
2068 1 : }
2069 :
2070 : #[test]
2071 1 : fn test_tenantinfo_serde() {
2072 1 : // Test serialization/deserialization of TenantInfo
2073 1 : let original_active = TenantInfo {
2074 1 : id: TenantShardId::unsharded(TenantId::generate()),
2075 1 : state: TenantState::Active,
2076 1 : current_physical_size: Some(42),
2077 1 : attachment_status: TenantAttachmentStatus::Attached,
2078 1 : generation: 1,
2079 1 : gc_blocking: None,
2080 1 : };
2081 1 : let expected_active = json!({
2082 1 : "id": original_active.id.to_string(),
2083 1 : "state": {
2084 1 : "slug": "Active",
2085 1 : },
2086 1 : "current_physical_size": 42,
2087 1 : "attachment_status": {
2088 1 : "slug":"attached",
2089 1 : },
2090 1 : "generation" : 1
2091 1 : });
2092 1 :
2093 1 : let original_broken = TenantInfo {
2094 1 : id: TenantShardId::unsharded(TenantId::generate()),
2095 1 : state: TenantState::Broken {
2096 1 : reason: "reason".into(),
2097 1 : backtrace: "backtrace info".into(),
2098 1 : },
2099 1 : current_physical_size: Some(42),
2100 1 : attachment_status: TenantAttachmentStatus::Attached,
2101 1 : generation: 1,
2102 1 : gc_blocking: None,
2103 1 : };
2104 1 : let expected_broken = json!({
2105 1 : "id": original_broken.id.to_string(),
2106 1 : "state": {
2107 1 : "slug": "Broken",
2108 1 : "data": {
2109 1 : "backtrace": "backtrace info",
2110 1 : "reason": "reason",
2111 1 : }
2112 1 : },
2113 1 : "current_physical_size": 42,
2114 1 : "attachment_status": {
2115 1 : "slug":"attached",
2116 1 : },
2117 1 : "generation" : 1
2118 1 : });
2119 1 :
2120 1 : assert_eq!(
2121 1 : serde_json::to_value(&original_active).unwrap(),
2122 1 : expected_active
2123 1 : );
2124 :
2125 1 : assert_eq!(
2126 1 : serde_json::to_value(&original_broken).unwrap(),
2127 1 : expected_broken
2128 1 : );
2129 1 : assert!(format!("{:?}", &original_broken.state).contains("reason"));
2130 1 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
2131 1 : }
2132 :
2133 : #[test]
2134 1 : fn test_reject_unknown_field() {
2135 1 : let id = TenantId::generate();
2136 1 : let config_request = json!({
2137 1 : "tenant_id": id.to_string(),
2138 1 : "unknown_field": "unknown_value".to_string(),
2139 1 : });
2140 1 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
2141 1 : assert!(
2142 1 : err.to_string().contains("unknown field `unknown_field`"),
2143 0 : "expect unknown field `unknown_field` error, got: {}",
2144 : err
2145 : );
2146 1 : }
2147 :
2148 : #[test]
2149 1 : fn tenantstatus_activating_serde() {
2150 1 : let states = [TenantState::Activating(ActivatingFrom::Attaching)];
2151 1 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
2152 1 :
2153 1 : let actual = serde_json::to_string(&states).unwrap();
2154 1 :
2155 1 : assert_eq!(actual, expected);
2156 :
2157 1 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
2158 1 :
2159 1 : assert_eq!(states.as_slice(), &parsed);
2160 1 : }
2161 :
2162 : #[test]
2163 1 : fn tenantstatus_activating_strum() {
2164 1 : // tests added, because we use these for metrics
2165 1 : let examples = [
2166 1 : (line!(), TenantState::Attaching, "Attaching"),
2167 1 : (
2168 1 : line!(),
2169 1 : TenantState::Activating(ActivatingFrom::Attaching),
2170 1 : "Activating",
2171 1 : ),
2172 1 : (line!(), TenantState::Active, "Active"),
2173 1 : (
2174 1 : line!(),
2175 1 : TenantState::Stopping {
2176 1 : progress: utils::completion::Barrier::default(),
2177 1 : },
2178 1 : "Stopping",
2179 1 : ),
2180 1 : (
2181 1 : line!(),
2182 1 : TenantState::Broken {
2183 1 : reason: "Example".into(),
2184 1 : backtrace: "Looooong backtrace".into(),
2185 1 : },
2186 1 : "Broken",
2187 1 : ),
2188 1 : ];
2189 :
2190 6 : for (line, rendered, expected) in examples {
2191 5 : let actual: &'static str = rendered.into();
2192 5 : assert_eq!(actual, expected, "example on {line}");
2193 : }
2194 1 : }
2195 :
2196 : #[test]
2197 1 : fn test_image_compression_algorithm_parsing() {
2198 : use ImageCompressionAlgorithm::*;
2199 1 : let cases = [
2200 1 : ("disabled", Disabled),
2201 1 : ("zstd", Zstd { level: None }),
2202 1 : ("zstd(18)", Zstd { level: Some(18) }),
2203 1 : ("zstd(-3)", Zstd { level: Some(-3) }),
2204 1 : ];
2205 :
2206 5 : for (display, expected) in cases {
2207 4 : assert_eq!(
2208 4 : ImageCompressionAlgorithm::from_str(display).unwrap(),
2209 : expected,
2210 0 : "parsing works"
2211 : );
2212 4 : assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");
2213 :
2214 4 : let ser = serde_json::to_string(&expected).expect("serialization");
2215 4 : assert_eq!(
2216 4 : serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap(),
2217 : expected,
2218 0 : "serde roundtrip"
2219 : );
2220 :
2221 4 : assert_eq!(
2222 4 : serde_json::Value::String(display.to_string()),
2223 4 : serde_json::to_value(expected).unwrap(),
2224 0 : "Display is the serde serialization"
2225 : );
2226 : }
2227 1 : }
2228 :
2229 : #[test]
2230 1 : fn test_tenant_config_patch_request_serde() {
2231 1 : let patch_request = TenantConfigPatchRequest {
2232 1 : tenant_id: TenantId::from_str("17c6d121946a61e5ab0fe5a2fd4d8215").unwrap(),
2233 1 : config: TenantConfigPatch {
2234 1 : checkpoint_distance: FieldPatch::Upsert(42),
2235 1 : gc_horizon: FieldPatch::Remove,
2236 1 : compaction_threshold: FieldPatch::Noop,
2237 1 : ..TenantConfigPatch::default()
2238 1 : },
2239 1 : };
2240 1 :
2241 1 : let json = serde_json::to_string(&patch_request).unwrap();
2242 1 :
2243 1 : let expected = r#"{"tenant_id":"17c6d121946a61e5ab0fe5a2fd4d8215","checkpoint_distance":42,"gc_horizon":null}"#;
2244 1 : assert_eq!(json, expected);
2245 :
2246 1 : let decoded: TenantConfigPatchRequest = serde_json::from_str(&json).unwrap();
2247 1 : assert_eq!(decoded.tenant_id, patch_request.tenant_id);
2248 1 : assert_eq!(decoded.config, patch_request.config);
2249 :
2250 : // Now apply the patch to a config to demonstrate semantics
2251 :
2252 1 : let base = TenantConfig {
2253 1 : checkpoint_distance: Some(28),
2254 1 : gc_horizon: Some(100),
2255 1 : compaction_target_size: Some(1024),
2256 1 : ..Default::default()
2257 1 : };
2258 1 :
2259 1 : let expected = TenantConfig {
2260 1 : checkpoint_distance: Some(42),
2261 1 : gc_horizon: None,
2262 1 : ..base.clone()
2263 1 : };
2264 1 :
2265 1 : let patched = base.apply_patch(decoded.config);
2266 1 :
2267 1 : assert_eq!(patched, expected);
2268 1 : }
2269 : }
|