LCOV - code coverage report
Current view: top level - safekeeper/src - state.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 100.0 % 85 85
Test Date: 2024-02-12 20:26:03 Functions: 61.1 % 108 66

            Line data    Source code
       1              : //! Defines per timeline data stored persistently (TimelinePersistentState)
       2              : //! and its wrapper with in memory layer (TimelineState).
       3              : 
       4              : use std::ops::Deref;
       5              : 
       6              : use anyhow::Result;
       7              : use serde::{Deserialize, Serialize};
       8              : use utils::{
       9              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      10              :     lsn::Lsn,
      11              : };
      12              : 
      13              : use crate::{
      14              :     control_file,
      15              :     safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
      16              : };
      17              : 
      18              : /// Persistent information stored on safekeeper node about timeline.
      19              : /// On disk data is prefixed by magic and format version and followed by checksum.
      20        12875 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      21              : pub struct TimelinePersistentState {
      22              :     #[serde(with = "hex")]
      23              :     pub tenant_id: TenantId,
      24              :     #[serde(with = "hex")]
      25              :     pub timeline_id: TimelineId,
      26              :     /// persistent acceptor state
      27              :     pub acceptor_state: AcceptorState,
      28              :     /// information about server
      29              :     pub server: ServerInfo,
      30              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      31              :     /// for correctness, exists for monitoring purposes.
      32              :     #[serde(with = "hex")]
      33              :     pub proposer_uuid: PgUuid,
      34              :     /// Since which LSN this timeline generally starts. Safekeeper might have
      35              :     /// joined later.
      36              :     pub timeline_start_lsn: Lsn,
      37              :     /// Since which LSN safekeeper has (had) WAL for this timeline.
      38              :     /// All WAL segments next to one containing local_start_lsn are
      39              :     /// filled with data from the beginning.
      40              :     pub local_start_lsn: Lsn,
      41              :     /// Part of WAL acknowledged by quorum *and available locally*. Always points
      42              :     /// to record boundary.
      43              :     pub commit_lsn: Lsn,
      44              :     /// LSN that points to the end of the last backed up segment. Useful to
      45              :     /// persist to avoid finding out offloading progress on boot.
      46              :     pub backup_lsn: Lsn,
      47              :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      48              :     /// of last record streamed to everyone). Persisting it helps skipping
      49              :     /// recovery in walproposer, generally we compute it from peers. In
      50              :     /// walproposer proto called 'truncate_lsn'. Updates are currently driven
      51              :     /// only by walproposer.
      52              :     pub peer_horizon_lsn: Lsn,
      53              :     /// LSN of the oldest known checkpoint made by pageserver and successfully
      54              :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
      55              :     /// informational purposes, we receive it from pageserver (or broker).
      56              :     pub remote_consistent_lsn: Lsn,
      57              :     // Peers and their state as we remember it. Knowing peers themselves is
      58              :     // fundamental; but state is saved here only for informational purposes and
      59              :     // obviously can be stale. (Currently not saved at all, but let's provision
      60              :     // place to have fewer file version upgrades).
      61              :     pub peers: PersistedPeers,
      62              : }
      63              : 
      64        12875 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      65              : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
      66              : 
      67              : impl TimelinePersistentState {
      68          535 :     pub fn new(
      69          535 :         ttid: &TenantTimelineId,
      70          535 :         server_info: ServerInfo,
      71          535 :         peers: Vec<NodeId>,
      72          535 :         commit_lsn: Lsn,
      73          535 :         local_start_lsn: Lsn,
      74          535 :     ) -> TimelinePersistentState {
      75          535 :         TimelinePersistentState {
      76          535 :             tenant_id: ttid.tenant_id,
      77          535 :             timeline_id: ttid.timeline_id,
      78          535 :             acceptor_state: AcceptorState {
      79          535 :                 term: 0,
      80          535 :                 term_history: TermHistory::empty(),
      81          535 :             },
      82          535 :             server: server_info,
      83          535 :             proposer_uuid: [0; 16],
      84          535 :             timeline_start_lsn: Lsn(0),
      85          535 :             local_start_lsn,
      86          535 :             commit_lsn,
      87          535 :             backup_lsn: local_start_lsn,
      88          535 :             peer_horizon_lsn: local_start_lsn,
      89          535 :             remote_consistent_lsn: Lsn(0),
      90          535 :             peers: PersistedPeers(
      91          535 :                 peers
      92          535 :                     .iter()
      93          535 :                     .map(|p| (*p, PersistedPeerInfo::new()))
      94          535 :                     .collect(),
      95          535 :             ),
      96          535 :         }
      97          535 :     }
      98              : 
      99              :     #[cfg(test)]
     100            8 :     pub fn empty() -> Self {
     101            8 :         use crate::safekeeper::UNKNOWN_SERVER_VERSION;
     102            8 : 
     103            8 :         TimelinePersistentState::new(
     104            8 :             &TenantTimelineId::empty(),
     105            8 :             ServerInfo {
     106            8 :                 pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
     107            8 :                 system_id: 0,                       /* Postgres system identifier */
     108            8 :                 wal_seg_size: 0,
     109            8 :             },
     110            8 :             vec![],
     111            8 :             Lsn::INVALID,
     112            8 :             Lsn::INVALID,
     113            8 :         )
     114            8 :     }
     115              : }
     116              : 
     117         2870 : #[derive(Debug, Clone, Serialize, Deserialize)]
     118              : // In memory safekeeper state. Fields mirror ones in `TimelinePersistentState`; values
     119              : // are not flushed yet.
     120              : pub struct TimelineMemState {
     121              :     pub commit_lsn: Lsn,
     122              :     pub backup_lsn: Lsn,
     123              :     pub peer_horizon_lsn: Lsn,
     124              :     pub remote_consistent_lsn: Lsn,
     125              :     #[serde(with = "hex")]
     126              :     pub proposer_uuid: PgUuid,
     127              : }
     128              : 
     129              : /// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
     130              : /// when we update fields like commit_lsn which don't need immediate
     131              : /// persistence. Provides transaction-like API to atomically update the state.
     132              : ///
     133              : /// Implements Deref into *persistent* part.
     134              : pub struct TimelineState<CTRL: control_file::Storage> {
     135              :     pub inmem: TimelineMemState,
     136              :     pub pers: CTRL, // persistent
     137              : }
     138              : 
     139              : impl<CTRL> TimelineState<CTRL>
     140              : where
     141              :     CTRL: control_file::Storage,
     142              : {
     143          618 :     pub fn new(state: CTRL) -> Self {
     144          618 :         TimelineState {
     145          618 :             inmem: TimelineMemState {
     146          618 :                 commit_lsn: state.commit_lsn,
     147          618 :                 backup_lsn: state.backup_lsn,
     148          618 :                 peer_horizon_lsn: state.peer_horizon_lsn,
     149          618 :                 remote_consistent_lsn: state.remote_consistent_lsn,
     150          618 :                 proposer_uuid: state.proposer_uuid,
     151          618 :             },
     152          618 :             pers: state,
     153          618 :         }
     154          618 :     }
     155              : 
     156              :     /// Start atomic change. Returns TimelinePersistentState with in memory
     157              :     /// values applied; the protocol is to 1) change returned struct as desired
     158              :     /// 2) atomically persist it with finish_change.
     159         4951 :     pub fn start_change(&self) -> TimelinePersistentState {
     160         4951 :         let mut s = self.pers.clone();
     161         4951 :         s.commit_lsn = self.inmem.commit_lsn;
     162         4951 :         s.backup_lsn = self.inmem.backup_lsn;
     163         4951 :         s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
     164         4951 :         s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
     165         4951 :         s.proposer_uuid = self.inmem.proposer_uuid;
     166         4951 :         s
     167         4951 :     }
     168              : 
     169              :     /// Persist given state. c.f. start_change.
     170         4951 :     pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
     171        14875 :         self.pers.persist(s).await?;
     172              :         // keep in memory values up to date
     173         4951 :         self.inmem.commit_lsn = s.commit_lsn;
     174         4951 :         self.inmem.backup_lsn = s.backup_lsn;
     175         4951 :         self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
     176         4951 :         self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
     177         4951 :         self.inmem.proposer_uuid = s.proposer_uuid;
     178         4951 :         Ok(())
     179         4951 :     }
     180              : 
     181              :     /// Flush in memory values.
     182         1806 :     pub async fn flush(&mut self) -> Result<()> {
     183         1806 :         let s = self.start_change();
     184         5506 :         self.finish_change(&s).await
     185         1806 :     }
     186              : }
     187              : 
     188              : impl<CTRL> Deref for TimelineState<CTRL>
     189              : where
     190              :     CTRL: control_file::Storage,
     191              : {
     192              :     type Target = TimelinePersistentState;
     193              : 
     194     29917445 :     fn deref(&self) -> &Self::Target {
     195     29917445 :         &self.pers
     196     29917445 :     }
     197              : }
        

Generated by: LCOV version 2.1-beta