Line data Source code
1 : pub mod detach_ancestor;
2 : pub mod partitioning;
3 : pub mod utilization;
4 :
5 : use core::ops::Range;
6 : use std::collections::HashMap;
7 : use std::fmt::Display;
8 : use std::io::{BufRead, Read};
9 : use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
10 : use std::str::FromStr;
11 : use std::time::{Duration, SystemTime};
12 :
13 : use byteorder::{BigEndian, ReadBytesExt};
14 : use bytes::{Buf, BufMut, Bytes, BytesMut};
15 : #[cfg(feature = "testing")]
16 : use camino::Utf8PathBuf;
17 : use postgres_ffi::BLCKSZ;
18 : use serde::{Deserialize, Deserializer, Serialize, Serializer};
19 : use serde_with::serde_as;
20 : pub use utilization::PageserverUtilization;
21 : use utils::id::{NodeId, TenantId, TimelineId};
22 : use utils::lsn::Lsn;
23 : use utils::postgres_client::PostgresClientProtocol;
24 : use utils::{completion, serde_system_time};
25 :
26 : use crate::key::{CompactKey, Key};
27 : use crate::reltag::RelTag;
28 : use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
29 :
30 : /// The state of a tenant in this pageserver.
31 : ///
32 : /// ```mermaid
33 : /// stateDiagram-v2
34 : ///
35 : /// [*] --> Attaching: spawn_attach()
36 : ///
37 : /// Attaching --> Activating: activate()
38 : /// Activating --> Active: infallible
39 : ///
40 : /// Attaching --> Broken: attach() failure
41 : ///
42 : /// Active --> Stopping: set_stopping(), part of shutdown & detach
43 : /// Stopping --> Broken: late error in remove_tenant_from_memory
44 : ///
45 : /// Broken --> [*]: ignore / detach / shutdown
46 : /// Stopping --> [*]: remove_from_memory complete
47 : ///
48 : /// Active --> Broken: cfg(testing)-only tenant break point
49 : /// ```
50 : #[derive(
51 : Clone,
52 : PartialEq,
53 : Eq,
54 0 : serde::Serialize,
55 1 : serde::Deserialize,
56 : strum_macros::Display,
57 : strum_macros::VariantNames,
58 : strum_macros::AsRefStr,
59 : strum_macros::IntoStaticStr,
60 : )]
61 : #[serde(tag = "slug", content = "data")]
62 : pub enum TenantState {
63 : /// This tenant is being attached to the pageserver.
64 : ///
65 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
66 : Attaching,
67 : /// The tenant is transitioning from Loading/Attaching to Active.
68 : ///
69 : /// While in this state, the individual timelines are being activated.
70 : ///
71 : /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
72 : Activating(ActivatingFrom),
73 : /// The tenant has finished activating and is open for business.
74 : ///
75 : /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
76 : Active,
77 : /// The tenant is recognized by pageserver, but it is being detached or the
78 : /// system is being shut down.
79 : ///
80 : /// Transitions out of this state are possible through `set_broken()`.
81 : Stopping {
82 : // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
83 : // otherwise it will not be skipped during deserialization
84 : #[serde(skip)]
85 : progress: completion::Barrier,
86 : },
87 : /// The tenant is recognized by the pageserver, but can no longer be used for
88 : /// any operations.
89 : ///
90 : /// If the tenant fails to load or attach, it will transition to this state
91 : /// and it is guaranteed that no background tasks are running in its name.
92 : ///
93 : /// The other way to transition into this state is from `Stopping` state
94 : /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
95 : /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
96 : Broken { reason: String, backtrace: String },
97 : }
98 :
99 : impl TenantState {
100 0 : pub fn attachment_status(&self) -> TenantAttachmentStatus {
101 : use TenantAttachmentStatus::*;
102 :
103 : // Below, TenantState::Activating is treated as a "transient" or "transparent" state
104 : // when determining the attachment_status.
105 0 : match self {
106 : // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
107 : // So, technically, we can return Attached here.
108 : // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
109 : // But, our attach task might still be fetching the remote timelines, etc.
110 : // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
111 0 : Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
112 : // We only reach Active after successful load / attach.
113 : // So, call the attachment status Attached.
114 0 : Self::Active => Attached,
115 : // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
116 : // However, it also becomes Broken if the regular load fails.
117 : // From Console's perspective there's no practical difference
118 : // because attachment_status is polled by console only during attach operation execution.
119 0 : Self::Broken { reason, .. } => Failed {
120 0 : reason: reason.to_owned(),
121 0 : },
122 : // Why is Stopping a Maybe case? Because, during pageserver shutdown,
123 : // we set the Stopping state irrespective of whether the tenant
124 : // has finished attaching or not.
125 0 : Self::Stopping { .. } => Maybe,
126 : }
127 0 : }
128 :
129 0 : pub fn broken_from_reason(reason: String) -> Self {
130 0 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
131 0 : Self::Broken {
132 0 : reason,
133 0 : backtrace: backtrace_str,
134 0 : }
135 0 : }
136 : }
137 :
138 : impl std::fmt::Debug for TenantState {
139 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 2 : match self {
141 2 : Self::Broken { reason, backtrace } if !reason.is_empty() => {
142 2 : write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
143 : }
144 0 : _ => write!(f, "{self}"),
145 : }
146 2 : }
147 : }
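// Illustrative sketch (not part of the original file): the `#[serde(tag = "slug", content = "data")]`
// attribute above gives `TenantState` an adjacently-tagged JSON shape. Hypothetical test module,
// assuming `serde_json` is available as a dev-dependency (as in the throttle tests further down).
#[cfg(test)]
mod tenant_state_serde_example {
    use super::*;

    #[test]
    fn slug_and_data_shape() {
        // Unit variants carry only the "slug" tag...
        let active: TenantState =
            serde_json::from_value(serde_json::json!({ "slug": "Active" })).unwrap();
        assert_eq!(active, TenantState::Active);

        // ...while variants with fields put them under "data".
        let broken: TenantState = serde_json::from_value(serde_json::json!({
            "slug": "Broken",
            "data": { "reason": "oops", "backtrace": "" }
        }))
        .unwrap();
        assert_eq!(
            broken,
            TenantState::Broken {
                reason: "oops".to_string(),
                backtrace: String::new()
            }
        );
    }
}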
148 :
149 : /// A temporary lease to a specific lsn inside a timeline.
150 : /// Access to the lsn is guaranteed by the pageserver until the expiration indicated by `valid_until`.
151 : #[serde_as]
152 0 : #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
153 : pub struct LsnLease {
154 : #[serde_as(as = "SystemTimeAsRfc3339Millis")]
155 : pub valid_until: SystemTime,
156 : }
157 :
158 : serde_with::serde_conv!(
159 : SystemTimeAsRfc3339Millis,
160 : SystemTime,
161 0 : |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
162 0 : |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
163 : );
164 :
165 : impl LsnLease {
166 : /// The default length for an explicit LSN lease request (10 minutes).
167 : pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
168 :
169 : /// The default length for an implicit LSN lease granted during
170 : /// `get_lsn_by_timestamp` request (1 minute).
171 : pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
172 :
173 : /// Checks whether the lease is expired.
174 12 : pub fn is_expired(&self, now: &SystemTime) -> bool {
175 12 : now > &self.valid_until
176 12 : }
177 : }
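// Illustrative sketch (not part of the original file): `valid_until` is (de)serialized through the
// RFC 3339 millisecond representation defined by `SystemTimeAsRfc3339Millis` above, so a
// millisecond-precision timestamp round-trips exactly. Hypothetical test module, assuming
// `serde_json` as a dev-dependency.
#[cfg(test)]
mod lsn_lease_serde_example {
    use super::*;

    #[test]
    fn rfc3339_millis_round_trip() {
        let valid_until = SystemTime::UNIX_EPOCH + Duration::from_millis(1_700_000_000_000);
        let lease = LsnLease { valid_until };
        // Serializes to something like {"valid_until":"2023-11-14T22:13:20.000Z"}.
        let json = serde_json::to_string(&lease).unwrap();
        let back: LsnLease = serde_json::from_str(&json).unwrap();
        assert_eq!(back.valid_until, valid_until);
        assert!(!back.is_expired(&SystemTime::UNIX_EPOCH));
    }
}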
178 :
179 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
180 : ///
181 : /// XXX: We used to have more variants here, but now it's just one, which makes this rather
182 : /// useless. Remove, once we've checked that there's no client code left that looks at this.
183 1 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
184 : pub enum ActivatingFrom {
185 : /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
186 : Attaching,
187 : }
188 :
189 : /// A state of a timeline in pageserver's memory.
190 0 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
191 : pub enum TimelineState {
192 : /// The timeline is recognized by the pageserver but is not yet operational.
193 : /// In particular, the walreceiver connection loop is not running for this timeline.
194 : /// It will eventually transition to state Active or Broken.
195 : Loading,
196 : /// The timeline is fully operational.
197 : /// It can be queried, and the walreceiver connection loop is running.
198 : Active,
199 : /// The timeline was previously Loading or Active but is shutting down.
200 : /// It cannot transition back into any other state.
201 : Stopping,
202 : /// The timeline is broken and not operational (previous states: Loading or Active).
203 : Broken { reason: String, backtrace: String },
204 : }
205 :
206 : #[serde_with::serde_as]
207 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
208 : pub struct CompactLsnRange {
209 : pub start: Lsn,
210 : pub end: Lsn,
211 : }
212 :
213 : #[serde_with::serde_as]
214 0 : #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
215 : pub struct CompactKeyRange {
216 : #[serde_as(as = "serde_with::DisplayFromStr")]
217 : pub start: Key,
218 : #[serde_as(as = "serde_with::DisplayFromStr")]
219 : pub end: Key,
220 : }
221 :
222 : impl From<Range<Lsn>> for CompactLsnRange {
223 12 : fn from(range: Range<Lsn>) -> Self {
224 12 : Self {
225 12 : start: range.start,
226 12 : end: range.end,
227 12 : }
228 12 : }
229 : }
230 :
231 : impl From<Range<Key>> for CompactKeyRange {
232 32 : fn from(range: Range<Key>) -> Self {
233 32 : Self {
234 32 : start: range.start,
235 32 : end: range.end,
236 32 : }
237 32 : }
238 : }
239 :
240 : impl From<CompactLsnRange> for Range<Lsn> {
241 20 : fn from(range: CompactLsnRange) -> Self {
242 20 : range.start..range.end
243 20 : }
244 : }
245 :
246 : impl From<CompactKeyRange> for Range<Key> {
247 32 : fn from(range: CompactKeyRange) -> Self {
248 32 : range.start..range.end
249 32 : }
250 : }
251 :
252 : impl CompactLsnRange {
253 8 : pub fn above(lsn: Lsn) -> Self {
254 8 : Self {
255 8 : start: lsn,
256 8 : end: Lsn::MAX,
257 8 : }
258 8 : }
259 : }
260 :
261 : #[derive(Debug, Clone, Serialize)]
262 : pub struct CompactInfoResponse {
263 : pub compact_key_range: Option<CompactKeyRange>,
264 : pub compact_lsn_range: Option<CompactLsnRange>,
265 : pub sub_compaction: bool,
266 : pub running: bool,
267 : pub job_id: usize,
268 : }
269 :
270 0 : #[derive(Serialize, Deserialize, Clone)]
271 : pub struct TimelineCreateRequest {
272 : pub new_timeline_id: TimelineId,
273 : #[serde(flatten)]
274 : pub mode: TimelineCreateRequestMode,
275 : }
276 :
277 0 : #[derive(Serialize, Deserialize, Clone)]
278 : #[serde(untagged)]
279 : pub enum TimelineCreateRequestMode {
280 : Branch {
281 : ancestor_timeline_id: TimelineId,
282 : #[serde(default)]
283 : ancestor_start_lsn: Option<Lsn>,
284 : // TODO: cplane sets this, but, the branching code always
285 : // inherits the ancestor's pg_version. Earlier code wasn't
286 : // using a flattened enum, so, it was an accepted field, and
287 : // we continue to accept it by having it here.
288 : pg_version: Option<u32>,
289 : },
290 : ImportPgdata {
291 : import_pgdata: TimelineCreateRequestModeImportPgdata,
292 : },
293 : // NB: Bootstrap is all-optional, and thus serde(untagged) will cause serde to stop at Bootstrap.
294 : // (serde picks the first matching enum variant, in declaration order).
295 : Bootstrap {
296 : #[serde(default)]
297 : existing_initdb_timeline_id: Option<TimelineId>,
298 : pg_version: Option<u32>,
299 : },
300 : }
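// Illustrative sketch (not part of the original file): with `#[serde(untagged)]`, the variant is
// chosen by which fields are present, in declaration order; a request carrying neither an ancestor
// nor `import_pgdata` falls through to `Bootstrap`. Hypothetical test module, assuming `serde_json`
// as a dev-dependency.
#[cfg(test)]
mod timeline_create_mode_untagged_example {
    use super::*;

    #[test]
    fn falls_through_to_bootstrap() {
        let mode: TimelineCreateRequestMode =
            serde_json::from_value(serde_json::json!({ "pg_version": 16 })).unwrap();
        // Neither `ancestor_timeline_id` (Branch) nor `import_pgdata` (ImportPgdata) is present,
        // so the all-optional Bootstrap variant matches.
        assert!(matches!(
            mode,
            TimelineCreateRequestMode::Bootstrap {
                existing_initdb_timeline_id: None,
                pg_version: Some(16)
            }
        ));
    }
}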
301 :
302 0 : #[derive(Serialize, Deserialize, Clone)]
303 : pub struct TimelineCreateRequestModeImportPgdata {
304 : pub location: ImportPgdataLocation,
305 : pub idempotency_key: ImportPgdataIdempotencyKey,
306 : }
307 :
308 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
309 : pub enum ImportPgdataLocation {
310 : #[cfg(feature = "testing")]
311 : LocalFs { path: Utf8PathBuf },
312 : AwsS3 {
313 : region: String,
314 : bucket: String,
315 : /// A better name for this would be `prefix`; changing requires coordination with cplane.
316 : /// See <https://github.com/neondatabase/cloud/issues/20646>.
317 : key: String,
318 : },
319 : }
320 :
321 0 : #[derive(Serialize, Deserialize, Clone)]
322 : #[serde(transparent)]
323 : pub struct ImportPgdataIdempotencyKey(pub String);
324 :
325 : impl ImportPgdataIdempotencyKey {
326 0 : pub fn random() -> Self {
327 : use rand::Rng;
328 : use rand::distributions::Alphanumeric;
329 0 : Self(
330 0 : rand::thread_rng()
331 0 : .sample_iter(&Alphanumeric)
332 0 : .take(20)
333 0 : .map(char::from)
334 0 : .collect(),
335 0 : )
336 0 : }
337 : }
338 :
339 0 : #[derive(Serialize, Deserialize, Clone)]
340 : pub struct LsnLeaseRequest {
341 : pub lsn: Lsn,
342 : }
343 :
344 0 : #[derive(Serialize, Deserialize)]
345 : pub struct TenantShardSplitRequest {
346 : pub new_shard_count: u8,
347 :
348 : // A tenant's stripe size is only meaningful the first time their shard count goes
349 : // above 1: therefore during a split from 1->N shards, we may modify the stripe size.
350 : //
351 : // If this is set while the shard count is being increased from an already >1 value,
352 : // then the request will fail with 400.
353 : pub new_stripe_size: Option<ShardStripeSize>,
354 : }
355 :
356 0 : #[derive(Serialize, Deserialize)]
357 : pub struct TenantShardSplitResponse {
358 : pub new_shards: Vec<TenantShardId>,
359 : }
360 :
361 : /// Parameters that apply to all shards in a tenant. Used during tenant creation.
362 0 : #[derive(Serialize, Deserialize, Debug)]
363 : #[serde(deny_unknown_fields)]
364 : pub struct ShardParameters {
365 : pub count: ShardCount,
366 : pub stripe_size: ShardStripeSize,
367 : }
368 :
369 : impl ShardParameters {
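    /// Assuming the stripe size is counted in 8 KiB pages, 256 * 1024 / 8 = 32768 pages,
    /// i.e. a 256 MiB stripe.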
370 : pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
371 :
372 0 : pub fn is_unsharded(&self) -> bool {
373 0 : self.count.is_unsharded()
374 0 : }
375 : }
376 :
377 : impl Default for ShardParameters {
378 455 : fn default() -> Self {
379 455 : Self {
380 455 : count: ShardCount::new(0),
381 455 : stripe_size: Self::DEFAULT_STRIPE_SIZE,
382 455 : }
383 455 : }
384 : }
385 :
386 : #[derive(Debug, Default, Clone, Eq, PartialEq)]
387 : pub enum FieldPatch<T> {
388 : Upsert(T),
389 : Remove,
390 : #[default]
391 : Noop,
392 : }
393 :
394 : impl<T> FieldPatch<T> {
395 70 : fn is_noop(&self) -> bool {
396 70 : matches!(self, FieldPatch::Noop)
397 70 : }
398 :
399 35 : pub fn apply(self, target: &mut Option<T>) {
400 35 : match self {
401 1 : Self::Upsert(v) => *target = Some(v),
402 1 : Self::Remove => *target = None,
403 33 : Self::Noop => {}
404 : }
405 35 : }
406 :
407 10 : pub fn map<U, E, F: FnOnce(T) -> Result<U, E>>(self, map: F) -> Result<FieldPatch<U>, E> {
408 10 : match self {
409 0 : Self::Upsert(v) => Ok(FieldPatch::<U>::Upsert(map(v)?)),
410 0 : Self::Remove => Ok(FieldPatch::<U>::Remove),
411 10 : Self::Noop => Ok(FieldPatch::<U>::Noop),
412 : }
413 10 : }
414 : }
415 :
416 : impl<'de, T: Deserialize<'de>> Deserialize<'de> for FieldPatch<T> {
417 2 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
418 2 : where
419 2 : D: Deserializer<'de>,
420 2 : {
421 2 : Option::deserialize(deserializer).map(|opt| match opt {
422 1 : None => FieldPatch::Remove,
423 1 : Some(val) => FieldPatch::Upsert(val),
424 2 : })
425 2 : }
426 : }
427 :
428 : impl<T: Serialize> Serialize for FieldPatch<T> {
429 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
430 2 : where
431 2 : S: Serializer,
432 2 : {
433 2 : match self {
434 1 : FieldPatch::Upsert(val) => serializer.serialize_some(val),
435 1 : FieldPatch::Remove => serializer.serialize_none(),
436 0 : FieldPatch::Noop => unreachable!(),
437 : }
438 2 : }
439 : }
440 :
441 2 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
442 : #[serde(default)]
443 : pub struct TenantConfigPatch {
444 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
445 : pub checkpoint_distance: FieldPatch<u64>,
446 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
447 : pub checkpoint_timeout: FieldPatch<String>,
448 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
449 : pub compaction_target_size: FieldPatch<u64>,
450 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
451 : pub compaction_period: FieldPatch<String>,
452 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
453 : pub compaction_threshold: FieldPatch<usize>,
454 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
455 : pub compaction_upper_limit: FieldPatch<usize>,
456 : // defer parsing compaction_algorithm, like eviction_policy
457 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
458 : pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
459 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
460 : pub compaction_l0_first: FieldPatch<bool>,
461 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
462 : pub compaction_l0_semaphore: FieldPatch<bool>,
463 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
464 : pub l0_flush_delay_threshold: FieldPatch<usize>,
465 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
466 : pub l0_flush_stall_threshold: FieldPatch<usize>,
467 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
468 : pub l0_flush_wait_upload: FieldPatch<bool>,
469 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
470 : pub gc_horizon: FieldPatch<u64>,
471 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
472 : pub gc_period: FieldPatch<String>,
473 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
474 : pub image_creation_threshold: FieldPatch<usize>,
475 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
476 : pub pitr_interval: FieldPatch<String>,
477 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
478 : pub walreceiver_connect_timeout: FieldPatch<String>,
479 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
480 : pub lagging_wal_timeout: FieldPatch<String>,
481 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
482 : pub max_lsn_wal_lag: FieldPatch<NonZeroU64>,
483 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
484 : pub eviction_policy: FieldPatch<EvictionPolicy>,
485 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
486 : pub min_resident_size_override: FieldPatch<u64>,
487 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
488 : pub evictions_low_residence_duration_metric_threshold: FieldPatch<String>,
489 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
490 : pub heatmap_period: FieldPatch<String>,
491 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
492 : pub lazy_slru_download: FieldPatch<bool>,
493 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
494 : pub timeline_get_throttle: FieldPatch<ThrottleConfig>,
495 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
496 : pub image_layer_creation_check_threshold: FieldPatch<u8>,
497 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
498 : pub image_creation_preempt_threshold: FieldPatch<usize>,
499 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
500 : pub lsn_lease_length: FieldPatch<String>,
501 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
502 : pub lsn_lease_length_for_ts: FieldPatch<String>,
503 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
504 : pub timeline_offloading: FieldPatch<bool>,
505 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
506 : pub wal_receiver_protocol_override: FieldPatch<PostgresClientProtocol>,
507 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
508 : pub rel_size_v2_enabled: FieldPatch<bool>,
509 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
510 : pub gc_compaction_enabled: FieldPatch<bool>,
511 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
512 : pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
513 : #[serde(skip_serializing_if = "FieldPatch::is_noop")]
514 : pub gc_compaction_ratio_percent: FieldPatch<u64>,
515 : }
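// Illustrative sketch (not part of the original file): how the three `FieldPatch` states map onto
// JSON for `TenantConfigPatch`. An absent field stays `Noop` (via the container-level
// `#[serde(default)]`), an explicit `null` becomes `Remove`, and a concrete value becomes `Upsert`.
// Hypothetical test module, assuming `serde_json` as a dev-dependency (as in the throttle tests
// further down).
#[cfg(test)]
mod tenant_config_patch_json_example {
    use super::*;

    #[test]
    fn absent_null_and_value() {
        let patch: TenantConfigPatch = serde_json::from_value(serde_json::json!({
            "checkpoint_distance": 1024, // -> Upsert(1024)
            "gc_horizon": null           // -> Remove
            // "compaction_threshold" is omitted -> Noop
        }))
        .unwrap();
        assert_eq!(patch.checkpoint_distance, FieldPatch::Upsert(1024));
        assert_eq!(patch.gc_horizon, FieldPatch::Remove);
        assert_eq!(patch.compaction_threshold, FieldPatch::Noop);
    }
}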
516 :
517 : /// An alternative representation of `pageserver::tenant::TenantConf` with
518 : /// simpler types.
519 40 : #[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
520 : pub struct TenantConfig {
521 : pub checkpoint_distance: Option<u64>,
522 : #[serde(default)]
523 : #[serde(with = "humantime_serde")]
524 : pub checkpoint_timeout: Option<Duration>,
525 : pub compaction_target_size: Option<u64>,
526 : #[serde(default)]
527 : #[serde(with = "humantime_serde")]
528 : pub compaction_period: Option<Duration>,
529 : pub compaction_threshold: Option<usize>,
530 : pub compaction_upper_limit: Option<usize>,
531 : // defer parsing compaction_algorithm, like eviction_policy
532 : pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
533 : pub compaction_l0_first: Option<bool>,
534 : pub compaction_l0_semaphore: Option<bool>,
535 : pub l0_flush_delay_threshold: Option<usize>,
536 : pub l0_flush_stall_threshold: Option<usize>,
537 : pub l0_flush_wait_upload: Option<bool>,
538 : pub gc_horizon: Option<u64>,
539 : #[serde(default)]
540 : #[serde(with = "humantime_serde")]
541 : pub gc_period: Option<Duration>,
542 : pub image_creation_threshold: Option<usize>,
543 : #[serde(default)]
544 : #[serde(with = "humantime_serde")]
545 : pub pitr_interval: Option<Duration>,
546 : #[serde(default)]
547 : #[serde(with = "humantime_serde")]
548 : pub walreceiver_connect_timeout: Option<Duration>,
549 : #[serde(default)]
550 : #[serde(with = "humantime_serde")]
551 : pub lagging_wal_timeout: Option<Duration>,
552 : pub max_lsn_wal_lag: Option<NonZeroU64>,
553 : pub eviction_policy: Option<EvictionPolicy>,
554 : pub min_resident_size_override: Option<u64>,
555 : #[serde(default)]
556 : #[serde(with = "humantime_serde")]
557 : pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
558 : #[serde(default)]
559 : #[serde(with = "humantime_serde")]
560 : pub heatmap_period: Option<Duration>,
561 : pub lazy_slru_download: Option<bool>,
562 : pub timeline_get_throttle: Option<ThrottleConfig>,
563 : pub image_layer_creation_check_threshold: Option<u8>,
564 : pub image_creation_preempt_threshold: Option<usize>,
565 : #[serde(default)]
566 : #[serde(with = "humantime_serde")]
567 : pub lsn_lease_length: Option<Duration>,
568 : #[serde(default)]
569 : #[serde(with = "humantime_serde")]
570 : pub lsn_lease_length_for_ts: Option<Duration>,
571 : pub timeline_offloading: Option<bool>,
572 : pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
573 : pub rel_size_v2_enabled: Option<bool>,
574 : pub gc_compaction_enabled: Option<bool>,
575 : pub gc_compaction_initial_threshold_kb: Option<u64>,
576 : pub gc_compaction_ratio_percent: Option<u64>,
577 : }
578 :
579 : impl TenantConfig {
580 1 : pub fn apply_patch(
581 1 : self,
582 1 : patch: TenantConfigPatch,
583 1 : ) -> Result<TenantConfig, humantime::DurationError> {
584 1 : let Self {
585 1 : mut checkpoint_distance,
586 1 : mut checkpoint_timeout,
587 1 : mut compaction_target_size,
588 1 : mut compaction_period,
589 1 : mut compaction_threshold,
590 1 : mut compaction_upper_limit,
591 1 : mut compaction_algorithm,
592 1 : mut compaction_l0_first,
593 1 : mut compaction_l0_semaphore,
594 1 : mut l0_flush_delay_threshold,
595 1 : mut l0_flush_stall_threshold,
596 1 : mut l0_flush_wait_upload,
597 1 : mut gc_horizon,
598 1 : mut gc_period,
599 1 : mut image_creation_threshold,
600 1 : mut pitr_interval,
601 1 : mut walreceiver_connect_timeout,
602 1 : mut lagging_wal_timeout,
603 1 : mut max_lsn_wal_lag,
604 1 : mut eviction_policy,
605 1 : mut min_resident_size_override,
606 1 : mut evictions_low_residence_duration_metric_threshold,
607 1 : mut heatmap_period,
608 1 : mut lazy_slru_download,
609 1 : mut timeline_get_throttle,
610 1 : mut image_layer_creation_check_threshold,
611 1 : mut image_creation_preempt_threshold,
612 1 : mut lsn_lease_length,
613 1 : mut lsn_lease_length_for_ts,
614 1 : mut timeline_offloading,
615 1 : mut wal_receiver_protocol_override,
616 1 : mut rel_size_v2_enabled,
617 1 : mut gc_compaction_enabled,
618 1 : mut gc_compaction_initial_threshold_kb,
619 1 : mut gc_compaction_ratio_percent,
620 1 : } = self;
621 1 :
622 1 : patch.checkpoint_distance.apply(&mut checkpoint_distance);
623 1 : patch
624 1 : .checkpoint_timeout
625 1 : .map(|v| humantime::parse_duration(&v))?
626 1 : .apply(&mut checkpoint_timeout);
627 1 : patch
628 1 : .compaction_target_size
629 1 : .apply(&mut compaction_target_size);
630 1 : patch
631 1 : .compaction_period
632 1 : .map(|v| humantime::parse_duration(&v))?
633 1 : .apply(&mut compaction_period);
634 1 : patch.compaction_threshold.apply(&mut compaction_threshold);
635 1 : patch
636 1 : .compaction_upper_limit
637 1 : .apply(&mut compaction_upper_limit);
638 1 : patch.compaction_algorithm.apply(&mut compaction_algorithm);
639 1 : patch.compaction_l0_first.apply(&mut compaction_l0_first);
640 1 : patch
641 1 : .compaction_l0_semaphore
642 1 : .apply(&mut compaction_l0_semaphore);
643 1 : patch
644 1 : .l0_flush_delay_threshold
645 1 : .apply(&mut l0_flush_delay_threshold);
646 1 : patch
647 1 : .l0_flush_stall_threshold
648 1 : .apply(&mut l0_flush_stall_threshold);
649 1 : patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
650 1 : patch.gc_horizon.apply(&mut gc_horizon);
651 1 : patch
652 1 : .gc_period
653 1 : .map(|v| humantime::parse_duration(&v))?
654 1 : .apply(&mut gc_period);
655 1 : patch
656 1 : .image_creation_threshold
657 1 : .apply(&mut image_creation_threshold);
658 1 : patch
659 1 : .pitr_interval
660 1 : .map(|v| humantime::parse_duration(&v))?
661 1 : .apply(&mut pitr_interval);
662 1 : patch
663 1 : .walreceiver_connect_timeout
664 1 : .map(|v| humantime::parse_duration(&v))?
665 1 : .apply(&mut walreceiver_connect_timeout);
666 1 : patch
667 1 : .lagging_wal_timeout
668 1 : .map(|v| humantime::parse_duration(&v))?
669 1 : .apply(&mut lagging_wal_timeout);
670 1 : patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
671 1 : patch.eviction_policy.apply(&mut eviction_policy);
672 1 : patch
673 1 : .min_resident_size_override
674 1 : .apply(&mut min_resident_size_override);
675 1 : patch
676 1 : .evictions_low_residence_duration_metric_threshold
677 1 : .map(|v| humantime::parse_duration(&v))?
678 1 : .apply(&mut evictions_low_residence_duration_metric_threshold);
679 1 : patch
680 1 : .heatmap_period
681 1 : .map(|v| humantime::parse_duration(&v))?
682 1 : .apply(&mut heatmap_period);
683 1 : patch.lazy_slru_download.apply(&mut lazy_slru_download);
684 1 : patch
685 1 : .timeline_get_throttle
686 1 : .apply(&mut timeline_get_throttle);
687 1 : patch
688 1 : .image_layer_creation_check_threshold
689 1 : .apply(&mut image_layer_creation_check_threshold);
690 1 : patch
691 1 : .image_creation_preempt_threshold
692 1 : .apply(&mut image_creation_preempt_threshold);
693 1 : patch
694 1 : .lsn_lease_length
695 1 : .map(|v| humantime::parse_duration(&v))?
696 1 : .apply(&mut lsn_lease_length);
697 1 : patch
698 1 : .lsn_lease_length_for_ts
699 1 : .map(|v| humantime::parse_duration(&v))?
700 1 : .apply(&mut lsn_lease_length_for_ts);
701 1 : patch.timeline_offloading.apply(&mut timeline_offloading);
702 1 : patch
703 1 : .wal_receiver_protocol_override
704 1 : .apply(&mut wal_receiver_protocol_override);
705 1 : patch.rel_size_v2_enabled.apply(&mut rel_size_v2_enabled);
706 1 : patch
707 1 : .gc_compaction_enabled
708 1 : .apply(&mut gc_compaction_enabled);
709 1 : patch
710 1 : .gc_compaction_initial_threshold_kb
711 1 : .apply(&mut gc_compaction_initial_threshold_kb);
712 1 : patch
713 1 : .gc_compaction_ratio_percent
714 1 : .apply(&mut gc_compaction_ratio_percent);
715 1 :
716 1 : Ok(Self {
717 1 : checkpoint_distance,
718 1 : checkpoint_timeout,
719 1 : compaction_target_size,
720 1 : compaction_period,
721 1 : compaction_threshold,
722 1 : compaction_upper_limit,
723 1 : compaction_algorithm,
724 1 : compaction_l0_first,
725 1 : compaction_l0_semaphore,
726 1 : l0_flush_delay_threshold,
727 1 : l0_flush_stall_threshold,
728 1 : l0_flush_wait_upload,
729 1 : gc_horizon,
730 1 : gc_period,
731 1 : image_creation_threshold,
732 1 : pitr_interval,
733 1 : walreceiver_connect_timeout,
734 1 : lagging_wal_timeout,
735 1 : max_lsn_wal_lag,
736 1 : eviction_policy,
737 1 : min_resident_size_override,
738 1 : evictions_low_residence_duration_metric_threshold,
739 1 : heatmap_period,
740 1 : lazy_slru_download,
741 1 : timeline_get_throttle,
742 1 : image_layer_creation_check_threshold,
743 1 : image_creation_preempt_threshold,
744 1 : lsn_lease_length,
745 1 : lsn_lease_length_for_ts,
746 1 : timeline_offloading,
747 1 : wal_receiver_protocol_override,
748 1 : rel_size_v2_enabled,
749 1 : gc_compaction_enabled,
750 1 : gc_compaction_initial_threshold_kb,
751 1 : gc_compaction_ratio_percent,
752 1 : })
753 1 : }
754 : }
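// Illustrative usage sketch (not part of the original file): building a `TenantConfigPatch` in code
// and applying it on top of the default config. Duration-typed fields are patched as humantime
// strings and parsed inside `apply_patch`. Hypothetical test module.
#[cfg(test)]
mod apply_patch_example {
    use super::*;

    #[test]
    fn upsert_and_remove() {
        let base = TenantConfig {
            gc_horizon: Some(1024),
            ..TenantConfig::default()
        };
        let patch = TenantConfigPatch {
            checkpoint_timeout: FieldPatch::Upsert("10m".to_string()),
            gc_horizon: FieldPatch::Remove,
            ..TenantConfigPatch::default()
        };
        let patched = base.apply_patch(patch).expect("valid humantime duration");
        assert_eq!(patched.checkpoint_timeout, Some(Duration::from_secs(600)));
        assert_eq!(patched.gc_horizon, None);
    }
}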
755 :
756 : /// The policy for the aux file storage.
757 : ///
758 : /// It can be switched through `switch_aux_file_policy` tenant config.
759 : /// When the first aux file is written, the policy will be persisted in the
760 : /// `index_part.json` file and has a limited migration path.
761 : ///
762 : /// Currently, we only allow the following migration path:
763 : ///
764 : /// Unset -> V1
765 : /// -> V2
766 : /// -> CrossValidation -> V2
767 : #[derive(
768 : Eq,
769 : PartialEq,
770 : Debug,
771 : Copy,
772 : Clone,
773 0 : strum_macros::EnumString,
774 : strum_macros::Display,
775 4 : serde_with::DeserializeFromStr,
776 : serde_with::SerializeDisplay,
777 : )]
778 : #[strum(serialize_all = "kebab-case")]
779 : pub enum AuxFilePolicy {
780 : /// V1 aux file policy: store everything in AUX_FILE_KEY
781 : #[strum(ascii_case_insensitive)]
782 : V1,
783 : /// V2 aux file policy: store in the AUX_FILE keyspace
784 : #[strum(ascii_case_insensitive)]
785 : V2,
786 : /// Cross validation runs both formats on the write path and does validation
787 : /// on the read path.
788 : #[strum(ascii_case_insensitive)]
789 : CrossValidation,
790 : }
791 :
792 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
793 : #[serde(tag = "kind")]
794 : pub enum EvictionPolicy {
795 : NoEviction,
796 : LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
797 : OnlyImitiate(EvictionPolicyLayerAccessThreshold),
798 : }
799 :
800 : impl EvictionPolicy {
801 0 : pub fn discriminant_str(&self) -> &'static str {
802 0 : match self {
803 0 : EvictionPolicy::NoEviction => "NoEviction",
804 0 : EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
805 0 : EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
806 : }
807 0 : }
808 : }
809 :
810 : #[derive(
811 : Eq,
812 : PartialEq,
813 : Debug,
814 : Copy,
815 : Clone,
816 0 : strum_macros::EnumString,
817 : strum_macros::Display,
818 0 : serde_with::DeserializeFromStr,
819 : serde_with::SerializeDisplay,
820 : )]
821 : #[strum(serialize_all = "kebab-case")]
822 : pub enum CompactionAlgorithm {
823 : Legacy,
824 : Tiered,
825 : }
826 :
827 : #[derive(
828 4 : Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
829 : )]
830 : pub enum ImageCompressionAlgorithm {
831 : // Disabled for writes, support decompressing during read path
832 : Disabled,
833 : /// Zstandard compression. Level 0 and None mean the same thing (the default level). Levels can be negative as well.
834 : /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
835 : Zstd {
836 : level: Option<i8>,
837 : },
838 : }
839 :
840 : impl FromStr for ImageCompressionAlgorithm {
841 : type Err = anyhow::Error;
842 8 : fn from_str(s: &str) -> Result<Self, Self::Err> {
843 8 : let mut components = s.split(['(', ')']);
844 8 : let first = components
845 8 : .next()
846 8 : .ok_or_else(|| anyhow::anyhow!("empty string"))?;
847 8 : match first {
848 8 : "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
849 6 : "zstd" => {
850 6 : let level = if let Some(v) = components.next() {
851 4 : let v: i8 = v.parse()?;
852 4 : Some(v)
853 : } else {
854 2 : None
855 : };
856 :
857 6 : Ok(ImageCompressionAlgorithm::Zstd { level })
858 : }
859 0 : _ => anyhow::bail!("invalid specifier '{first}'"),
860 : }
861 8 : }
862 : }
863 :
864 : impl Display for ImageCompressionAlgorithm {
865 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
866 12 : match self {
867 3 : ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
868 9 : ImageCompressionAlgorithm::Zstd { level } => {
869 9 : if let Some(level) = level {
870 6 : write!(f, "zstd({})", level)
871 : } else {
872 3 : write!(f, "zstd")
873 : }
874 : }
875 : }
876 12 : }
877 : }
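// Illustrative sketch (not part of the original file) of the textual format accepted by the
// `FromStr` impl above and produced by `Display`: "disabled", "zstd", or "zstd(<level>)".
// Hypothetical test module.
#[cfg(test)]
mod image_compression_format_example {
    use super::*;

    #[test]
    fn parse_and_display() {
        assert_eq!(
            "disabled".parse::<ImageCompressionAlgorithm>().unwrap(),
            ImageCompressionAlgorithm::Disabled
        );
        assert_eq!(
            "zstd".parse::<ImageCompressionAlgorithm>().unwrap(),
            ImageCompressionAlgorithm::Zstd { level: None }
        );
        // Levels may be negative, per the zstd manual linked above.
        let alg: ImageCompressionAlgorithm = "zstd(-5)".parse().unwrap();
        assert_eq!(alg, ImageCompressionAlgorithm::Zstd { level: Some(-5) });
        assert_eq!(alg.to_string(), "zstd(-5)");
    }
}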
878 :
879 0 : #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
880 : pub struct CompactionAlgorithmSettings {
881 : pub kind: CompactionAlgorithm,
882 : }
883 :
884 8 : #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
885 : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
886 : pub enum L0FlushConfig {
887 : #[serde(rename_all = "snake_case")]
888 : Direct { max_concurrency: NonZeroUsize },
889 : }
890 :
891 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
892 : pub struct EvictionPolicyLayerAccessThreshold {
893 : #[serde(with = "humantime_serde")]
894 : pub period: Duration,
895 : #[serde(with = "humantime_serde")]
896 : pub threshold: Duration,
897 : }
898 :
899 6 : #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
900 : pub struct ThrottleConfig {
901 : /// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
902 : #[serde(rename = "task_kinds")]
903 : pub enabled: ThrottleConfigTaskKinds,
904 : pub initial: u32,
905 : #[serde(with = "humantime_serde")]
906 : pub refill_interval: Duration,
907 : pub refill_amount: NonZeroU32,
908 : pub max: u32,
909 : }
910 :
911 : /// Before <https://github.com/neondatabase/neon/pull/9962>
912 : /// the throttle was applied per `Timeline::get`/`Timeline::get_vectored` call.
913 : /// The `task_kinds` field controlled which Pageserver "Task Kind"s
914 : /// were subject to the throttle.
915 : ///
916 : /// After that PR, the throttle is applied at pagestream request level
917 : /// and the `task_kinds` field does not apply since the only task kind
918 : /// that is subject to the throttle is that of the page service.
919 : ///
920 : /// However, we don't want to make a breaking config change right now
921 : /// because it means we have to migrate all the tenant configs.
922 : /// This will be done in a future PR.
923 : ///
924 : /// In the meantime, we use emptiness / non-emptiness of the `task_kinds`
925 : /// field to determine if the throttle is enabled or not.
926 1 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
927 : #[serde(transparent)]
928 : pub struct ThrottleConfigTaskKinds(Vec<String>);
929 :
930 : impl ThrottleConfigTaskKinds {
931 929 : pub fn disabled() -> Self {
932 929 : Self(vec![])
933 929 : }
934 456 : pub fn is_enabled(&self) -> bool {
935 456 : !self.0.is_empty()
936 456 : }
937 : }
938 :
939 : impl ThrottleConfig {
940 929 : pub fn disabled() -> Self {
941 929 : Self {
942 929 : enabled: ThrottleConfigTaskKinds::disabled(),
943 929 : // other values don't matter with empty `task_kinds`.
944 929 : initial: 0,
945 929 : refill_interval: Duration::from_millis(1),
946 929 : refill_amount: NonZeroU32::new(1).unwrap(),
947 929 : max: 1,
948 929 : }
949 929 : }
950 : /// The requests per second allowed by the given config.
951 0 : pub fn steady_rps(&self) -> f64 {
952 0 : (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
953 0 : }
954 : }
955 :
956 : #[cfg(test)]
957 : mod throttle_config_tests {
958 : use super::*;
959 :
960 : #[test]
961 1 : fn test_disabled_is_disabled() {
962 1 : let config = ThrottleConfig::disabled();
963 1 : assert!(!config.enabled.is_enabled());
964 1 : }
965 : #[test]
966 1 : fn test_enabled_backwards_compat() {
967 1 : let input = serde_json::json!({
968 1 : "task_kinds": ["PageRequestHandler"],
969 1 : "initial": 40000,
970 1 : "refill_interval": "50ms",
971 1 : "refill_amount": 1000,
972 1 : "max": 40000,
973 1 : "fair": true
974 1 : });
975 1 : let config: ThrottleConfig = serde_json::from_value(input).unwrap();
976 1 : assert!(config.enabled.is_enabled());
977 1 : }
978 : }
979 :
980 : /// A flattened analog of a `pageserver::tenant::LocationMode`, which
981 : /// lists out all possible states (and the virtual "Detached" state)
982 : /// in a flat form rather than using rust-style enums.
983 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
984 : pub enum LocationConfigMode {
985 : AttachedSingle,
986 : AttachedMulti,
987 : AttachedStale,
988 : Secondary,
989 : Detached,
990 : }
991 :
992 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
993 : pub struct LocationConfigSecondary {
994 : pub warm: bool,
995 : }
996 :
997 : /// An alternative representation of `pageserver::tenant::LocationConf`,
998 : /// for use in external-facing APIs.
999 0 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1000 : pub struct LocationConfig {
1001 : pub mode: LocationConfigMode,
1002 : /// If attaching, in what generation?
1003 : #[serde(default)]
1004 : pub generation: Option<u32>,
1005 :
1006 : // If requesting mode `Secondary`, configuration for that.
1007 : #[serde(default)]
1008 : pub secondary_conf: Option<LocationConfigSecondary>,
1009 :
1010 : // Shard parameters: if shard_count is nonzero, then other shard_* fields
1011 : // must be set accurately.
1012 : #[serde(default)]
1013 : pub shard_number: u8,
1014 : #[serde(default)]
1015 : pub shard_count: u8,
1016 : #[serde(default)]
1017 : pub shard_stripe_size: u32,
1018 :
1019 : // This configuration only affects attached mode, but should be provided irrespective
1020 : // of the mode, as a secondary location might transition on startup if the response
1021 : // to the `/re-attach` control plane API requests it.
1022 : pub tenant_conf: TenantConfig,
1023 : }
1024 :
1025 0 : #[derive(Serialize, Deserialize)]
1026 : pub struct LocationConfigListResponse {
1027 : pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
1028 : }
1029 :
1030 : #[derive(Serialize)]
1031 : pub struct StatusResponse {
1032 : pub id: NodeId,
1033 : }
1034 :
1035 0 : #[derive(Serialize, Deserialize, Debug)]
1036 : #[serde(deny_unknown_fields)]
1037 : pub struct TenantLocationConfigRequest {
1038 : #[serde(flatten)]
1039 : pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
1040 : }
1041 :
1042 0 : #[derive(Serialize, Deserialize, Debug)]
1043 : #[serde(deny_unknown_fields)]
1044 : pub struct TenantTimeTravelRequest {
1045 : pub shard_counts: Vec<ShardCount>,
1046 : }
1047 :
1048 0 : #[derive(Serialize, Deserialize, Debug)]
1049 : #[serde(deny_unknown_fields)]
1050 : pub struct TenantShardLocation {
1051 : pub shard_id: TenantShardId,
1052 : pub node_id: NodeId,
1053 : }
1054 :
1055 0 : #[derive(Serialize, Deserialize, Debug)]
1056 : #[serde(deny_unknown_fields)]
1057 : pub struct TenantLocationConfigResponse {
1058 : pub shards: Vec<TenantShardLocation>,
1059 : // If the shard count is >1, stripe_size will be set.
1060 : pub stripe_size: Option<ShardStripeSize>,
1061 : }
1062 :
1063 2 : #[derive(Serialize, Deserialize, Debug)]
1064 : #[serde(deny_unknown_fields)]
1065 : pub struct TenantConfigRequest {
1066 : pub tenant_id: TenantId,
1067 : #[serde(flatten)]
1068 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
1069 : }
1070 :
1071 : impl std::ops::Deref for TenantConfigRequest {
1072 : type Target = TenantConfig;
1073 :
1074 0 : fn deref(&self) -> &Self::Target {
1075 0 : &self.config
1076 0 : }
1077 : }
1078 :
1079 : impl TenantConfigRequest {
1080 0 : pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
1081 0 : let config = TenantConfig::default();
1082 0 : TenantConfigRequest { tenant_id, config }
1083 0 : }
1084 : }
1085 :
1086 3 : #[derive(Serialize, Deserialize, Debug)]
1087 : #[serde(deny_unknown_fields)]
1088 : pub struct TenantConfigPatchRequest {
1089 : pub tenant_id: TenantId,
1090 : #[serde(flatten)]
1091 : pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
1092 : }
1093 :
1094 0 : #[derive(Serialize, Deserialize, Debug)]
1095 : pub struct TenantWaitLsnRequest {
1096 : #[serde(flatten)]
1097 : pub timelines: HashMap<TimelineId, Lsn>,
1098 : pub timeout: Duration,
1099 : }
1100 :
1101 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
1102 0 : #[derive(Serialize, Deserialize, Clone)]
1103 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
1104 : pub enum TenantAttachmentStatus {
1105 : Maybe,
1106 : Attached,
1107 : Failed { reason: String },
1108 : }
1109 :
1110 0 : #[derive(Serialize, Deserialize, Clone)]
1111 : pub struct TenantInfo {
1112 : pub id: TenantShardId,
1113 : // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
1114 : pub state: TenantState,
1115 : /// Sum of the size of all layer files.
1116 : /// If a layer is present in both local FS and S3, it counts only once.
1117 : pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
1118 : pub attachment_status: TenantAttachmentStatus,
1119 : pub generation: u32,
1120 :
1121 : /// Opaque explanation if gc is being blocked.
1122 : ///
1123 : /// Only looked up for the individual tenant detail, not the listing.
1124 : #[serde(skip_serializing_if = "Option::is_none")]
1125 : pub gc_blocking: Option<String>,
1126 : }
1127 :
1128 0 : #[derive(Serialize, Deserialize, Clone)]
1129 : pub struct TenantDetails {
1130 : #[serde(flatten)]
1131 : pub tenant_info: TenantInfo,
1132 :
1133 : pub walredo: Option<WalRedoManagerStatus>,
1134 :
1135 : pub timelines: Vec<TimelineId>,
1136 : }
1137 :
1138 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
1139 : pub enum TimelineArchivalState {
1140 : Archived,
1141 : Unarchived,
1142 : }
1143 :
1144 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
1145 : pub struct TimelineArchivalConfigRequest {
1146 : pub state: TimelineArchivalState,
1147 : }
1148 :
1149 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1150 : pub struct TimelinesInfoAndOffloaded {
1151 : pub timelines: Vec<TimelineInfo>,
1152 : pub offloaded: Vec<OffloadedTimelineInfo>,
1153 : }
1154 :
1155 : /// Analog of [`TimelineInfo`] for offloaded timelines.
1156 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1157 : pub struct OffloadedTimelineInfo {
1158 : pub tenant_id: TenantShardId,
1159 : pub timeline_id: TimelineId,
1160 : /// Whether the timeline has a parent it has been branched off from or not
1161 : pub ancestor_timeline_id: Option<TimelineId>,
1162 : /// Whether to retain the branch lsn at the ancestor or not
1163 : pub ancestor_retain_lsn: Option<Lsn>,
1164 : /// The time point when the timeline was archived
1165 : pub archived_at: chrono::DateTime<chrono::Utc>,
1166 : }
1167 :
1168 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
1169 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1170 : pub struct TimelineInfo {
1171 : pub tenant_id: TenantShardId,
1172 : pub timeline_id: TimelineId,
1173 :
1174 : pub ancestor_timeline_id: Option<TimelineId>,
1175 : pub ancestor_lsn: Option<Lsn>,
1176 : pub last_record_lsn: Lsn,
1177 : pub prev_record_lsn: Option<Lsn>,
1178 :
1179 : /// Legacy field for compat with control plane. Synonym of `min_readable_lsn`.
1180 : /// TODO: remove once control plane no longer reads it.
1181 : pub latest_gc_cutoff_lsn: Lsn,
1182 :
1183 : /// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
1184 : /// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
1185 : /// as it is easier to reason about.
1186 : #[serde(default)]
1187 : pub applied_gc_cutoff_lsn: Lsn,
1188 :
1189 : /// The upper bound of data which is either already GC'ed, or eligible to be GC'ed at any time based on PITR interval.
1190 : /// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
1191 : /// LSN at which it is legal to create a branch or ephemeral endpoint.
1192 : ///
1193 : /// Note that holders of valid LSN leases may be able to create branches and read pages earlier
1194 : /// than this LSN, but new leases may not be taken out earlier than this LSN.
1195 : #[serde(default)]
1196 : pub min_readable_lsn: Lsn,
1197 :
1198 : pub disk_consistent_lsn: Lsn,
1199 :
1200 : /// The LSN that we have successfully uploaded to remote storage
1201 : pub remote_consistent_lsn: Lsn,
1202 :
1203 : /// The LSN that we are advertising to safekeepers
1204 : pub remote_consistent_lsn_visible: Lsn,
1205 :
1206 : /// The LSN from the start of the root timeline (never changes)
1207 : pub initdb_lsn: Lsn,
1208 :
1209 : pub current_logical_size: u64,
1210 : pub current_logical_size_is_accurate: bool,
1211 :
1212 : pub directory_entries_counts: Vec<u64>,
1213 :
1214 : /// Sum of the size of all layer files.
1215 : /// If a layer is present in both local FS and S3, it counts only once.
1216 : pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
1217 : pub current_logical_size_non_incremental: Option<u64>,
1218 :
1219 : /// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
1220 : /// beyond the branch's branch point, we only count up to the branch point.
1221 : pub pitr_history_size: u64,
1222 :
1223 : /// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
1224 : /// ancestor data used by this branch would have been retained anyway). If this is false, then
1225 : /// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
1226 : /// otherwise be able to GC.
1227 : pub within_ancestor_pitr: bool,
1228 :
1229 : pub timeline_dir_layer_file_size_sum: Option<u64>,
1230 :
1231 : pub wal_source_connstr: Option<String>,
1232 : pub last_received_msg_lsn: Option<Lsn>,
1233 : /// the timestamp (in microseconds) of the last received message
1234 : pub last_received_msg_ts: Option<u128>,
1235 : pub pg_version: u32,
1236 :
1237 : pub state: TimelineState,
1238 :
1239 : pub walreceiver_status: String,
1240 :
1241 : // ALWAYS add new fields at the end of the struct with `Option` to ensure forward/backward compatibility.
1242 : // Backward compatibility: you will get a JSON not containing the newly-added field.
1243 : // Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
1244 : // not deny unknown fields by default so it's safe to set the field to some value, though it won't be
1245 : // read.
1246 : pub is_archived: Option<bool>,
1247 : }
1248 :
1249 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1250 : pub struct LayerMapInfo {
1251 : pub in_memory_layers: Vec<InMemoryLayerInfo>,
1252 : pub historic_layers: Vec<HistoricLayerInfo>,
1253 : }
1254 :
1255 : /// The residence status of a layer
1256 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1257 : pub enum LayerResidenceStatus {
1258 : /// Residence status for a layer file that exists locally.
1259 : /// It may also exist on the remote, we don't care here.
1260 : Resident,
1261 : /// Residence status for a layer file that only exists on the remote.
1262 : Evicted,
1263 : }
1264 :
1265 : #[serde_as]
1266 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1267 : pub struct LayerAccessStats {
1268 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1269 : pub access_time: SystemTime,
1270 :
1271 : #[serde_as(as = "serde_with::TimestampMilliSeconds")]
1272 : pub residence_time: SystemTime,
1273 :
1274 : pub visible: bool,
1275 : }
1276 :
1277 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1278 : #[serde(tag = "kind")]
1279 : pub enum InMemoryLayerInfo {
1280 : Open { lsn_start: Lsn },
1281 : Frozen { lsn_start: Lsn, lsn_end: Lsn },
1282 : }
1283 :
1284 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1285 : #[serde(tag = "kind")]
1286 : pub enum HistoricLayerInfo {
1287 : Delta {
1288 : layer_file_name: String,
1289 : layer_file_size: u64,
1290 :
1291 : lsn_start: Lsn,
1292 : lsn_end: Lsn,
1293 : remote: bool,
1294 : access_stats: LayerAccessStats,
1295 :
1296 : l0: bool,
1297 : },
1298 : Image {
1299 : layer_file_name: String,
1300 : layer_file_size: u64,
1301 :
1302 : lsn_start: Lsn,
1303 : remote: bool,
1304 : access_stats: LayerAccessStats,
1305 : },
1306 : }
1307 :
1308 : impl HistoricLayerInfo {
1309 0 : pub fn layer_file_name(&self) -> &str {
1310 0 : match self {
1311 : HistoricLayerInfo::Delta {
1312 0 : layer_file_name, ..
1313 0 : } => layer_file_name,
1314 : HistoricLayerInfo::Image {
1315 0 : layer_file_name, ..
1316 0 : } => layer_file_name,
1317 : }
1318 0 : }
1319 0 : pub fn is_remote(&self) -> bool {
1320 0 : match self {
1321 0 : HistoricLayerInfo::Delta { remote, .. } => *remote,
1322 0 : HistoricLayerInfo::Image { remote, .. } => *remote,
1323 : }
1324 0 : }
1325 0 : pub fn set_remote(&mut self, value: bool) {
1326 0 : let field = match self {
1327 0 : HistoricLayerInfo::Delta { remote, .. } => remote,
1328 0 : HistoricLayerInfo::Image { remote, .. } => remote,
1329 : };
1330 0 : *field = value;
1331 0 : }
1332 0 : pub fn layer_file_size(&self) -> u64 {
1333 0 : match self {
1334 : HistoricLayerInfo::Delta {
1335 0 : layer_file_size, ..
1336 0 : } => *layer_file_size,
1337 : HistoricLayerInfo::Image {
1338 0 : layer_file_size, ..
1339 0 : } => *layer_file_size,
1340 : }
1341 0 : }
1342 : }
1343 :
1344 0 : #[derive(Debug, Serialize, Deserialize)]
1345 : pub struct DownloadRemoteLayersTaskSpawnRequest {
1346 : pub max_concurrent_downloads: NonZeroUsize,
1347 : }
1348 :
1349 0 : #[derive(Debug, Serialize, Deserialize)]
1350 : pub struct IngestAuxFilesRequest {
1351 : pub aux_files: HashMap<String, String>,
1352 : }
1353 :
1354 0 : #[derive(Debug, Serialize, Deserialize)]
1355 : pub struct ListAuxFilesRequest {
1356 : pub lsn: Lsn,
1357 : }
1358 :
1359 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1360 : pub struct DownloadRemoteLayersTaskInfo {
1361 : pub task_id: String,
1362 : pub state: DownloadRemoteLayersTaskState,
1363 : pub total_layer_count: u64, // stable once `completed`
1364 : pub successful_download_count: u64, // stable once `completed`
1365 : pub failed_download_count: u64, // stable once `completed`
1366 : }
1367 :
1368 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
1369 : pub enum DownloadRemoteLayersTaskState {
1370 : Running,
1371 : Completed,
1372 : ShutDown,
1373 : }
1374 :
1375 0 : #[derive(Debug, Serialize, Deserialize)]
1376 : pub struct TimelineGcRequest {
1377 : pub gc_horizon: Option<u64>,
1378 : }
1379 :
1380 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1381 : pub struct WalRedoManagerProcessStatus {
1382 : pub pid: u32,
1383 : }
1384 :
1385 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1386 : pub struct WalRedoManagerStatus {
1387 : pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
1388 : pub process: Option<WalRedoManagerProcessStatus>,
1389 : }
1390 :
1391 : /// The progress of a secondary tenant.
1392 : ///
1393 : /// It is mostly useful when doing a long running download: e.g. initiating
1394 : /// a download job, timing out while waiting for it to run, and then inspecting this status to understand
1395 : /// what's happening.
1396 0 : #[derive(Default, Debug, Serialize, Deserialize, Clone)]
1397 : pub struct SecondaryProgress {
1398 : /// The remote storage LastModified time of the heatmap object we last downloaded.
1399 : pub heatmap_mtime: Option<serde_system_time::SystemTime>,
1400 :
1401 : /// The number of layers currently on-disk
1402 : pub layers_downloaded: usize,
1403 : /// The number of layers in the most recently seen heatmap
1404 : pub layers_total: usize,
1405 :
1406 : /// The number of layer bytes currently on-disk
1407 : pub bytes_downloaded: u64,
1408 : /// The number of layer bytes in the most recently seen heatmap
1409 : pub bytes_total: u64,
1410 : }
1411 :
1412 0 : #[derive(Serialize, Deserialize, Debug)]
1413 : pub struct TenantScanRemoteStorageShard {
1414 : pub tenant_shard_id: TenantShardId,
1415 : pub generation: Option<u32>,
1416 : }
1417 :
1418 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1419 : pub struct TenantScanRemoteStorageResponse {
1420 : pub shards: Vec<TenantScanRemoteStorageShard>,
1421 : }
1422 :
1423 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1424 : #[serde(rename_all = "snake_case")]
1425 : pub enum TenantSorting {
1426 : ResidentSize,
1427 : MaxLogicalSize,
1428 : }
1429 :
1430 : impl Default for TenantSorting {
1431 0 : fn default() -> Self {
1432 0 : Self::ResidentSize
1433 0 : }
1434 : }
1435 :
1436 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
1437 : pub struct TopTenantShardsRequest {
1438 : // How would you like to sort the tenants?
1439 : pub order_by: TenantSorting,
1440 :
1441 : // How many results?
1442 : pub limit: usize,
1443 :
1444 : // Omit tenants with more than this many shards (e.g. if this is the max number of shards
1445 : // that the caller would ever split to)
1446 : pub where_shards_lt: Option<ShardCount>,
1447 :
1448 : // Omit tenants where the ordering metric is less than this (this is an optimization to
1449 : // let us quickly exclude numerous tiny shards)
1450 : pub where_gt: Option<u64>,
1451 : }
1452 :
1453 0 : #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
1454 : pub struct TopTenantShardItem {
1455 : pub id: TenantShardId,
1456 :
1457 : /// Total size of layers on local disk for all timelines in this tenant
1458 : pub resident_size: u64,
1459 :
1460 : /// Total size of layers in remote storage for all timelines in this tenant
1461 : pub physical_size: u64,
1462 :
1463 : /// The largest logical size of a timeline within this tenant
1464 : pub max_logical_size: u64,
1465 : }
1466 :
1467 0 : #[derive(Serialize, Deserialize, Debug, Default)]
1468 : pub struct TopTenantShardsResponse {
1469 : pub shards: Vec<TopTenantShardItem>,
1470 : }
1471 :
1472 : pub mod virtual_file {
1473 : #[derive(
1474 : Copy,
1475 : Clone,
1476 : PartialEq,
1477 : Eq,
1478 : Hash,
1479 0 : strum_macros::EnumString,
1480 : strum_macros::Display,
1481 0 : serde_with::DeserializeFromStr,
1482 : serde_with::SerializeDisplay,
1483 : Debug,
1484 : )]
1485 : #[strum(serialize_all = "kebab-case")]
1486 : pub enum IoEngineKind {
1487 : StdFs,
1488 : #[cfg(target_os = "linux")]
1489 : TokioEpollUring,
1490 : }
1491 :
1492 : /// Direct IO modes for a pageserver.
1493 : #[derive(
1494 : Copy,
1495 : Clone,
1496 : PartialEq,
1497 : Eq,
1498 : Hash,
1499 0 : strum_macros::EnumString,
1500 : strum_macros::Display,
1501 0 : serde_with::DeserializeFromStr,
1502 : serde_with::SerializeDisplay,
1503 : Debug,
1504 : )]
1505 : #[strum(serialize_all = "kebab-case")]
1506 : #[repr(u8)]
1507 : pub enum IoMode {
1508 : /// Uses buffered IO.
1509 : Buffered,
1510 : /// Uses direct IO, error out if the operation fails.
1511 : #[cfg(target_os = "linux")]
1512 : Direct,
1513 : }
1514 :
1515 : impl IoMode {
1516 490 : pub const fn preferred() -> Self {
1517 490 : Self::Buffered
1518 490 : }
1519 : }
1520 :
1521 : impl TryFrom<u8> for IoMode {
1522 : type Error = u8;
1523 :
1524 5098 : fn try_from(value: u8) -> Result<Self, Self::Error> {
1525 5098 : Ok(match value {
1526 5098 : v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
1527 : #[cfg(target_os = "linux")]
1528 0 : v if v == (IoMode::Direct as u8) => IoMode::Direct,
1529 0 : x => return Err(x),
1530 : })
1531 5098 : }
1532 : }
1533 : }
1534 :
1535 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
1536 : pub struct ScanDisposableKeysResponse {
1537 : pub disposable_count: usize,
1538 : pub not_disposable_count: usize,
1539 : }
1540 :
1541 : // Wrapped in libpq CopyData
1542 : #[derive(PartialEq, Eq, Debug)]
1543 : pub enum PagestreamFeMessage {
1544 : Exists(PagestreamExistsRequest),
1545 : Nblocks(PagestreamNblocksRequest),
1546 : GetPage(PagestreamGetPageRequest),
1547 : DbSize(PagestreamDbSizeRequest),
1548 : GetSlruSegment(PagestreamGetSlruSegmentRequest),
1549 : #[cfg(feature = "testing")]
1550 : Test(PagestreamTestRequest),
1551 : }
1552 :
1553 : // Wrapped in libpq CopyData
1554 : #[derive(strum_macros::EnumProperty)]
1555 : pub enum PagestreamBeMessage {
1556 : Exists(PagestreamExistsResponse),
1557 : Nblocks(PagestreamNblocksResponse),
1558 : GetPage(PagestreamGetPageResponse),
1559 : Error(PagestreamErrorResponse),
1560 : DbSize(PagestreamDbSizeResponse),
1561 : GetSlruSegment(PagestreamGetSlruSegmentResponse),
1562 : #[cfg(feature = "testing")]
1563 : Test(PagestreamTestResponse),
1564 : }
1565 :
1566 : // Keep in sync with `pagestore_client.h`
1567 : #[repr(u8)]
1568 : enum PagestreamFeMessageTag {
1569 : Exists = 0,
1570 : Nblocks = 1,
1571 : GetPage = 2,
1572 : DbSize = 3,
1573 : GetSlruSegment = 4,
1574 : /* future tags above this line */
1575 : /// For testing purposes, not available in production.
1576 : #[cfg(feature = "testing")]
1577 : Test = 99,
1578 : }
1579 :
1580 : // Keep in sync with `pagestore_client.h`
1581 : #[repr(u8)]
1582 : enum PagestreamBeMessageTag {
1583 : Exists = 100,
1584 : Nblocks = 101,
1585 : GetPage = 102,
1586 : Error = 103,
1587 : DbSize = 104,
1588 : GetSlruSegment = 105,
1589 : /* future tags above this line */
1590 : /// For testing purposes, not available in production.
1591 : #[cfg(feature = "testing")]
1592 : Test = 199,
1593 : }
1594 :
1595 : impl TryFrom<u8> for PagestreamFeMessageTag {
1596 : type Error = u8;
1597 4 : fn try_from(value: u8) -> Result<Self, u8> {
1598 4 : match value {
1599 1 : 0 => Ok(PagestreamFeMessageTag::Exists),
1600 1 : 1 => Ok(PagestreamFeMessageTag::Nblocks),
1601 1 : 2 => Ok(PagestreamFeMessageTag::GetPage),
1602 1 : 3 => Ok(PagestreamFeMessageTag::DbSize),
1603 0 : 4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
1604 : #[cfg(feature = "testing")]
1605 0 : 99 => Ok(PagestreamFeMessageTag::Test),
1606 0 : _ => Err(value),
1607 : }
1608 4 : }
1609 : }
1610 :
1611 : impl TryFrom<u8> for PagestreamBeMessageTag {
1612 : type Error = u8;
1613 0 : fn try_from(value: u8) -> Result<Self, u8> {
1614 0 : match value {
1615 0 : 100 => Ok(PagestreamBeMessageTag::Exists),
1616 0 : 101 => Ok(PagestreamBeMessageTag::Nblocks),
1617 0 : 102 => Ok(PagestreamBeMessageTag::GetPage),
1618 0 : 103 => Ok(PagestreamBeMessageTag::Error),
1619 0 : 104 => Ok(PagestreamBeMessageTag::DbSize),
1620 0 : 105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
1621 : #[cfg(feature = "testing")]
1622 0 : 199 => Ok(PagestreamBeMessageTag::Test),
1623 0 : _ => Err(value),
1624 : }
1625 0 : }
1626 : }
1627 :
1628 : // A GetPage request contains two LSN values:
1629 : //
1630 : // request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
1631 : // "get the latest version present". It's used by the primary server, which knows that no one else
1632 : // is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
1633 : // Lsn::Max. Standby servers use the current replay LSN as the request LSN.
1634 : //
1635 : // not_modified_since: Hint to the pageserver that the client knows that the page has not been
1636 : // modified between 'not_modified_since' and the request LSN. It's always correct to set
1637 : // 'not_modified_since' equal to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
1638 : // passing an earlier LSN can speed up the request, by allowing the pageserver to process the
1639 : // request without waiting for 'request_lsn' to arrive.
1640 : //
1641 : // The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
1642 : // sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
1643 : // 'latest' was set to true. The V2 interface was added because there was no correct way for a
1644 : // standby to request a page at a particular non-latest LSN, and also include the
1645 : // 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
1646 : // request, if the standby knows that the page hasn't been modified since then, and risking an error
1647 : // if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
1648 : // require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
1649 : // unnecessarily require the pageserver to wait for the WAL to arrive up to that point. The new V2
1650 : // interface allows sending both LSNs and lets the pageserver do the right thing. There was no
1651 : //
1652 : // The V3 version of the protocol adds a request ID to every request. The request ID, along with the
1653 : // other request fields, is echoed back in the response, which lets the client verify that a response
1654 : // matches its request. Echoing the request fields makes this check more reliable: the request ID is
1655 : // formed from the process ID and a local counter, so in principle request IDs can repeat if a PID is reused.
1656 : //
1657 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1658 : pub enum PagestreamProtocolVersion {
1659 : V2,
1660 : V3,
1661 : }
1662 :
1663 : pub type RequestId = u64;
1664 :
1665 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1666 : pub struct PagestreamRequest {
1667 : pub reqid: RequestId,
1668 : pub request_lsn: Lsn,
1669 : pub not_modified_since: Lsn,
1670 : }
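// Illustrative sketch, not part of the original source: the two LSNs described in the
// comment above, filled in the way a standby would. The LSN values are arbitrary.
#[cfg(test)]
mod pagestream_request_lsn_example {
    use super::*;

    #[test]
    fn standby_style_header() {
        let hdr = PagestreamRequest {
            reqid: 1,
            // "Give me the page as of this point in time" (the standby's replay LSN).
            request_lsn: Lsn(0x200),
            // "I know the page hasn't changed since here", letting the pageserver
            // answer without waiting for WAL all the way up to request_lsn.
            not_modified_since: Lsn(0x100),
        };
        assert!(hdr.not_modified_since <= hdr.request_lsn);
    }
}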
1671 :
1672 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1673 : pub struct PagestreamExistsRequest {
1674 : pub hdr: PagestreamRequest,
1675 : pub rel: RelTag,
1676 : }
1677 :
1678 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1679 : pub struct PagestreamNblocksRequest {
1680 : pub hdr: PagestreamRequest,
1681 : pub rel: RelTag,
1682 : }
1683 :
1684 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1685 : pub struct PagestreamGetPageRequest {
1686 : pub hdr: PagestreamRequest,
1687 : pub rel: RelTag,
1688 : pub blkno: u32,
1689 : }
1690 :
1691 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1692 : pub struct PagestreamDbSizeRequest {
1693 : pub hdr: PagestreamRequest,
1694 : pub dbnode: u32,
1695 : }
1696 :
1697 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
1698 : pub struct PagestreamGetSlruSegmentRequest {
1699 : pub hdr: PagestreamRequest,
1700 : pub kind: u8,
1701 : pub segno: u32,
1702 : }
1703 :
1704 : #[derive(Debug)]
1705 : pub struct PagestreamExistsResponse {
1706 : pub req: PagestreamExistsRequest,
1707 : pub exists: bool,
1708 : }
1709 :
1710 : #[derive(Debug)]
1711 : pub struct PagestreamNblocksResponse {
1712 : pub req: PagestreamNblocksRequest,
1713 : pub n_blocks: u32,
1714 : }
1715 :
1716 : #[derive(Debug)]
1717 : pub struct PagestreamGetPageResponse {
1718 : pub req: PagestreamGetPageRequest,
1719 : pub page: Bytes,
1720 : }
1721 :
1722 : #[derive(Debug)]
1723 : pub struct PagestreamGetSlruSegmentResponse {
1724 : pub req: PagestreamGetSlruSegmentRequest,
1725 : pub segment: Bytes,
1726 : }
1727 :
1728 : #[derive(Debug)]
1729 : pub struct PagestreamErrorResponse {
1730 : pub req: PagestreamRequest,
1731 : pub message: String,
1732 : }
1733 :
1734 : #[derive(Debug)]
1735 : pub struct PagestreamDbSizeResponse {
1736 : pub req: PagestreamDbSizeRequest,
1737 : pub db_size: i64,
1738 : }
1739 :
1740 : #[cfg(feature = "testing")]
1741 : #[derive(Debug, PartialEq, Eq, Clone)]
1742 : pub struct PagestreamTestRequest {
1743 : pub hdr: PagestreamRequest,
1744 : pub batch_key: u64,
1745 : pub message: String,
1746 : }
1747 :
1748 : #[cfg(feature = "testing")]
1749 : #[derive(Debug)]
1750 : pub struct PagestreamTestResponse {
1751 : pub req: PagestreamTestRequest,
1752 : }
1753 :
1754 : // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
1755 : // that require pageserver-internal types. It is sufficient to get the total size.
1756 0 : #[derive(Serialize, Deserialize, Debug)]
1757 : pub struct TenantHistorySize {
1758 : pub id: TenantId,
1759 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
1760 : ///
1761 : /// Will be none if `?inputs_only=true` was given.
1762 : pub size: Option<u64>,
1763 : }
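// Illustrative sketch, not part of the original source: `size` is an Option, so the
// JSON value may be null (e.g. when `?inputs_only=true` was given). The JSON literal
// below is hand-written for the example.
#[cfg(test)]
mod tenant_history_size_serde_example {
    use super::*;

    #[test]
    fn size_may_be_null() {
        let json = r#"{"id":"17c6d121946a61e5ab0fe5a2fd4d8215","size":null}"#;
        let parsed: TenantHistorySize = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.size, None);
    }
}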
1764 :
1765 : impl PagestreamFeMessage {
1766 : /// Serialize a compute -> pageserver message. This is currently only used in testing
1767 : /// tools. Always uses protocol version 3.
1768 4 : pub fn serialize(&self) -> Bytes {
1769 4 : let mut bytes = BytesMut::new();
1770 4 :
1771 4 : match self {
1772 1 : Self::Exists(req) => {
1773 1 : bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
1774 1 : bytes.put_u64(req.hdr.reqid);
1775 1 : bytes.put_u64(req.hdr.request_lsn.0);
1776 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1777 1 : bytes.put_u32(req.rel.spcnode);
1778 1 : bytes.put_u32(req.rel.dbnode);
1779 1 : bytes.put_u32(req.rel.relnode);
1780 1 : bytes.put_u8(req.rel.forknum);
1781 1 : }
1782 :
1783 1 : Self::Nblocks(req) => {
1784 1 : bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
1785 1 : bytes.put_u64(req.hdr.reqid);
1786 1 : bytes.put_u64(req.hdr.request_lsn.0);
1787 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1788 1 : bytes.put_u32(req.rel.spcnode);
1789 1 : bytes.put_u32(req.rel.dbnode);
1790 1 : bytes.put_u32(req.rel.relnode);
1791 1 : bytes.put_u8(req.rel.forknum);
1792 1 : }
1793 :
1794 1 : Self::GetPage(req) => {
1795 1 : bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
1796 1 : bytes.put_u64(req.hdr.reqid);
1797 1 : bytes.put_u64(req.hdr.request_lsn.0);
1798 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1799 1 : bytes.put_u32(req.rel.spcnode);
1800 1 : bytes.put_u32(req.rel.dbnode);
1801 1 : bytes.put_u32(req.rel.relnode);
1802 1 : bytes.put_u8(req.rel.forknum);
1803 1 : bytes.put_u32(req.blkno);
1804 1 : }
1805 :
1806 1 : Self::DbSize(req) => {
1807 1 : bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
1808 1 : bytes.put_u64(req.hdr.reqid);
1809 1 : bytes.put_u64(req.hdr.request_lsn.0);
1810 1 : bytes.put_u64(req.hdr.not_modified_since.0);
1811 1 : bytes.put_u32(req.dbnode);
1812 1 : }
1813 :
1814 0 : Self::GetSlruSegment(req) => {
1815 0 : bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
1816 0 : bytes.put_u64(req.hdr.reqid);
1817 0 : bytes.put_u64(req.hdr.request_lsn.0);
1818 0 : bytes.put_u64(req.hdr.not_modified_since.0);
1819 0 : bytes.put_u8(req.kind);
1820 0 : bytes.put_u32(req.segno);
1821 0 : }
1822 : #[cfg(feature = "testing")]
1823 0 : Self::Test(req) => {
1824 0 : bytes.put_u8(PagestreamFeMessageTag::Test as u8);
1825 0 : bytes.put_u64(req.hdr.reqid);
1826 0 : bytes.put_u64(req.hdr.request_lsn.0);
1827 0 : bytes.put_u64(req.hdr.not_modified_since.0);
1828 0 : bytes.put_u64(req.batch_key);
1829 0 : let message = req.message.as_bytes();
1830 0 : bytes.put_u64(message.len() as u64);
1831 0 : bytes.put_slice(message);
1832 0 : }
1833 : }
1834 :
1835 4 : bytes.into()
1836 4 : }
1837 :
1838 4 : pub fn parse<R: std::io::Read>(
1839 4 : body: &mut R,
1840 4 : protocol_version: PagestreamProtocolVersion,
1841 4 : ) -> anyhow::Result<PagestreamFeMessage> {
1842 : // these correspond to the NeonMessageTag enum in pagestore_client.h
1843 : //
1844 :         // TODO: consider using protobuf or serde bincode for less error-prone
1845 : // serialization.
1846 4 : let msg_tag = body.read_u8()?;
1847 4 : let (reqid, request_lsn, not_modified_since) = match protocol_version {
1848 : PagestreamProtocolVersion::V2 => (
1849 : 0,
1850 0 : Lsn::from(body.read_u64::<BigEndian>()?),
1851 0 : Lsn::from(body.read_u64::<BigEndian>()?),
1852 : ),
1853 : PagestreamProtocolVersion::V3 => (
1854 4 : body.read_u64::<BigEndian>()?,
1855 4 : Lsn::from(body.read_u64::<BigEndian>()?),
1856 4 : Lsn::from(body.read_u64::<BigEndian>()?),
1857 : ),
1858 : };
1859 :
1860 4 : match PagestreamFeMessageTag::try_from(msg_tag)
1861 4 : .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
1862 : {
1863 : PagestreamFeMessageTag::Exists => {
1864 : Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
1865 1 : hdr: PagestreamRequest {
1866 1 : reqid,
1867 1 : request_lsn,
1868 1 : not_modified_since,
1869 1 : },
1870 1 : rel: RelTag {
1871 1 : spcnode: body.read_u32::<BigEndian>()?,
1872 1 : dbnode: body.read_u32::<BigEndian>()?,
1873 1 : relnode: body.read_u32::<BigEndian>()?,
1874 1 : forknum: body.read_u8()?,
1875 : },
1876 : }))
1877 : }
1878 : PagestreamFeMessageTag::Nblocks => {
1879 : Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
1880 1 : hdr: PagestreamRequest {
1881 1 : reqid,
1882 1 : request_lsn,
1883 1 : not_modified_since,
1884 1 : },
1885 1 : rel: RelTag {
1886 1 : spcnode: body.read_u32::<BigEndian>()?,
1887 1 : dbnode: body.read_u32::<BigEndian>()?,
1888 1 : relnode: body.read_u32::<BigEndian>()?,
1889 1 : forknum: body.read_u8()?,
1890 : },
1891 : }))
1892 : }
1893 : PagestreamFeMessageTag::GetPage => {
1894 : Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
1895 1 : hdr: PagestreamRequest {
1896 1 : reqid,
1897 1 : request_lsn,
1898 1 : not_modified_since,
1899 1 : },
1900 1 : rel: RelTag {
1901 1 : spcnode: body.read_u32::<BigEndian>()?,
1902 1 : dbnode: body.read_u32::<BigEndian>()?,
1903 1 : relnode: body.read_u32::<BigEndian>()?,
1904 1 : forknum: body.read_u8()?,
1905 : },
1906 1 : blkno: body.read_u32::<BigEndian>()?,
1907 : }))
1908 : }
1909 : PagestreamFeMessageTag::DbSize => {
1910 : Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
1911 1 : hdr: PagestreamRequest {
1912 1 : reqid,
1913 1 : request_lsn,
1914 1 : not_modified_since,
1915 1 : },
1916 1 : dbnode: body.read_u32::<BigEndian>()?,
1917 : }))
1918 : }
1919 : PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
1920 : PagestreamGetSlruSegmentRequest {
1921 0 : hdr: PagestreamRequest {
1922 0 : reqid,
1923 0 : request_lsn,
1924 0 : not_modified_since,
1925 0 : },
1926 0 : kind: body.read_u8()?,
1927 0 : segno: body.read_u32::<BigEndian>()?,
1928 : },
1929 : )),
1930 : #[cfg(feature = "testing")]
1931 : PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
1932 0 : hdr: PagestreamRequest {
1933 0 : reqid,
1934 0 : request_lsn,
1935 0 : not_modified_since,
1936 0 : },
1937 0 : batch_key: body.read_u64::<BigEndian>()?,
1938 : message: {
1939 0 : let len = body.read_u64::<BigEndian>()?;
1940 0 : let mut buf = vec![0; len as usize];
1941 0 : body.read_exact(&mut buf)?;
1942 0 : String::from_utf8(buf)?
1943 : },
1944 : })),
1945 : }
1946 4 : }
1947 : }
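// Illustrative sketch, not part of the original source: the V2 wire format carries no
// request ID, so parse() above fills reqid with 0. The bytes are assembled by hand here
// purely to show the layout; the values are arbitrary.
#[cfg(test)]
mod pagestream_v2_parse_example {
    use super::*;

    #[test]
    fn v2_exists_request_parses_with_zero_reqid() {
        let mut buf = BytesMut::new();
        buf.put_u8(PagestreamFeMessageTag::Exists as u8);
        buf.put_u64(0x10); // request_lsn (big-endian, as read by parse())
        buf.put_u64(0x08); // not_modified_since
        buf.put_u32(1); // spcnode
        buf.put_u32(2); // dbnode
        buf.put_u32(3); // relnode
        buf.put_u8(0); // forknum
        let mut reader = buf.freeze().reader();
        let msg =
            PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2).unwrap();
        match msg {
            PagestreamFeMessage::Exists(req) => {
                assert_eq!(req.hdr.reqid, 0);
                assert_eq!(req.hdr.request_lsn, Lsn(0x10));
                assert_eq!(req.hdr.not_modified_since, Lsn(0x08));
            }
            other => panic!("unexpected message: {other:?}"),
        }
    }
}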
1948 :
1949 : impl PagestreamBeMessage {
1950 0 : pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
1951 0 : let mut bytes = BytesMut::new();
1952 :
1953 : use PagestreamBeMessageTag as Tag;
1954 0 : match protocol_version {
1955 : PagestreamProtocolVersion::V2 => {
1956 0 : match self {
1957 0 : Self::Exists(resp) => {
1958 0 : bytes.put_u8(Tag::Exists as u8);
1959 0 : bytes.put_u8(resp.exists as u8);
1960 0 : }
1961 :
1962 0 : Self::Nblocks(resp) => {
1963 0 : bytes.put_u8(Tag::Nblocks as u8);
1964 0 : bytes.put_u32(resp.n_blocks);
1965 0 : }
1966 :
1967 0 : Self::GetPage(resp) => {
1968 0 : bytes.put_u8(Tag::GetPage as u8);
1969 0 : bytes.put(&resp.page[..])
1970 : }
1971 :
1972 0 : Self::Error(resp) => {
1973 0 : bytes.put_u8(Tag::Error as u8);
1974 0 : bytes.put(resp.message.as_bytes());
1975 0 : bytes.put_u8(0); // null terminator
1976 0 : }
1977 0 : Self::DbSize(resp) => {
1978 0 : bytes.put_u8(Tag::DbSize as u8);
1979 0 : bytes.put_i64(resp.db_size);
1980 0 : }
1981 :
1982 0 : Self::GetSlruSegment(resp) => {
1983 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
1984 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
1985 0 : bytes.put(&resp.segment[..]);
1986 0 : }
1987 :
1988 : #[cfg(feature = "testing")]
1989 0 : Self::Test(resp) => {
1990 0 : bytes.put_u8(Tag::Test as u8);
1991 0 : bytes.put_u64(resp.req.batch_key);
1992 0 : let message = resp.req.message.as_bytes();
1993 0 : bytes.put_u64(message.len() as u64);
1994 0 : bytes.put_slice(message);
1995 0 : }
1996 : }
1997 : }
1998 : PagestreamProtocolVersion::V3 => {
1999 0 : match self {
2000 0 : Self::Exists(resp) => {
2001 0 : bytes.put_u8(Tag::Exists as u8);
2002 0 : bytes.put_u64(resp.req.hdr.reqid);
2003 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2004 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2005 0 : bytes.put_u32(resp.req.rel.spcnode);
2006 0 : bytes.put_u32(resp.req.rel.dbnode);
2007 0 : bytes.put_u32(resp.req.rel.relnode);
2008 0 : bytes.put_u8(resp.req.rel.forknum);
2009 0 : bytes.put_u8(resp.exists as u8);
2010 0 : }
2011 :
2012 0 : Self::Nblocks(resp) => {
2013 0 : bytes.put_u8(Tag::Nblocks as u8);
2014 0 : bytes.put_u64(resp.req.hdr.reqid);
2015 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2016 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2017 0 : bytes.put_u32(resp.req.rel.spcnode);
2018 0 : bytes.put_u32(resp.req.rel.dbnode);
2019 0 : bytes.put_u32(resp.req.rel.relnode);
2020 0 : bytes.put_u8(resp.req.rel.forknum);
2021 0 : bytes.put_u32(resp.n_blocks);
2022 0 : }
2023 :
2024 0 : Self::GetPage(resp) => {
2025 0 : bytes.put_u8(Tag::GetPage as u8);
2026 0 : bytes.put_u64(resp.req.hdr.reqid);
2027 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2028 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2029 0 : bytes.put_u32(resp.req.rel.spcnode);
2030 0 : bytes.put_u32(resp.req.rel.dbnode);
2031 0 : bytes.put_u32(resp.req.rel.relnode);
2032 0 : bytes.put_u8(resp.req.rel.forknum);
2033 0 : bytes.put_u32(resp.req.blkno);
2034 0 : bytes.put(&resp.page[..])
2035 : }
2036 :
2037 0 : Self::Error(resp) => {
2038 0 : bytes.put_u8(Tag::Error as u8);
2039 0 : bytes.put_u64(resp.req.reqid);
2040 0 : bytes.put_u64(resp.req.request_lsn.0);
2041 0 : bytes.put_u64(resp.req.not_modified_since.0);
2042 0 : bytes.put(resp.message.as_bytes());
2043 0 : bytes.put_u8(0); // null terminator
2044 0 : }
2045 0 : Self::DbSize(resp) => {
2046 0 : bytes.put_u8(Tag::DbSize as u8);
2047 0 : bytes.put_u64(resp.req.hdr.reqid);
2048 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2049 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2050 0 : bytes.put_u32(resp.req.dbnode);
2051 0 : bytes.put_i64(resp.db_size);
2052 0 : }
2053 :
2054 0 : Self::GetSlruSegment(resp) => {
2055 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
2056 0 : bytes.put_u64(resp.req.hdr.reqid);
2057 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2058 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2059 0 : bytes.put_u8(resp.req.kind);
2060 0 : bytes.put_u32(resp.req.segno);
2061 0 : bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
2062 0 : bytes.put(&resp.segment[..]);
2063 0 : }
2064 :
2065 : #[cfg(feature = "testing")]
2066 0 : Self::Test(resp) => {
2067 0 : bytes.put_u8(Tag::Test as u8);
2068 0 : bytes.put_u64(resp.req.hdr.reqid);
2069 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
2070 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
2071 0 : bytes.put_u64(resp.req.batch_key);
2072 0 : let message = resp.req.message.as_bytes();
2073 0 : bytes.put_u64(message.len() as u64);
2074 0 : bytes.put_slice(message);
2075 0 : }
2076 : }
2077 : }
2078 : }
2079 0 : bytes.into()
2080 0 : }
2081 :
2082 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
2083 0 : let mut buf = buf.reader();
2084 0 : let msg_tag = buf.read_u8()?;
2085 :
2086 : use PagestreamBeMessageTag as Tag;
2087 0 : let ok =
2088 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
2089 : Tag::Exists => {
2090 0 : let reqid = buf.read_u64::<BigEndian>()?;
2091 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2092 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2093 0 : let rel = RelTag {
2094 0 : spcnode: buf.read_u32::<BigEndian>()?,
2095 0 : dbnode: buf.read_u32::<BigEndian>()?,
2096 0 : relnode: buf.read_u32::<BigEndian>()?,
2097 0 : forknum: buf.read_u8()?,
2098 : };
2099 0 : let exists = buf.read_u8()? != 0;
2100 0 : Self::Exists(PagestreamExistsResponse {
2101 0 : req: PagestreamExistsRequest {
2102 0 : hdr: PagestreamRequest {
2103 0 : reqid,
2104 0 : request_lsn,
2105 0 : not_modified_since,
2106 0 : },
2107 0 : rel,
2108 0 : },
2109 0 : exists,
2110 0 : })
2111 : }
2112 : Tag::Nblocks => {
2113 0 : let reqid = buf.read_u64::<BigEndian>()?;
2114 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2115 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2116 0 : let rel = RelTag {
2117 0 : spcnode: buf.read_u32::<BigEndian>()?,
2118 0 : dbnode: buf.read_u32::<BigEndian>()?,
2119 0 : relnode: buf.read_u32::<BigEndian>()?,
2120 0 : forknum: buf.read_u8()?,
2121 : };
2122 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2123 0 : Self::Nblocks(PagestreamNblocksResponse {
2124 0 : req: PagestreamNblocksRequest {
2125 0 : hdr: PagestreamRequest {
2126 0 : reqid,
2127 0 : request_lsn,
2128 0 : not_modified_since,
2129 0 : },
2130 0 : rel,
2131 0 : },
2132 0 : n_blocks,
2133 0 : })
2134 : }
2135 : Tag::GetPage => {
2136 0 : let reqid = buf.read_u64::<BigEndian>()?;
2137 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2138 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2139 0 : let rel = RelTag {
2140 0 : spcnode: buf.read_u32::<BigEndian>()?,
2141 0 : dbnode: buf.read_u32::<BigEndian>()?,
2142 0 : relnode: buf.read_u32::<BigEndian>()?,
2143 0 : forknum: buf.read_u8()?,
2144 : };
2145 0 : let blkno = buf.read_u32::<BigEndian>()?;
2146 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
2147 0 : buf.read_exact(&mut page)?;
2148 0 : Self::GetPage(PagestreamGetPageResponse {
2149 0 : req: PagestreamGetPageRequest {
2150 0 : hdr: PagestreamRequest {
2151 0 : reqid,
2152 0 : request_lsn,
2153 0 : not_modified_since,
2154 0 : },
2155 0 : rel,
2156 0 : blkno,
2157 0 : },
2158 0 : page: page.into(),
2159 0 : })
2160 : }
2161 : Tag::Error => {
2162 0 : let reqid = buf.read_u64::<BigEndian>()?;
2163 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2164 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2165 0 : let mut msg = Vec::new();
2166 0 : buf.read_until(0, &mut msg)?;
2167 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
2168 0 : let rust_str = cstring.to_str()?;
2169 0 : Self::Error(PagestreamErrorResponse {
2170 0 : req: PagestreamRequest {
2171 0 : reqid,
2172 0 : request_lsn,
2173 0 : not_modified_since,
2174 0 : },
2175 0 : message: rust_str.to_owned(),
2176 0 : })
2177 : }
2178 : Tag::DbSize => {
2179 0 : let reqid = buf.read_u64::<BigEndian>()?;
2180 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2181 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2182 0 : let dbnode = buf.read_u32::<BigEndian>()?;
2183 0 : let db_size = buf.read_i64::<BigEndian>()?;
2184 0 : Self::DbSize(PagestreamDbSizeResponse {
2185 0 : req: PagestreamDbSizeRequest {
2186 0 : hdr: PagestreamRequest {
2187 0 : reqid,
2188 0 : request_lsn,
2189 0 : not_modified_since,
2190 0 : },
2191 0 : dbnode,
2192 0 : },
2193 0 : db_size,
2194 0 : })
2195 : }
2196 : Tag::GetSlruSegment => {
2197 0 : let reqid = buf.read_u64::<BigEndian>()?;
2198 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2199 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2200 0 : let kind = buf.read_u8()?;
2201 0 : let segno = buf.read_u32::<BigEndian>()?;
2202 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
2203 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
2204 0 : buf.read_exact(&mut segment)?;
2205 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
2206 0 : req: PagestreamGetSlruSegmentRequest {
2207 0 : hdr: PagestreamRequest {
2208 0 : reqid,
2209 0 : request_lsn,
2210 0 : not_modified_since,
2211 0 : },
2212 0 : kind,
2213 0 : segno,
2214 0 : },
2215 0 : segment: segment.into(),
2216 0 : })
2217 : }
2218 : #[cfg(feature = "testing")]
2219 : Tag::Test => {
2220 0 : let reqid = buf.read_u64::<BigEndian>()?;
2221 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
2222 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
2223 0 : let batch_key = buf.read_u64::<BigEndian>()?;
2224 0 : let len = buf.read_u64::<BigEndian>()?;
2225 0 : let mut msg = vec![0; len as usize];
2226 0 : buf.read_exact(&mut msg)?;
2227 0 : let message = String::from_utf8(msg)?;
2228 0 : Self::Test(PagestreamTestResponse {
2229 0 : req: PagestreamTestRequest {
2230 0 : hdr: PagestreamRequest {
2231 0 : reqid,
2232 0 : request_lsn,
2233 0 : not_modified_since,
2234 0 : },
2235 0 : batch_key,
2236 0 : message,
2237 0 : },
2238 0 : })
2239 : }
2240 : };
2241 0 : let remaining = buf.into_inner();
2242 0 : if !remaining.is_empty() {
2243 0 : anyhow::bail!(
2244 0 : "remaining bytes in msg with tag={msg_tag}: {}",
2245 0 : remaining.len()
2246 0 : );
2247 0 : }
2248 0 : Ok(ok)
2249 0 : }
2250 :
2251 0 : pub fn kind(&self) -> &'static str {
2252 0 : match self {
2253 0 : Self::Exists(_) => "Exists",
2254 0 : Self::Nblocks(_) => "Nblocks",
2255 0 : Self::GetPage(_) => "GetPage",
2256 0 : Self::Error(_) => "Error",
2257 0 : Self::DbSize(_) => "DbSize",
2258 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
2259 : #[cfg(feature = "testing")]
2260 0 : Self::Test(_) => "Test",
2261 : }
2262 0 : }
2263 : }
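// Illustrative sketch, not part of the original source: deserialize() above assumes the
// V3 layout, so a message serialized with PagestreamProtocolVersion::V3 is expected to
// round-trip. All field values are arbitrary.
#[cfg(test)]
mod pagestream_be_roundtrip_example {
    use super::*;

    #[test]
    fn exists_response_roundtrips_in_v3() {
        let original = PagestreamBeMessage::Exists(PagestreamExistsResponse {
            req: PagestreamExistsRequest {
                hdr: PagestreamRequest {
                    reqid: 7,
                    request_lsn: Lsn(0x40),
                    not_modified_since: Lsn(0x30),
                },
                rel: RelTag {
                    spcnode: 1,
                    dbnode: 2,
                    relnode: 3,
                    forknum: 0,
                },
            },
            exists: true,
        });
        let bytes = original.serialize(PagestreamProtocolVersion::V3);
        match PagestreamBeMessage::deserialize(bytes).unwrap() {
            PagestreamBeMessage::Exists(resp) => {
                assert_eq!(resp.req.hdr.reqid, 7);
                assert!(resp.exists);
            }
            other => panic!("unexpected message kind: {}", other.kind()),
        }
    }
}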
2264 :
2265 0 : #[derive(Debug, Serialize, Deserialize)]
2266 : pub struct PageTraceEvent {
2267 : pub key: CompactKey,
2268 : pub effective_lsn: Lsn,
2269 : pub time: SystemTime,
2270 : }
2271 :
2272 : impl Default for PageTraceEvent {
2273 0 : fn default() -> Self {
2274 0 : Self {
2275 0 : key: Default::default(),
2276 0 : effective_lsn: Default::default(),
2277 0 : time: std::time::UNIX_EPOCH,
2278 0 : }
2279 0 : }
2280 : }
2281 :
2282 : #[cfg(test)]
2283 : mod tests {
2284 : use std::str::FromStr;
2285 :
2286 : use serde_json::json;
2287 :
2288 : use super::*;
2289 :
2290 : #[test]
2291 1 : fn test_pagestream() {
2292 1 : // Test serialization/deserialization of PagestreamFeMessage
2293 1 : let messages = vec![
2294 1 : PagestreamFeMessage::Exists(PagestreamExistsRequest {
2295 1 : hdr: PagestreamRequest {
2296 1 : reqid: 0,
2297 1 : request_lsn: Lsn(4),
2298 1 : not_modified_since: Lsn(3),
2299 1 : },
2300 1 : rel: RelTag {
2301 1 : forknum: 1,
2302 1 : spcnode: 2,
2303 1 : dbnode: 3,
2304 1 : relnode: 4,
2305 1 : },
2306 1 : }),
2307 1 : PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
2308 1 : hdr: PagestreamRequest {
2309 1 : reqid: 0,
2310 1 : request_lsn: Lsn(4),
2311 1 : not_modified_since: Lsn(4),
2312 1 : },
2313 1 : rel: RelTag {
2314 1 : forknum: 1,
2315 1 : spcnode: 2,
2316 1 : dbnode: 3,
2317 1 : relnode: 4,
2318 1 : },
2319 1 : }),
2320 1 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2321 1 : hdr: PagestreamRequest {
2322 1 : reqid: 0,
2323 1 : request_lsn: Lsn(4),
2324 1 : not_modified_since: Lsn(3),
2325 1 : },
2326 1 : rel: RelTag {
2327 1 : forknum: 1,
2328 1 : spcnode: 2,
2329 1 : dbnode: 3,
2330 1 : relnode: 4,
2331 1 : },
2332 1 : blkno: 7,
2333 1 : }),
2334 1 : PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
2335 1 : hdr: PagestreamRequest {
2336 1 : reqid: 0,
2337 1 : request_lsn: Lsn(4),
2338 1 : not_modified_since: Lsn(3),
2339 1 : },
2340 1 : dbnode: 7,
2341 1 : }),
2342 1 : ];
2343 5 : for msg in messages {
2344 4 : let bytes = msg.serialize();
2345 4 : let reconstructed =
2346 4 : PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
2347 4 : .unwrap();
2348 4 : assert!(msg == reconstructed);
2349 : }
2350 1 : }
2351 :
2352 : #[test]
2353 1 : fn test_tenantinfo_serde() {
2354 1 : // Test serialization/deserialization of TenantInfo
2355 1 : let original_active = TenantInfo {
2356 1 : id: TenantShardId::unsharded(TenantId::generate()),
2357 1 : state: TenantState::Active,
2358 1 : current_physical_size: Some(42),
2359 1 : attachment_status: TenantAttachmentStatus::Attached,
2360 1 : generation: 1,
2361 1 : gc_blocking: None,
2362 1 : };
2363 1 : let expected_active = json!({
2364 1 : "id": original_active.id.to_string(),
2365 1 : "state": {
2366 1 : "slug": "Active",
2367 1 : },
2368 1 : "current_physical_size": 42,
2369 1 : "attachment_status": {
2370 1 : "slug":"attached",
2371 1 : },
2372 1 : "generation" : 1
2373 1 : });
2374 1 :
2375 1 : let original_broken = TenantInfo {
2376 1 : id: TenantShardId::unsharded(TenantId::generate()),
2377 1 : state: TenantState::Broken {
2378 1 : reason: "reason".into(),
2379 1 : backtrace: "backtrace info".into(),
2380 1 : },
2381 1 : current_physical_size: Some(42),
2382 1 : attachment_status: TenantAttachmentStatus::Attached,
2383 1 : generation: 1,
2384 1 : gc_blocking: None,
2385 1 : };
2386 1 : let expected_broken = json!({
2387 1 : "id": original_broken.id.to_string(),
2388 1 : "state": {
2389 1 : "slug": "Broken",
2390 1 : "data": {
2391 1 : "backtrace": "backtrace info",
2392 1 : "reason": "reason",
2393 1 : }
2394 1 : },
2395 1 : "current_physical_size": 42,
2396 1 : "attachment_status": {
2397 1 : "slug":"attached",
2398 1 : },
2399 1 : "generation" : 1
2400 1 : });
2401 1 :
2402 1 : assert_eq!(
2403 1 : serde_json::to_value(&original_active).unwrap(),
2404 1 : expected_active
2405 1 : );
2406 :
2407 1 : assert_eq!(
2408 1 : serde_json::to_value(&original_broken).unwrap(),
2409 1 : expected_broken
2410 1 : );
2411 1 : assert!(format!("{:?}", &original_broken.state).contains("reason"));
2412 1 : assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
2413 1 : }
2414 :
2415 : #[test]
2416 1 : fn test_reject_unknown_field() {
2417 1 : let id = TenantId::generate();
2418 1 : let config_request = json!({
2419 1 : "tenant_id": id.to_string(),
2420 1 : "unknown_field": "unknown_value".to_string(),
2421 1 : });
2422 1 : let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
2423 1 : assert!(
2424 1 : err.to_string().contains("unknown field `unknown_field`"),
2425 0 : "expect unknown field `unknown_field` error, got: {}",
2426 : err
2427 : );
2428 1 : }
2429 :
2430 : #[test]
2431 1 : fn tenantstatus_activating_serde() {
2432 1 : let states = [TenantState::Activating(ActivatingFrom::Attaching)];
2433 1 : let expected = "[{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
2434 1 :
2435 1 : let actual = serde_json::to_string(&states).unwrap();
2436 1 :
2437 1 : assert_eq!(actual, expected);
2438 :
2439 1 : let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
2440 1 :
2441 1 : assert_eq!(states.as_slice(), &parsed);
2442 1 : }
2443 :
2444 : #[test]
2445 1 : fn tenantstatus_activating_strum() {
2446           1 :         // tests added because we use these names for metrics
2447 1 : let examples = [
2448 1 : (line!(), TenantState::Attaching, "Attaching"),
2449 1 : (
2450 1 : line!(),
2451 1 : TenantState::Activating(ActivatingFrom::Attaching),
2452 1 : "Activating",
2453 1 : ),
2454 1 : (line!(), TenantState::Active, "Active"),
2455 1 : (
2456 1 : line!(),
2457 1 : TenantState::Stopping {
2458 1 : progress: utils::completion::Barrier::default(),
2459 1 : },
2460 1 : "Stopping",
2461 1 : ),
2462 1 : (
2463 1 : line!(),
2464 1 : TenantState::Broken {
2465 1 : reason: "Example".into(),
2466 1 : backtrace: "Looooong backtrace".into(),
2467 1 : },
2468 1 : "Broken",
2469 1 : ),
2470 1 : ];
2471 :
2472 6 : for (line, rendered, expected) in examples {
2473 5 : let actual: &'static str = rendered.into();
2474 5 : assert_eq!(actual, expected, "example on {line}");
2475 : }
2476 1 : }
2477 :
2478 : #[test]
2479 1 : fn test_image_compression_algorithm_parsing() {
2480 : use ImageCompressionAlgorithm::*;
2481 1 : let cases = [
2482 1 : ("disabled", Disabled),
2483 1 : ("zstd", Zstd { level: None }),
2484 1 : ("zstd(18)", Zstd { level: Some(18) }),
2485 1 : ("zstd(-3)", Zstd { level: Some(-3) }),
2486 1 : ];
2487 :
2488 5 : for (display, expected) in cases {
2489 4 : assert_eq!(
2490 4 : ImageCompressionAlgorithm::from_str(display).unwrap(),
2491 : expected,
2492 0 : "parsing works"
2493 : );
2494 4 : assert_eq!(format!("{expected}"), display, "Display FromStr roundtrip");
2495 :
2496 4 : let ser = serde_json::to_string(&expected).expect("serialization");
2497 4 : assert_eq!(
2498 4 : serde_json::from_str::<ImageCompressionAlgorithm>(&ser).unwrap(),
2499 : expected,
2500 0 : "serde roundtrip"
2501 : );
2502 :
2503 4 : assert_eq!(
2504 4 : serde_json::Value::String(display.to_string()),
2505 4 : serde_json::to_value(expected).unwrap(),
2506 0 : "Display is the serde serialization"
2507 : );
2508 : }
2509 1 : }
2510 :
2511 : #[test]
2512 1 : fn test_tenant_config_patch_request_serde() {
2513 1 : let patch_request = TenantConfigPatchRequest {
2514 1 : tenant_id: TenantId::from_str("17c6d121946a61e5ab0fe5a2fd4d8215").unwrap(),
2515 1 : config: TenantConfigPatch {
2516 1 : checkpoint_distance: FieldPatch::Upsert(42),
2517 1 : gc_horizon: FieldPatch::Remove,
2518 1 : compaction_threshold: FieldPatch::Noop,
2519 1 : ..TenantConfigPatch::default()
2520 1 : },
2521 1 : };
2522 1 :
2523 1 : let json = serde_json::to_string(&patch_request).unwrap();
2524 1 :
2525 1 : let expected = r#"{"tenant_id":"17c6d121946a61e5ab0fe5a2fd4d8215","checkpoint_distance":42,"gc_horizon":null}"#;
2526 1 : assert_eq!(json, expected);
2527 :
2528 1 : let decoded: TenantConfigPatchRequest = serde_json::from_str(&json).unwrap();
2529 1 : assert_eq!(decoded.tenant_id, patch_request.tenant_id);
2530 1 : assert_eq!(decoded.config, patch_request.config);
2531 :
2532 : // Now apply the patch to a config to demonstrate semantics
2533 :
2534 1 : let base = TenantConfig {
2535 1 : checkpoint_distance: Some(28),
2536 1 : gc_horizon: Some(100),
2537 1 : compaction_target_size: Some(1024),
2538 1 : ..Default::default()
2539 1 : };
2540 1 :
2541 1 : let expected = TenantConfig {
2542 1 : checkpoint_distance: Some(42),
2543 1 : gc_horizon: None,
2544 1 : ..base.clone()
2545 1 : };
2546 1 :
2547 1 : let patched = base.apply_patch(decoded.config).unwrap();
2548 1 :
2549 1 : assert_eq!(patched, expected);
2550 1 : }
2551 : }
|