LCOV - code coverage report
Current view: top level - safekeeper/src - state.rs (source / functions) Coverage Total Hit
Test: 8b13a09a5c233d98abd4a0d3e59157e7db16d6fd.info Lines: 80.3 % 122 98
Test Date: 2024-11-21 10:53:51 Functions: 39.4 % 94 37

            Line data    Source code
       1              : //! Defines per-timeline data stored persistently (TimelinePersistentState)
       2              : //! and its wrapper with an in-memory layer (TimelineState).
       3              : 
       4              : use std::{cmp::max, ops::Deref};
       5              : 
       6              : use anyhow::{bail, Result};
       7              : use postgres_ffi::WAL_SEGMENT_SIZE;
       8              : use safekeeper_api::models::TimelineTermBumpResponse;
       9              : use serde::{Deserialize, Serialize};
      10              : use utils::{
      11              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      12              :     lsn::Lsn,
      13              : };
      14              : 
      15              : use crate::{
      16              :     control_file,
      17              :     safekeeper::{
      18              :         AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory,
      19              :         UNKNOWN_SERVER_VERSION,
      20              :     },
      21              :     timeline::TimelineError,
      22              :     wal_backup_partial::{self},
      23              : };
      24              : 
      25              : /// Persistent information stored on safekeeper node about timeline.
      26              : /// On disk data is prefixed by magic and format version and followed by checksum.
      27            6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      28              : pub struct TimelinePersistentState {
      29              :     #[serde(with = "hex")]
      30              :     pub tenant_id: TenantId,
      31              :     #[serde(with = "hex")]
      32              :     pub timeline_id: TimelineId,
      33              :     /// persistent acceptor state
      34              :     pub acceptor_state: AcceptorState,
      35              :     /// information about server
      36              :     pub server: ServerInfo,
      37              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      38              :     /// for correctness, exists for monitoring purposes.
      39              :     #[serde(with = "hex")]
      40              :     pub proposer_uuid: PgUuid,
      41              :     /// Since which LSN this timeline generally starts. Safekeeper might have
      42              :     /// joined later.
      43              :     pub timeline_start_lsn: Lsn,
      44              :     /// Since which LSN safekeeper has (had) WAL for this timeline.
      45              :     /// All WAL segments next to one containing local_start_lsn are
      46              :     /// filled with data from the beginning.
      47              :     pub local_start_lsn: Lsn,
      48              :     /// Part of WAL acknowledged by quorum *and available locally*. Always points
      49              :     /// to record boundary.
      50              :     pub commit_lsn: Lsn,
      51              :     /// LSN that points to the end of the last backed up segment. Useful to
      52              :     /// persist to avoid finding out offloading progress on boot.
      53              :     pub backup_lsn: Lsn,
      54              :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      55              :     /// of last record streamed to everyone). Persisting it helps skipping
      56              :     /// recovery in walproposer; generally we compute it from peers. In the
      57              :     /// walproposer protocol it is called 'truncate_lsn'. Updates are currently
      58              :     /// driven only by walproposer.
      59              :     pub peer_horizon_lsn: Lsn,
      60              :     /// LSN of the oldest known checkpoint made by pageserver and successfully
      61              :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
      62              :     /// informational purposes, we receive it from pageserver (or broker).
      63              :     pub remote_consistent_lsn: Lsn,
      64              :     /// Peers and their state as we remember it. Knowing peers themselves is
      65              :     /// fundamental; but state is saved here only for informational purposes and
      66              :     /// obviously can be stale. (Currently not saved at all, but we reserve a
      67              :     /// place for it so that fewer file version upgrades are needed.)
      68              :     pub peers: PersistedPeers,
      69              :     /// Holds names of partial segments uploaded to remote storage. Used to
      70              :     /// clean up old objects without leaving garbage in remote storage.
      71              :     pub partial_backup: wal_backup_partial::State,
      72              :     /// Eviction state of the timeline. If it's Offloaded, we should download
      73              :     /// WAL files from remote storage to serve the timeline.
      74              :     pub eviction_state: EvictionState,
      75              : }
      76              : 
      77            3 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      78              : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
      79              : 
      80              : /// State of the local WAL files. Used to track the current timeline state,
      81              : /// which is either that WAL files are present on disk or that the last
      82              : /// partial segment is offloaded to remote storage.
      83            2 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
      84              : pub enum EvictionState {
      85              :     /// WAL files are present on disk.
      86              :     Present,
      87              :     /// Last partial segment is offloaded to remote storage.
      88              :     /// Contains flush_lsn of the last offloaded segment.
      89              :     Offloaded(Lsn),
      90              : }
      91              : 
      92              : impl TimelinePersistentState {
      93         1444 :     pub fn new(
      94         1444 :         ttid: &TenantTimelineId,
      95         1444 :         server_info: ServerInfo,
      96         1444 :         peers: Vec<NodeId>,
      97         1444 :         commit_lsn: Lsn,
      98         1444 :         local_start_lsn: Lsn,
      99         1444 :     ) -> anyhow::Result<TimelinePersistentState> {
     100         1444 :         if server_info.wal_seg_size == 0 {
     101            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     102         1444 :         }
     103         1444 : 
     104         1444 :         if server_info.pg_version == UNKNOWN_SERVER_VERSION {
     105            0 :             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
     106         1444 :         }
     107         1444 : 
     108         1444 :         if commit_lsn < local_start_lsn {
     109            0 :             bail!(
     110            0 :                 "commit_lsn {} is smaller than local_start_lsn {}",
     111            0 :                 commit_lsn,
     112            0 :                 local_start_lsn
     113            0 :             );
     114         1444 :         }
     115         1444 : 
     116         1444 :         Ok(TimelinePersistentState {
     117         1444 :             tenant_id: ttid.tenant_id,
     118         1444 :             timeline_id: ttid.timeline_id,
     119         1444 :             acceptor_state: AcceptorState {
     120         1444 :                 term: 0,
     121         1444 :                 term_history: TermHistory::empty(),
     122         1444 :             },
     123         1444 :             server: server_info,
     124         1444 :             proposer_uuid: [0; 16],
     125         1444 :             timeline_start_lsn: Lsn(0),
     126         1444 :             local_start_lsn,
     127         1444 :             commit_lsn,
     128         1444 :             backup_lsn: local_start_lsn,
     129         1444 :             peer_horizon_lsn: local_start_lsn,
     130         1444 :             remote_consistent_lsn: Lsn(0),
     131         1444 :             peers: PersistedPeers(
     132         1444 :                 peers
     133         1444 :                     .iter()
     134         1444 :                     .map(|p| (*p, PersistedPeerInfo::new()))
     135         1444 :                     .collect(),
     136         1444 :             ),
     137         1444 :             partial_backup: wal_backup_partial::State::default(),
     138         1444 :             eviction_state: EvictionState::Present,
     139         1444 :         })
     140         1444 :     }
     141              : 
     142            5 :     pub fn empty() -> Self {
     143            5 :         TimelinePersistentState::new(
     144            5 :             &TenantTimelineId::empty(),
     145            5 :             ServerInfo {
     146            5 :                 pg_version: 170000, /* Postgres server version (major * 10000) */
     147            5 :                 system_id: 0,       /* Postgres system identifier */
     148            5 :                 wal_seg_size: WAL_SEGMENT_SIZE as u32,
     149            5 :             },
     150            5 :             vec![],
     151            5 :             Lsn::INVALID,
     152            5 :             Lsn::INVALID,
     153            5 :         )
     154            5 :         .unwrap()
     155            5 :     }
     156              : }
     157              : 
     158            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     159              : // In-memory safekeeper state. Fields mirror the ones in `TimelinePersistentState`;
     160              : // values are not flushed yet.
     161              : pub struct TimelineMemState {
     162              :     pub commit_lsn: Lsn,
     163              :     pub backup_lsn: Lsn,
     164              :     pub peer_horizon_lsn: Lsn,
     165              :     pub remote_consistent_lsn: Lsn,
     166              :     #[serde(with = "hex")]
     167              :     pub proposer_uuid: PgUuid,
     168              : }
     169              : 
     170              : /// Safekeeper persistent state plus in memory layer.
     171              : ///
     172              : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
     173              : /// which don't need immediate persistence. Provides a transaction-like API
     174              : /// to atomically update the state.
     175              : ///
     176              : /// Implements Deref into *persistent* part.
     177              : pub struct TimelineState<CTRL: control_file::Storage> {
     178              :     pub inmem: TimelineMemState,
     179              :     pub pers: CTRL, // persistent
     180              : }
     181              : 
     182              : impl<CTRL> TimelineState<CTRL>
     183              : where
     184              :     CTRL: control_file::Storage,
     185              : {
     186         8496 :     pub fn new(state: CTRL) -> Self {
     187         8496 :         TimelineState {
     188         8496 :             inmem: TimelineMemState {
     189         8496 :                 commit_lsn: state.commit_lsn,
     190         8496 :                 backup_lsn: state.backup_lsn,
     191         8496 :                 peer_horizon_lsn: state.peer_horizon_lsn,
     192         8496 :                 remote_consistent_lsn: state.remote_consistent_lsn,
     193         8496 :                 proposer_uuid: state.proposer_uuid,
     194         8496 :             },
     195         8496 :             pers: state,
     196         8496 :         }
     197         8496 :     }
     198              : 
     199              :     /// Start an atomic change. Returns TimelinePersistentState with in-memory
     200              :     /// values applied; the protocol is to 1) change the returned struct as
     201              :     /// desired, then 2) atomically persist it with finish_change.
     202         4308 :     pub fn start_change(&self) -> TimelinePersistentState {
     203         4308 :         let mut s = self.pers.clone();
     204         4308 :         s.commit_lsn = self.inmem.commit_lsn;
     205         4308 :         s.backup_lsn = self.inmem.backup_lsn;
     206         4308 :         s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
     207         4308 :         s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
     208         4308 :         s.proposer_uuid = self.inmem.proposer_uuid;
     209         4308 :         s
     210         4308 :     }
     211              : 
     212              :     /// Persist the given state; cf. start_change.
     213         4308 :     pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
     214         4308 :         if s.eq(&*self.pers) {
     215           62 :             // nothing to do if state didn't change
     216           62 :         } else {
     217         4246 :             self.pers.persist(s).await?;
     218              :         }
     219              : 
     220              :         // keep in memory values up to date
     221         4308 :         self.inmem.commit_lsn = s.commit_lsn;
     222         4308 :         self.inmem.backup_lsn = s.backup_lsn;
     223         4308 :         self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
     224         4308 :         self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
     225         4308 :         self.inmem.proposer_uuid = s.proposer_uuid;
     226         4308 :         Ok(())
     227         4308 :     }
     228              : 
     229              :     /// Flush in memory values.
     230          147 :     pub async fn flush(&mut self) -> Result<()> {
     231          147 :         let s = self.start_change();
     232          147 :         self.finish_change(&s).await
     233          147 :     }
     234              : 
     235              :     /// Make the term at least `to`. If `to` is None, increment the current one.
     236              :     /// This is not in safekeeper.rs because we want to be able to do it even if
     237              :     /// the timeline is offloaded.
     238            0 :     pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     239            0 :         let before = self.acceptor_state.term;
     240            0 :         let mut state = self.start_change();
     241            0 :         let new = match to {
     242            0 :             Some(to) => max(state.acceptor_state.term, to),
     243            0 :             None => state.acceptor_state.term + 1,
     244              :         };
     245            0 :         if new > state.acceptor_state.term {
     246            0 :             state.acceptor_state.term = new;
     247            0 :             self.finish_change(&state).await?;
     248            0 :         }
     249            0 :         let after = self.acceptor_state.term;
     250            0 :         Ok(TimelineTermBumpResponse {
     251            0 :             previous_term: before,
     252            0 :             current_term: after,
     253            0 :         })
     254            0 :     }
     255              : }
     256              : 
     257              : impl<CTRL> Deref for TimelineState<CTRL>
     258              : where
     259              :     CTRL: control_file::Storage,
     260              : {
     261              :     type Target = TimelinePersistentState;
     262              : 
     263       195558 :     fn deref(&self) -> &Self::Target {
     264       195558 :         &self.pers
     265       195558 :     }
     266              : }
        

Generated by: LCOV version 2.1-beta