LCOV - code coverage report
Current view: top level - libs/safekeeper_api/src - models.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 44.0 % 50 22
Test Date: 2025-07-16 12:29:03 Functions: 2.9 % 103 3

            Line data    Source code
       1              : //! Types used in safekeeper http API. Many of them are also reused internally.
       2              : 
       3              : use std::net::SocketAddr;
       4              : 
       5              : use pageserver_api::shard::ShardIdentity;
       6              : use postgres_ffi_types::TimestampTz;
       7              : use postgres_versioninfo::PgVersionId;
       8              : use serde::{Deserialize, Serialize};
       9              : use tokio::time::Instant;
      10              : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
      11              : use utils::lsn::Lsn;
      12              : use utils::pageserver_feedback::PageserverFeedback;
      13              : 
      14              : use crate::membership::{Configuration, SafekeeperGeneration};
      15              : use crate::{ServerInfo, Term};
      16              : 
      17            0 : #[derive(Debug, Serialize, Deserialize)]
      18              : pub struct SafekeeperStatus {
      19              :     pub id: NodeId,
      20              : }
      21              : 
      22            0 : #[derive(Serialize, Deserialize, Clone)]
      23              : pub struct TimelineCreateRequest {
      24              :     pub tenant_id: TenantId,
      25              :     pub timeline_id: TimelineId,
      26              :     pub mconf: Configuration,
      27              :     pub pg_version: PgVersionId,
      28              :     pub system_id: Option<u64>,
      29              :     // By default WAL_SEGMENT_SIZE
      30              :     pub wal_seg_size: Option<u32>,
      31              :     pub start_lsn: Lsn,
      32              :     // Normal creation should omit this field (start_lsn initializes all LSNs).
      33              :     // However, we allow specifying custom value higher than start_lsn for
      34              :     // manual recovery case, see test_s3_wal_replay.
      35              :     pub commit_lsn: Option<Lsn>,
      36              : }
      37              : 
      38              : /// Same as TermLsn, but serializes LSN using display serializer
      39              : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
      40            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      41              : pub struct TermSwitchApiEntry {
      42              :     pub term: Term,
      43              :     pub lsn: Lsn,
      44              : }
      45              : 
      46              : /// Augment AcceptorState with last_log_term for convenience
      47            0 : #[derive(Debug, Serialize, Deserialize)]
      48              : pub struct AcceptorStateStatus {
      49              :     pub term: Term,
      50              :     pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility
      51              :     pub term_history: Vec<TermSwitchApiEntry>,
      52              : }
      53              : 
      54              : /// Things safekeeper should know about timeline state on peers.
      55              : /// Used as both model and internally.
      56            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      57              : pub struct PeerInfo {
      58              :     pub sk_id: NodeId,
      59              :     pub term: Term,
      60              :     /// Term of the last entry.
      61              :     pub last_log_term: Term,
      62              :     /// LSN of the last record.
      63              :     pub flush_lsn: Lsn,
      64              :     pub commit_lsn: Lsn,
      65              :     /// Since which LSN safekeeper has WAL.
      66              :     pub local_start_lsn: Lsn,
      67              :     /// When info was received. Serde annotations are not very useful but make
      68              :     /// the code compile -- we don't rely on this field externally.
      69              :     #[serde(skip)]
      70              :     #[serde(default = "Instant::now")]
      71              :     pub ts: Instant,
      72              :     pub pg_connstr: String,
      73              :     pub http_connstr: String,
      74              :     pub https_connstr: Option<String>,
      75              : }
      76              : 
      77              : pub type FullTransactionId = u64;
      78              : 
      79              : /// Hot standby feedback received from replica
      80            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      81              : pub struct HotStandbyFeedback {
      82              :     pub ts: TimestampTz,
      83              :     pub xmin: FullTransactionId,
      84              :     pub catalog_xmin: FullTransactionId,
      85              : }
      86              : 
      87              : pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
      88              : 
      89              : impl HotStandbyFeedback {
      90         3019 :     pub fn empty() -> HotStandbyFeedback {
      91         3019 :         HotStandbyFeedback {
      92         3019 :             ts: 0,
      93         3019 :             xmin: 0,
      94         3019 :             catalog_xmin: 0,
      95         3019 :         }
      96         3019 :     }
      97              : }
      98              : 
      99              : /// Standby status update
     100            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     101              : pub struct StandbyReply {
     102              :     pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
     103              :     pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
     104              :     pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
     105              :     pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
     106              :     pub reply_requested: bool,
     107              : }
     108              : 
     109              : impl StandbyReply {
     110           13 :     pub fn empty() -> Self {
     111           13 :         StandbyReply {
     112           13 :             write_lsn: Lsn::INVALID,
     113           13 :             flush_lsn: Lsn::INVALID,
     114           13 :             apply_lsn: Lsn::INVALID,
     115           13 :             reply_ts: 0,
     116           13 :             reply_requested: false,
     117           13 :         }
     118           13 :     }
     119              : }
     120              : 
     121            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     122              : pub struct StandbyFeedback {
     123              :     pub reply: StandbyReply,
     124              :     pub hs_feedback: HotStandbyFeedback,
     125              : }
     126              : 
     127              : impl StandbyFeedback {
     128            7 :     pub fn empty() -> Self {
     129            7 :         StandbyFeedback {
     130            7 :             reply: StandbyReply::empty(),
     131            7 :             hs_feedback: HotStandbyFeedback::empty(),
     132            7 :         }
     133            7 :     }
     134              : }
     135              : 
     136              : /// Receiver is either pageserver or regular standby, which have different
     137              : /// feedbacks.
     138              : /// Used as both model and internally.
     139            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     140              : pub enum ReplicationFeedback {
     141              :     Pageserver(PageserverFeedback),
     142              :     Standby(StandbyFeedback),
     143              : }
     144              : 
     145              : /// Uniquely identifies a WAL service connection. Logged in spans for
     146              : /// observability.
     147              : pub type ConnectionId = u32;
     148              : 
     149              : /// Serialize is used only for json'ing in API response. Also used internally.
     150            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     151              : pub enum WalSenderState {
     152              :     Vanilla(VanillaWalSenderState),
     153              :     Interpreted(InterpretedWalSenderState),
     154              : }
     155              : 
     156            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     157              : pub struct VanillaWalSenderState {
     158              :     pub ttid: TenantTimelineId,
     159              :     pub addr: SocketAddr,
     160              :     pub conn_id: ConnectionId,
     161              :     // postgres application_name
     162              :     pub appname: Option<String>,
     163              :     pub feedback: ReplicationFeedback,
     164              : }
     165              : 
     166            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     167              : pub struct InterpretedWalSenderState {
     168              :     pub ttid: TenantTimelineId,
     169              :     pub shard: ShardIdentity,
     170              :     pub addr: SocketAddr,
     171              :     pub conn_id: ConnectionId,
     172              :     // postgres application_name
     173              :     pub appname: Option<String>,
     174              :     pub feedback: ReplicationFeedback,
     175              : }
     176              : 
     177            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     178              : pub struct WalReceiverState {
     179              :     /// None means it is recovery initiated by us (this safekeeper).
     180              :     pub conn_id: Option<ConnectionId>,
     181              :     pub status: WalReceiverStatus,
     182              : }
     183              : 
     184              : /// Walreceiver status. Currently only whether it passed voting stage and
     185              : /// started receiving the stream, but it is easy to add more if needed.
     186            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     187              : pub enum WalReceiverStatus {
     188              :     Voting,
     189              :     Streaming,
     190              : }
     191              : 
     192              : /// Info about timeline on safekeeper ready for reporting.
     193            0 : #[derive(Debug, Serialize, Deserialize)]
     194              : pub struct TimelineStatus {
     195              :     pub tenant_id: TenantId,
     196              :     pub timeline_id: TimelineId,
     197              :     pub mconf: Configuration,
     198              :     pub acceptor_state: AcceptorStateStatus,
     199              :     pub pg_info: ServerInfo,
     200              :     pub flush_lsn: Lsn,
     201              :     pub timeline_start_lsn: Lsn,
     202              :     pub local_start_lsn: Lsn,
     203              :     pub commit_lsn: Lsn,
     204              :     pub backup_lsn: Lsn,
     205              :     pub peer_horizon_lsn: Lsn,
     206              :     pub remote_consistent_lsn: Lsn,
     207              :     pub peers: Vec<PeerInfo>,
     208              :     pub walsenders: Vec<WalSenderState>,
     209              :     pub walreceivers: Vec<WalReceiverState>,
     210              : }
     211              : 
     212              : /// Request to switch membership configuration.
     213              : #[derive(Clone, Serialize, Deserialize)]
     214              : #[serde(transparent)]
     215              : pub struct TimelineMembershipSwitchRequest {
     216              :     pub mconf: Configuration,
     217              : }
     218              : 
     219              : /// In response both previous and current configuration are sent.
     220            0 : #[derive(Serialize, Deserialize)]
     221              : pub struct TimelineMembershipSwitchResponse {
     222              :     pub previous_conf: Configuration,
     223              :     pub current_conf: Configuration,
     224              :     pub last_log_term: Term,
     225              :     pub flush_lsn: Lsn,
     226              : }
     227              : 
     228            0 : #[derive(Clone, Copy, Serialize, Deserialize)]
     229              : pub struct TimelineDeleteResult {
     230              :     pub dir_existed: bool,
     231              : }
     232              : 
     233              : pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
     234              : 
     235            0 : fn lsn_invalid() -> Lsn {
     236            0 :     Lsn::INVALID
     237            0 : }
     238              : 
     239              : /// Data about safekeeper's timeline, mirrors broker.proto.
     240            0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     241              : pub struct SkTimelineInfo {
     242              :     /// Term.
     243              :     pub term: Option<u64>,
     244              :     /// Term of the last entry.
     245              :     pub last_log_term: Option<u64>,
     246              :     /// LSN of the last record.
     247              :     #[serde(default = "lsn_invalid")]
     248              :     pub flush_lsn: Lsn,
     249              :     /// Up to which LSN safekeeper regards its WAL as committed.
     250              :     #[serde(default = "lsn_invalid")]
     251              :     pub commit_lsn: Lsn,
     252              :     /// LSN up to which safekeeper has backed WAL.
     253              :     #[serde(default = "lsn_invalid")]
     254              :     pub backup_lsn: Lsn,
     255              :     /// LSN of last checkpoint uploaded by pageserver.
     256              :     #[serde(default = "lsn_invalid")]
     257              :     pub remote_consistent_lsn: Lsn,
     258              :     #[serde(default = "lsn_invalid")]
     259              :     pub peer_horizon_lsn: Lsn,
     260              :     #[serde(default = "lsn_invalid")]
     261              :     pub local_start_lsn: Lsn,
     262              :     /// A connection string to use for WAL receiving.
     263              :     #[serde(default)]
     264              :     pub safekeeper_connstr: Option<String>,
     265              :     #[serde(default)]
     266              :     pub http_connstr: Option<String>,
     267              :     #[serde(default)]
     268              :     pub https_connstr: Option<String>,
     269              :     // Minimum of all active RO replicas flush LSN
     270              :     #[serde(default = "lsn_invalid")]
     271              :     pub standby_horizon: Lsn,
     272              : }
     273              : 
     274            0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     275              : pub struct TimelineCopyRequest {
     276              :     pub target_timeline_id: TimelineId,
     277              :     pub until_lsn: Lsn,
     278              : }
     279              : 
     280            0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     281              : pub struct TimelineTermBumpRequest {
     282              :     /// bump to
     283              :     pub term: Option<u64>,
     284              : }
     285              : 
     286            0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     287              : pub struct TimelineTermBumpResponse {
     288              :     // before the request
     289              :     pub previous_term: u64,
     290              :     pub current_term: u64,
     291              : }
     292              : 
     293            0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     294              : pub struct SafekeeperUtilization {
     295              :     pub timeline_count: u64,
     296              : }
     297              : 
     298              : /// pull_timeline request body.
     299            0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     300              : pub struct PullTimelineRequest {
     301              :     pub tenant_id: TenantId,
     302              :     pub timeline_id: TimelineId,
     303              :     pub http_hosts: Vec<String>,
     304              :     pub ignore_tombstone: Option<bool>,
     305              : }
     306              : 
     307            0 : #[derive(Debug, Serialize, Deserialize)]
     308              : pub struct PullTimelineResponse {
     309              :     /// Donor safekeeper host.
     310              :     /// None if no pull happened because the timeline already exists.
     311              :     pub safekeeper_host: Option<String>,
     312              :     // TODO: add more fields?
     313              : }
     314              : 
     315              : /// Response to a timeline locate request.
     316              : /// Storcon-only API.
     317            0 : #[derive(Serialize, Deserialize, Clone, Debug)]
     318              : pub struct TimelineLocateResponse {
     319              :     pub generation: SafekeeperGeneration,
     320              :     pub sk_set: Vec<NodeId>,
     321              :     pub new_sk_set: Option<Vec<NodeId>>,
     322              : }
        

Generated by: LCOV version 2.1-beta