LCOV - code coverage report
Current view: top level - safekeeper/src - state.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 74.8 % 127 95
Test Date: 2025-07-16 12:29:03 Functions: 76.5 % 51 39

            Line data    Source code
       1              : //! Defines per timeline data stored persistently (TimelinePersistentState)
       2              : //! and its wrapper with in memory layer (TimelineState).
       3              : 
       4              : use std::cmp::max;
       5              : use std::ops::Deref;
       6              : use std::time::SystemTime;
       7              : 
       8              : use anyhow::{Result, bail};
       9              : use postgres_ffi::WAL_SEGMENT_SIZE;
      10              : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
      11              : use safekeeper_api::membership::Configuration;
      12              : use safekeeper_api::models::TimelineTermBumpResponse;
      13              : use safekeeper_api::{INITIAL_TERM, ServerInfo, Term};
      14              : use serde::{Deserialize, Serialize};
      15              : use tracing::info;
      16              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      17              : use utils::lsn::Lsn;
      18              : 
      19              : use crate::control_file;
      20              : use crate::safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn, UNKNOWN_SERVER_VERSION};
      21              : use crate::timeline::TimelineError;
      22              : use crate::wal_backup_partial::{self};
      23              : 
      24              : /// Persistent information stored on safekeeper node about timeline.
      25              : /// On disk data is prefixed by magic and format version and followed by checksum.
      26              : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      27              : pub struct TimelinePersistentState {
      28              :     #[serde(with = "hex")]
      29              :     pub tenant_id: TenantId,
      30              :     #[serde(with = "hex")]
      31              :     pub timeline_id: TimelineId,
      32              :     /// Membership configuration.
      33              :     pub mconf: Configuration,
      34              :     /// persistent acceptor state
      35              :     pub acceptor_state: AcceptorState,
      36              :     /// information about server
      37              :     pub server: ServerInfo,
      38              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      39              :     /// for correctness, exists for monitoring purposes.
      40              :     #[serde(with = "hex")]
      41              :     pub proposer_uuid: PgUuid,
      42              :     /// Since which LSN this timeline generally starts. Safekeeper might have
      43              :     /// joined later.
      44              :     pub timeline_start_lsn: Lsn,
      45              :     /// Since which LSN safekeeper has (had) WAL for this timeline.
      46              :     /// All WAL segments next to one containing local_start_lsn are
      47              :     /// filled with data from the beginning.
      48              :     pub local_start_lsn: Lsn,
      49              :     /// Part of WAL acknowledged by quorum *and available locally*. Always points
      50              :     /// to record boundary.
      51              :     pub commit_lsn: Lsn,
      52              :     /// LSN that points to the end of the last backed up segment. Useful to
      53              :     /// persist to avoid finding out offloading progress on boot.
      54              :     pub backup_lsn: Lsn,
      55              :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      56              :     /// of last record streamed to everyone). Persisting it helps skipping
      57              :     /// recovery in walproposer, generally we compute it from peers. In
      58              :     /// walproposer proto called 'truncate_lsn'. Updates are currently driven
      59              :     /// only by walproposer.
      60              :     pub peer_horizon_lsn: Lsn,
      61              :     /// LSN of the oldest known checkpoint made by pageserver and successfully
      62              :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
      63              :     /// informational purposes, we receive it from pageserver (or broker).
      64              :     pub remote_consistent_lsn: Lsn,
      65              :     /// Holds names of partial segments uploaded to remote storage. Used to
      66              :     /// clean up old objects without leaving garbage in remote storage.
      67              :     pub partial_backup: wal_backup_partial::State,
      68              :     /// Eviction state of the timeline. If it's Offloaded, we should download
      69              :     /// WAL files from remote storage to serve the timeline.
      70              :     pub eviction_state: EvictionState,
      71              :     pub creation_ts: SystemTime,
      72              : }
      73              : 
      74              : /// State of the local WAL files. Used to track the current timeline state:
      75              : /// either WAL files are present on disk, or the last partial segment
      76              : /// is offloaded to remote storage.
      77            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
      78              : pub enum EvictionState {
      79              :     /// WAL files are present on disk.
      80              :     Present,
      81              :     /// Last partial segment is offloaded to remote storage.
      82              :     /// Contains flush_lsn of the last offloaded segment.
      83              :     Offloaded(Lsn),
      84              : }
      85              : 
      86              : pub struct MembershipSwitchResult {
      87              :     pub previous_conf: Configuration,
      88              :     pub current_conf: Configuration,
      89              : }
      90              : 
      91              : impl TimelinePersistentState {
      92              :     /// commit_lsn is the same as start_lsn in the normal creation; see
      93              :     /// `TimelineCreateRequest` comments.
      94         1471 :     pub fn new(
      95         1471 :         ttid: &TenantTimelineId,
      96         1471 :         mconf: Configuration,
      97         1471 :         server_info: ServerInfo,
      98         1471 :         start_lsn: Lsn,
      99         1471 :         commit_lsn: Lsn,
     100         1471 :     ) -> anyhow::Result<TimelinePersistentState> {
     101         1471 :         if server_info.wal_seg_size == 0 {
     102            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     103         1471 :         }
     104              : 
     105         1471 :         if server_info.pg_version == UNKNOWN_SERVER_VERSION {
     106            0 :             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
     107         1471 :         }
     108              : 
     109         1471 :         if commit_lsn < start_lsn {
     110            0 :             bail!(
     111            0 :                 "commit_lsn {} is smaller than start_lsn {}",
     112              :                 commit_lsn,
     113              :                 start_lsn
     114              :             );
     115         1471 :         }
     116              : 
     117              :         // If we are given an init LSN, initialize term history with it. It
     118              :         // ensures that walproposer must always be able to find a common point
     119              :         // in histories; if it can't, something is corrupted. Not having LSN here
     120              :         // is so far left for legacy case where timeline is created by compute
     121              :         // and LSN during creation is not known yet.
     122         1471 :         let term_history = if commit_lsn != Lsn::INVALID {
     123            0 :             TermHistory(vec![TermLsn {
     124            0 :                 term: INITIAL_TERM,
     125            0 :                 lsn: start_lsn,
     126            0 :             }])
     127              :         } else {
     128         1471 :             TermHistory::empty()
     129              :         };
     130              : 
     131         1471 :         Ok(TimelinePersistentState {
     132         1471 :             tenant_id: ttid.tenant_id,
     133         1471 :             timeline_id: ttid.timeline_id,
     134         1471 :             mconf,
     135         1471 :             acceptor_state: AcceptorState {
     136         1471 :                 term: INITIAL_TERM,
     137         1471 :                 term_history,
     138         1471 :             },
     139         1471 :             server: server_info,
     140         1471 :             proposer_uuid: [0; 16],
     141         1471 :             timeline_start_lsn: start_lsn,
     142         1471 :             local_start_lsn: start_lsn,
     143         1471 :             commit_lsn,
     144         1471 :             backup_lsn: start_lsn,
     145         1471 :             peer_horizon_lsn: start_lsn,
     146         1471 :             remote_consistent_lsn: Lsn(0),
     147         1471 :             partial_backup: wal_backup_partial::State::default(),
     148         1471 :             eviction_state: EvictionState::Present,
     149         1471 :             creation_ts: SystemTime::now(),
     150         1471 :         })
     151         1471 :     }
     152              : 
     153           10 :     pub fn empty() -> Self {
     154           10 :         TimelinePersistentState::new(
     155           10 :             &TenantTimelineId::empty(),
     156           10 :             Configuration::empty(),
     157           10 :             ServerInfo {
     158           10 :                 pg_version: PgVersionId::from(PgMajorVersion::PG17),
     159           10 :                 system_id: 0, /* Postgres system identifier */
     160           10 :                 wal_seg_size: WAL_SEGMENT_SIZE as u32,
     161           10 :             },
     162              :             Lsn::INVALID,
     163              :             Lsn::INVALID,
     164              :         )
     165           10 :         .unwrap()
     166           10 :     }
     167              : }
     168              : 
     169              : #[derive(Debug, Clone, Serialize, Deserialize)]
     170              : // In memory safekeeper state. Fields mirror ones in `TimelinePersistentState`; values
     171              : // are not flushed yet.
     172              : pub struct TimelineMemState {
     173              :     pub commit_lsn: Lsn,
     174              :     pub backup_lsn: Lsn,
     175              :     pub peer_horizon_lsn: Lsn,
     176              :     pub remote_consistent_lsn: Lsn,
     177              :     #[serde(with = "hex")]
     178              :     pub proposer_uuid: PgUuid,
     179              : }
     180              : 
     181              : /// Safekeeper persistent state plus in memory layer.
     182              : ///
     183              : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
     184              : /// which don't need immediate persistence. Provides transactional like API
     185              : /// to atomically update the state.
     186              : ///
     187              : /// Implements Deref into *persistent* part.
     188              : pub struct TimelineState<CTRL: control_file::Storage> {
     189              :     pub inmem: TimelineMemState,
     190              :     pub pers: CTRL, // persistent
     191              : }
     192              : 
     193              : impl<CTRL> TimelineState<CTRL>
     194              : where
     195              :     CTRL: control_file::Storage,
     196              : {
     197         9112 :     pub fn new(state: CTRL) -> Self {
     198         9112 :         TimelineState {
     199         9112 :             inmem: TimelineMemState {
     200         9112 :                 commit_lsn: state.commit_lsn,
     201         9112 :                 backup_lsn: state.backup_lsn,
     202         9112 :                 peer_horizon_lsn: state.peer_horizon_lsn,
     203         9112 :                 remote_consistent_lsn: state.remote_consistent_lsn,
     204         9112 :                 proposer_uuid: state.proposer_uuid,
     205         9112 :             },
     206         9112 :             pers: state,
     207         9112 :         }
     208            9 :     }
     209              : 
     210              :     /// Start atomic change. Returns TimelinePersistentState with in memory
     211              :     /// values applied; the protocol is to 1) change returned struct as desired
     212              :     /// 2) atomically persist it with finish_change.
     213         3276 :     pub fn start_change(&self) -> TimelinePersistentState {
     214         3276 :         let mut s = self.pers.clone();
     215         3276 :         s.commit_lsn = self.inmem.commit_lsn;
     216         3276 :         s.backup_lsn = self.inmem.backup_lsn;
     217         3276 :         s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
     218         3276 :         s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
     219         3276 :         s.proposer_uuid = self.inmem.proposer_uuid;
     220         3276 :         s
     221           40 :     }
     222              : 
     223              :     /// Persist given state. c.f. start_change.
     224         3276 :     pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
     225         3276 :         if s.eq(&*self.pers) {
     226           43 :             // nothing to do if state didn't change
     227           43 :         } else {
     228         3233 :             self.pers.persist(s).await?;
     229              :         }
     230              : 
     231              :         // keep in memory values up to date
     232         3275 :         self.inmem.commit_lsn = s.commit_lsn;
     233         3275 :         self.inmem.backup_lsn = s.backup_lsn;
     234         3275 :         self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
     235         3275 :         self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
     236         3275 :         self.inmem.proposer_uuid = s.proposer_uuid;
     237         3275 :         Ok(())
     238           39 :     }
     239              : 
     240              :     /// Flush in memory values.
     241          110 :     pub async fn flush(&mut self) -> Result<()> {
     242          110 :         let s = self.start_change();
     243          110 :         self.finish_change(&s).await
     244           24 :     }
     245              : 
     246              :     /// Make term at least `to`. If `to` is None, increment current one. This
     247              :     /// is not in safekeeper.rs because we want to be able to do it even if
     248              :     /// timeline is offloaded.
     249            0 :     pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     250            0 :         let before = self.acceptor_state.term;
     251            0 :         let mut state = self.start_change();
     252            0 :         let new = match to {
     253            0 :             Some(to) => max(state.acceptor_state.term, to),
     254            0 :             None => state.acceptor_state.term + 1,
     255              :         };
     256            0 :         if new > state.acceptor_state.term {
     257            0 :             state.acceptor_state.term = new;
     258            0 :             self.finish_change(&state).await?;
     259            0 :         }
     260            0 :         let after = self.acceptor_state.term;
     261            0 :         Ok(TimelineTermBumpResponse {
     262            0 :             previous_term: before,
     263            0 :             current_term: after,
     264            0 :         })
     265            0 :     }
     266              : 
     267              :     /// Switch into membership configuration `to` if it is higher than the
     268              :     /// current one.
     269        20318 :     pub async fn membership_switch(&mut self, to: Configuration) -> Result<MembershipSwitchResult> {
     270        20318 :         let before = self.mconf.clone();
     271              :         // Is switch allowed?
     272        20318 :         if to.generation <= self.mconf.generation {
     273        20318 :             info!(
     274            0 :                 "ignoring request to switch membership conf to {}, current conf {}",
     275            0 :                 to, self.mconf
     276              :             );
     277              :         } else {
     278            0 :             let mut state = self.start_change();
     279            0 :             state.mconf = to.clone();
     280            0 :             self.finish_change(&state).await?;
     281            0 :             info!("switched membership conf to {} from {}", to, before);
     282              :         }
     283        20318 :         Ok(MembershipSwitchResult {
     284        20318 :             previous_conf: before,
     285        20318 :             current_conf: self.mconf.clone(),
     286        20318 :         })
     287            0 :     }
     288              : }
     289              : 
     290              : impl<CTRL> Deref for TimelineState<CTRL>
     291              : where
     292              :     CTRL: control_file::Storage,
     293              : {
     294              :     type Target = TimelinePersistentState;
     295              : 
     296       286451 :     fn deref(&self) -> &Self::Target {
     297       286451 :         &self.pers
     298         7955 :     }
     299              : }
        

Generated by: LCOV version 2.1-beta