LCOV - code coverage report
Current view: top level - safekeeper/src - state.rs (source / functions) Coverage Total Hit
Test: 2453312769e0b6b061a2008879e6693300d0b938.info Lines: 98.9 % 89 88
Test Date: 2024-09-06 16:40:18 Functions: 40.2 % 92 37

            Line data    Source code
       1              : //! Defines per-timeline data stored persistently (TimelinePersistentState)
       2              : //! and its wrapper with an in-memory layer (TimelineState).
       3              : 
       4              : use std::ops::Deref;
       5              : 
       6              : use anyhow::Result;
       7              : use serde::{Deserialize, Serialize};
       8              : use utils::{
       9              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      10              :     lsn::Lsn,
      11              : };
      12              : 
      13              : use crate::{
      14              :     control_file,
      15              :     safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
      16              :     wal_backup_partial::{self},
      17              : };
      18              : 
      19              : /// Persistent information stored on safekeeper node about timeline.
      20              : /// On disk data is prefixed by magic and format version and followed by checksum.
      21            6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      22              : pub struct TimelinePersistentState {
      23              :     #[serde(with = "hex")]
      24              :     pub tenant_id: TenantId,
      25              :     #[serde(with = "hex")]
      26              :     pub timeline_id: TimelineId,
      27              :     /// persistent acceptor state
      28              :     pub acceptor_state: AcceptorState,
      29              :     /// information about server
      30              :     pub server: ServerInfo,
      31              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      32              :     /// for correctness, exists for monitoring purposes.
      33              :     #[serde(with = "hex")]
      34              :     pub proposer_uuid: PgUuid,
      35              :     /// Since which LSN this timeline generally starts. Safekeeper might have
      36              :     /// joined later.
      37              :     pub timeline_start_lsn: Lsn,
      38              :     /// Since which LSN safekeeper has (had) WAL for this timeline.
      39              :     /// All WAL segments next to one containing local_start_lsn are
      40              :     /// filled with data from the beginning.
      41              :     pub local_start_lsn: Lsn,
      42              :     /// Part of WAL acknowledged by quorum *and available locally*. Always points
      43              :     /// to record boundary.
      44              :     pub commit_lsn: Lsn,
      45              :     /// LSN that points to the end of the last backed up segment. Useful to
      46              :     /// persist to avoid finding out offloading progress on boot.
      47              :     pub backup_lsn: Lsn,
      48              :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      49              :     /// of last record streamed to everyone). Persisting it helps skipping
      50              :     /// recovery in walproposer, generally we compute it from peers. In
      51              :     /// walproposer proto called 'truncate_lsn'. Updates are currently driven
      52              :     /// only by walproposer.
      53              :     pub peer_horizon_lsn: Lsn,
      54              :     /// LSN of the oldest known checkpoint made by pageserver and successfully
      55              :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
      56              :     /// informational purposes, we receive it from pageserver (or broker).
      57              :     pub remote_consistent_lsn: Lsn,
      58              :     /// Peers and their state as we remember it. Knowing peers themselves is
      59              :     /// fundamental; but state is saved here only for informational purposes and
      60              :     /// obviously can be stale. (Currently not saved at all, but let's provision
      61              :     /// place to have fewer file version upgrades).
      62              :     pub peers: PersistedPeers,
      63              :     /// Holds names of partial segments uploaded to remote storage. Used to
      64              :     /// clean up old objects without leaving garbage in remote storage.
      65              :     pub partial_backup: wal_backup_partial::State,
      66              :     /// Eviction state of the timeline. If it's Offloaded, we should download
      67              :     /// WAL files from remote storage to serve the timeline.
      68              :     pub eviction_state: EvictionState,
      69              : }
      70              : 
      71            4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      72              : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
      73              : 
      74              : /// State of the local WAL files. Used to track the current timeline state:
      75              : /// either WAL files are present on disk, or the last partial segment
      76              : /// is offloaded to remote storage.
      77            2 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
      78              : pub enum EvictionState {
      79              :     /// WAL files are present on disk.
      80              :     Present,
      81              :     /// Last partial segment is offloaded to remote storage.
      82              :     /// Contains flush_lsn of the last offloaded segment.
      83              :     Offloaded(Lsn),
      84              : }
      85              : 
      86              : impl TimelinePersistentState {
      87         5736 :     pub fn new(
      88         5736 :         ttid: &TenantTimelineId,
      89         5736 :         server_info: ServerInfo,
      90         5736 :         peers: Vec<NodeId>,
      91         5736 :         commit_lsn: Lsn,
      92         5736 :         local_start_lsn: Lsn,
      93         5736 :     ) -> TimelinePersistentState {
      94         5736 :         TimelinePersistentState {
      95         5736 :             tenant_id: ttid.tenant_id,
      96         5736 :             timeline_id: ttid.timeline_id,
      97         5736 :             acceptor_state: AcceptorState {
      98         5736 :                 term: 0,
      99         5736 :                 term_history: TermHistory::empty(),
     100         5736 :             },
     101         5736 :             server: server_info,
     102         5736 :             proposer_uuid: [0; 16],
     103         5736 :             timeline_start_lsn: Lsn(0),
     104         5736 :             local_start_lsn,
     105         5736 :             commit_lsn,
     106         5736 :             backup_lsn: local_start_lsn,
     107         5736 :             peer_horizon_lsn: local_start_lsn,
     108         5736 :             remote_consistent_lsn: Lsn(0),
     109         5736 :             peers: PersistedPeers(
     110         5736 :                 peers
     111         5736 :                     .iter()
     112         5736 :                     .map(|p| (*p, PersistedPeerInfo::new()))
     113         5736 :                     .collect(),
     114         5736 :             ),
     115         5736 :             partial_backup: wal_backup_partial::State::default(),
     116         5736 :             eviction_state: EvictionState::Present,
     117         5736 :         }
     118         5736 :     }
     119              : 
     120              :     #[cfg(test)]
     121            5 :     pub fn empty() -> Self {
     122              :         use crate::safekeeper::UNKNOWN_SERVER_VERSION;
     123              : 
     124            5 :         TimelinePersistentState::new(
     125            5 :             &TenantTimelineId::empty(),
     126            5 :             ServerInfo {
     127            5 :                 pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
     128            5 :                 system_id: 0,                       /* Postgres system identifier */
     129            5 :                 wal_seg_size: 0,
     130            5 :             },
     131            5 :             vec![],
     132            5 :             Lsn::INVALID,
     133            5 :             Lsn::INVALID,
     134            5 :         )
     135            5 :     }
     136              : }
     137              : 
     138            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     139              : // In-memory safekeeper state. Fields mirror ones in `TimelinePersistentState`; values
     140              : // are not flushed yet.
     141              : pub struct TimelineMemState {
     142              :     pub commit_lsn: Lsn,
     143              :     pub backup_lsn: Lsn,
     144              :     pub peer_horizon_lsn: Lsn,
     145              :     pub remote_consistent_lsn: Lsn,
     146              :     #[serde(with = "hex")]
     147              :     pub proposer_uuid: PgUuid,
     148              : }
     149              : 
     150              : /// Safekeeper persistent state plus in memory layer.
     151              : ///
     152              : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
     153              : /// which don't need immediate persistence. Provides transactional like API
     154              : /// to atomically update the state.
     155              : ///
     156              : /// Implements Deref into *persistent* part.
     157              : pub struct TimelineState<CTRL: control_file::Storage> {
     158              :     pub inmem: TimelineMemState,
     159              :     pub pers: CTRL, // persistent
     160              : }
     161              : 
     162              : impl<CTRL> TimelineState<CTRL>
     163              : where
     164              :     CTRL: control_file::Storage,
     165              : {
     166        35172 :     pub fn new(state: CTRL) -> Self {
     167        35172 :         TimelineState {
     168        35172 :             inmem: TimelineMemState {
     169        35172 :                 commit_lsn: state.commit_lsn,
     170        35172 :                 backup_lsn: state.backup_lsn,
     171        35172 :                 peer_horizon_lsn: state.peer_horizon_lsn,
     172        35172 :                 remote_consistent_lsn: state.remote_consistent_lsn,
     173        35172 :                 proposer_uuid: state.proposer_uuid,
     174        35172 :             },
     175        35172 :             pers: state,
     176        35172 :         }
     177        35172 :     }
     178              : 
     179              :     /// Start atomic change. Returns TimelinePersistentState with in-memory
     180              :     /// values applied; the protocol is to 1) change returned struct as desired
     181              :     /// 2) atomically persist it with finish_change.
     182        15466 :     pub fn start_change(&self) -> TimelinePersistentState {
     183        15466 :         let mut s = self.pers.clone();
     184        15466 :         s.commit_lsn = self.inmem.commit_lsn;
     185        15466 :         s.backup_lsn = self.inmem.backup_lsn;
     186        15466 :         s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
     187        15466 :         s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
     188        15466 :         s.proposer_uuid = self.inmem.proposer_uuid;
     189        15466 :         s
     190        15466 :     }
     191              : 
     192              :     /// Persist given state. cf. start_change.
     193        15466 :     pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
     194        15466 :         if s.eq(&*self.pers) {
     195          165 :             // nothing to do if state didn't change
     196          165 :         } else {
     197        15301 :             self.pers.persist(s).await?;
     198              :         }
     199              : 
     200              :         // keep in memory values up to date
     201        15466 :         self.inmem.commit_lsn = s.commit_lsn;
     202        15466 :         self.inmem.backup_lsn = s.backup_lsn;
     203        15466 :         self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
     204        15466 :         self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
     205        15466 :         self.inmem.proposer_uuid = s.proposer_uuid;
     206        15466 :         Ok(())
     207        15466 :     }
     208              : 
     209              :     /// Flush in memory values.
     210          489 :     pub async fn flush(&mut self) -> Result<()> {
     211          489 :         let s = self.start_change();
     212          489 :         self.finish_change(&s).await
     213          489 :     }
     214              : }
     215              : 
     216              : impl<CTRL> Deref for TimelineState<CTRL>
     217              : where
     218              :     CTRL: control_file::Storage,
     219              : {
     220              :     type Target = TimelinePersistentState;
     221              : 
     222       752244 :     fn deref(&self) -> &Self::Target {
     223       752244 :         &self.pers
     224       752244 :     }
     225              : }
        

Generated by: LCOV version 2.1-beta