LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 87.1 % 699 609
Test Date: 2023-09-06 10:18:01 Functions: 49.8 % 299 149

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use anyhow::{bail, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt};
       5              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       6              : 
       7              : use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
       8              : use serde::{Deserialize, Serialize};
       9              : use std::cmp::max;
      10              : use std::cmp::min;
      11              : use std::fmt;
      12              : use std::io::Read;
      13              : use std::time::Duration;
      14              : use storage_broker::proto::SafekeeperTimelineInfo;
      15              : 
      16              : use tracing::*;
      17              : 
      18              : use crate::control_file;
      19              : use crate::send_wal::HotStandbyFeedback;
      20              : 
      21              : use crate::wal_storage;
      22              : use pq_proto::SystemId;
      23              : use utils::pageserver_feedback::PageserverFeedback;
      24              : use utils::{
      25              :     bin_ser::LeSer,
      26              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      27              :     lsn::Lsn,
      28              : };
      29              : 
      30              : pub const SK_MAGIC: u32 = 0xcafeceefu32;
      31              : pub const SK_FORMAT_VERSION: u32 = 7;
      32              : const SK_PROTOCOL_VERSION: u32 = 2;
      33              : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      34              : 
      35              : /// Consensus logical timestamp.
      36              : pub type Term = u64;
      37              : pub const INVALID_TERM: Term = 0;
      38              : 
      39        13384 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      40              : pub struct TermLsn {
      41              :     pub term: Term,
      42              :     pub lsn: Lsn,
      43              : }
      44              : 
      45              : // Creation from tuple provides less typing (e.g. for unit tests).
      46              : impl From<(Term, Lsn)> for TermLsn {
      47      4849236 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      48      4849236 :         TermLsn {
      49      4849236 :             term: pair.0,
      50      4849236 :             lsn: pair.1,
      51      4849236 :         }
      52      4849236 :     }
      53              : }
      54              : 
      55        14759 : #[derive(Clone, Serialize, Deserialize)]
      56              : pub struct TermHistory(pub Vec<TermLsn>);
      57              : 
      58              : impl TermHistory {
      59          527 :     pub fn empty() -> TermHistory {
      60          527 :         TermHistory(Vec::new())
      61          527 :     }
      62              : 
      63              :     // Parse TermHistory as n_entries followed by TermLsn pairs
      64          968 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      65          968 :         if bytes.remaining() < 4 {
      66            0 :             bail!("TermHistory misses len");
      67          968 :         }
      68          968 :         let n_entries = bytes.get_u32_le();
      69          968 :         let mut res = Vec::with_capacity(n_entries as usize);
      70          968 :         for _ in 0..n_entries {
      71         5124 :             if bytes.remaining() < 16 {
      72            0 :                 bail!("TermHistory is incomplete");
      73         5124 :             }
      74         5124 :             res.push(TermLsn {
      75         5124 :                 term: bytes.get_u64_le(),
      76         5124 :                 lsn: bytes.get_u64_le().into(),
      77         5124 :             })
      78              :         }
      79          968 :         Ok(TermHistory(res))
      80          968 :     }
      81              : 
      82              :     /// Return copy of self with switches happening strictly after up_to
      83              :     /// truncated.
      84        18494 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
      85        18494 :         let mut res = Vec::with_capacity(self.0.len());
      86        63204 :         for e in &self.0 {
      87        44712 :             if e.lsn > up_to {
      88            2 :                 break;
      89        44710 :             }
      90        44710 :             res.push(*e);
      91              :         }
      92        18494 :         TermHistory(res)
      93        18494 :     }
      94              : }
      95              : 
      96              : /// Display only latest entries for Debug.
      97              : impl fmt::Debug for TermHistory {
      98         3074 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
      99         3074 :         let n_printed = 20;
     100         3074 :         write!(
     101         3074 :             fmt,
     102         3074 :             "{}{:?}",
     103         3074 :             if self.0.len() > n_printed { "... " } else { "" },
     104         3074 :             self.0
     105         3074 :                 .iter()
     106         3074 :                 .rev()
     107         3074 :                 .take(n_printed)
     108        10603 :                 .map(|&e| (e.term, e.lsn)) // print bare (term, lsn) tuples, omitting the TermLsn struct name
     109         3074 :                 .collect::<Vec<_>>()
     110         3074 :         )
     111         3074 :     }
     112              : }
     113              : 
     114              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     115              : pub type PgUuid = [u8; 16];
     116              : 
     117              : /// Persistent consensus state of the acceptor.
     118        13787 : #[derive(Debug, Clone, Serialize, Deserialize)]
     119              : pub struct AcceptorState {
     120              :     /// acceptor's last term it voted for (advanced in 1 phase)
     121              :     pub term: Term,
     122              :     /// History of term switches for safekeeper's WAL.
     123              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     124              :     /// from the proposer before recovery.
     125              :     pub term_history: TermHistory,
     126              : }
     127              : 
     128              : impl AcceptorState {
     129              :     /// acceptor's epoch is the term of the highest entry in the log
     130        16386 :     pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
     131        16386 :         let th = self.term_history.up_to(flush_lsn);
     132        16386 :         match th.0.last() {
     133        15807 :             Some(e) => e.term,
     134          579 :             None => 0,
     135              :         }
     136        16386 :     }
     137              : }
     138              : 
     139              : /// Information about Postgres. Safekeeper gets it once and then verifies
     140              : /// all further connections from computes match.
     141        13787 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     142              : pub struct ServerInfo {
     143              :     /// Postgres server version
     144              :     pub pg_version: u32,
     145              :     pub system_id: SystemId,
     146              :     pub wal_seg_size: u32,
     147              : }
     148              : 
     149            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     150              : pub struct PersistedPeerInfo {
     151              :     /// LSN up to which safekeeper offloaded WAL to s3.
     152              :     backup_lsn: Lsn,
     153              :     /// Term of the last entry.
     154              :     term: Term,
     155              :     /// LSN of the last record.
     156              :     flush_lsn: Lsn,
     157              :     /// Up to which LSN safekeeper regards its WAL as committed.
     158              :     commit_lsn: Lsn,
     159              : }
     160              : 
     161              : impl PersistedPeerInfo {
     162            0 :     fn new() -> Self {
     163            0 :         Self {
     164            0 :             backup_lsn: Lsn::INVALID,
     165            0 :             term: INVALID_TERM,
     166            0 :             flush_lsn: Lsn(0),
     167            0 :             commit_lsn: Lsn(0),
     168            0 :         }
     169            0 :     }
     170              : }
     171              : 
     172        13787 : #[derive(Debug, Clone, Serialize, Deserialize)]
     173              : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
     174              : 
     175              : /// Persistent information stored on safekeeper node
     176              : /// On disk data is prefixed by magic and format version and followed by checksum.
     177        13787 : #[derive(Debug, Clone, Serialize, Deserialize)]
     178              : pub struct SafeKeeperState {
     179              :     #[serde(with = "hex")]
     180              :     pub tenant_id: TenantId,
     181              :     #[serde(with = "hex")]
     182              :     pub timeline_id: TimelineId,
     183              :     /// persistent acceptor state
     184              :     pub acceptor_state: AcceptorState,
     185              :     /// information about server
     186              :     pub server: ServerInfo,
     187              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
     188              :     /// for correctness, exists for monitoring purposes.
     189              :     #[serde(with = "hex")]
     190              :     pub proposer_uuid: PgUuid,
     191              :     /// Since which LSN this timeline generally starts. Safekeeper might have
     192              :     /// joined later.
     193              :     pub timeline_start_lsn: Lsn,
     194              :     /// Since which LSN safekeeper has (had) WAL for this timeline.
     195              :     /// All WAL segments next to one containing local_start_lsn are
     196              :     /// filled with data from the beginning.
     197              :     pub local_start_lsn: Lsn,
     198              :     /// Part of WAL acknowledged by quorum *and available locally*. Always points
     199              :     /// to record boundary.
     200              :     pub commit_lsn: Lsn,
     201              :     /// LSN that points to the end of the last backed up segment. Useful to
     202              :     /// persist to avoid finding out offloading progress on boot.
     203              :     pub backup_lsn: Lsn,
     204              :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
     205              :     /// of last record streamed to everyone). Persisting it helps skipping
     206              :     /// recovery in walproposer, generally we compute it from peers. In
     207              :     /// walproposer proto called 'truncate_lsn'. Updates are currently driven
     208              :     /// only by walproposer.
     209              :     pub peer_horizon_lsn: Lsn,
     210              :     /// LSN of the oldest known checkpoint made by pageserver and successfully
     211              :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
     212              :     /// informational purposes, we receive it from pageserver (or broker).
     213              :     pub remote_consistent_lsn: Lsn,
     214              :     // Peers and their state as we remember it. Knowing peers themselves is
     215              :     // fundamental; but state is saved here only for informational purposes and
     216              :     // obviously can be stale. (Currently not saved at all, but let's provision
     217              :     // place to have less file version upgrades).
     218              :     pub peers: PersistedPeers,
     219              : }
     220              : 
     221         2907 : #[derive(Debug, Clone, Serialize, Deserialize)]
     222              : // In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
     223              : // are not flushed yet.
     224              : pub struct SafekeeperMemState {
     225              :     pub commit_lsn: Lsn,
     226              :     pub backup_lsn: Lsn,
     227              :     pub peer_horizon_lsn: Lsn,
     228              :     #[serde(with = "hex")]
     229              :     pub proposer_uuid: PgUuid,
     230              : }
     231              : 
     232              : impl SafeKeeperState {
     233          527 :     pub fn new(
     234          527 :         ttid: &TenantTimelineId,
     235          527 :         server_info: ServerInfo,
     236          527 :         peers: Vec<NodeId>,
     237          527 :         commit_lsn: Lsn,
     238          527 :         local_start_lsn: Lsn,
     239          527 :     ) -> SafeKeeperState {
     240          527 :         SafeKeeperState {
     241          527 :             tenant_id: ttid.tenant_id,
     242          527 :             timeline_id: ttid.timeline_id,
     243          527 :             acceptor_state: AcceptorState {
     244          527 :                 term: 0,
     245          527 :                 term_history: TermHistory::empty(),
     246          527 :             },
     247          527 :             server: server_info,
     248          527 :             proposer_uuid: [0; 16],
     249          527 :             timeline_start_lsn: Lsn(0),
     250          527 :             local_start_lsn,
     251          527 :             commit_lsn,
     252          527 :             backup_lsn: local_start_lsn,
     253          527 :             peer_horizon_lsn: local_start_lsn,
     254          527 :             remote_consistent_lsn: Lsn(0),
     255          527 :             peers: PersistedPeers(
     256          527 :                 peers
     257          527 :                     .iter()
     258          527 :                     .map(|p| (*p, PersistedPeerInfo::new()))
     259          527 :                     .collect(),
     260          527 :             ),
     261          527 :         }
     262          527 :     }
     263              : 
     264              :     #[cfg(test)]
     265            4 :     pub fn empty() -> Self {
     266            4 :         SafeKeeperState::new(
     267            4 :             &TenantTimelineId::empty(),
     268            4 :             ServerInfo {
     269            4 :                 pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
     270            4 :                 system_id: 0,                       /* Postgres system identifier */
     271            4 :                 wal_seg_size: 0,
     272            4 :             },
     273            4 :             vec![],
     274            4 :             Lsn::INVALID,
     275            4 :             Lsn::INVALID,
     276            4 :         )
     277            4 :     }
     278              : }
     279              : 
     280              : // protocol messages
     281              : 
     282              : /// Initial Proposer -> Acceptor message
     283         2105 : #[derive(Debug, Deserialize)]
     284              : pub struct ProposerGreeting {
     285              :     /// proposer-acceptor protocol version
     286              :     pub protocol_version: u32,
     287              :     /// Postgres server version
     288              :     pub pg_version: u32,
     289              :     pub proposer_id: PgUuid,
     290              :     pub system_id: SystemId,
     291              :     pub timeline_id: TimelineId,
     292              :     pub tenant_id: TenantId,
     293              :     pub tli: TimeLineID,
     294              :     pub wal_seg_size: u32,
     295              : }
     296              : 
     297              : /// Acceptor -> Proposer initial response: the highest term known to me
     298              : /// (acceptor voted for).
     299            0 : #[derive(Debug, Serialize)]
     300              : pub struct AcceptorGreeting {
     301              :     term: u64,
     302              :     node_id: NodeId,
     303              : }
     304              : 
     305              : /// Vote request sent from proposer to safekeepers
     306         2103 : #[derive(Debug, Deserialize)]
     307              : pub struct VoteRequest {
     308              :     term: Term,
     309              : }
     310              : 
     311              : /// Vote itself, sent from safekeeper to proposer
     312         2103 : #[derive(Debug, Serialize)]
     313              : pub struct VoteResponse {
     314              :     term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     315              :     vote_given: u64, // fixme u64 due to padding
     316              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     317              :     // proposer to choose the most advanced one.
     318              :     flush_lsn: Lsn,
     319              :     truncate_lsn: Lsn,
     320              :     term_history: TermHistory,
     321              :     timeline_start_lsn: Lsn,
     322              : }
     323              : 
     324              : /*
     325              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     326              :  * term history to it.
     327              :  */
     328          971 : #[derive(Debug)]
     329              : pub struct ProposerElected {
     330              :     pub term: Term,
     331              :     pub start_streaming_at: Lsn,
     332              :     pub term_history: TermHistory,
     333              :     pub timeline_start_lsn: Lsn,
     334              : }
     335              : 
     336              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     337              : /// communicates commit_lsn.
     338            0 : #[derive(Debug)]
     339              : pub struct AppendRequest {
     340              :     pub h: AppendRequestHeader,
     341              :     pub wal_data: Bytes,
     342              : }
     343      2643242 : #[derive(Debug, Clone, Deserialize)]
     344              : pub struct AppendRequestHeader {
     345              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     346              :     pub term: Term,
     347              :     // LSN from which the proposer appends WAL; determines epoch switch point.
     348              :     pub epoch_start_lsn: Lsn,
     349              :     /// start position of message in WAL
     350              :     pub begin_lsn: Lsn,
     351              :     /// end position of message in WAL
     352              :     pub end_lsn: Lsn,
     353              :     /// LSN committed by quorum of safekeepers
     354              :     pub commit_lsn: Lsn,
     355              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     356              :     pub truncate_lsn: Lsn,
     357              :     // only for logging/debugging
     358              :     pub proposer_uuid: PgUuid,
     359              : }
     360              : 
     361              : /// Report safekeeper state to proposer
     362            3 : #[derive(Debug, Serialize)]
     363              : pub struct AppendResponse {
     364              :     // Current term of the safekeeper; if it is higher than proposer's, the
     365              :     // compute is out of date.
     366              :     pub term: Term,
     367              :     // NOTE: this is physical end of wal on safekeeper; currently it doesn't
     368              :     // make much sense without taking epoch into account, as history can be
     369              :     // diverged.
     370              :     pub flush_lsn: Lsn,
     371              :     // We report back our awareness about which WAL is committed, as this is
     372              :     // a criterion for walproposer --sync mode exit
     373              :     pub commit_lsn: Lsn,
     374              :     pub hs_feedback: HotStandbyFeedback,
     375              :     pub pageserver_feedback: PageserverFeedback,
     376              : }
     377              : 
     378              : impl AppendResponse {
     379           72 :     fn term_only(term: Term) -> AppendResponse {
     380           72 :         AppendResponse {
     381           72 :             term,
     382           72 :             flush_lsn: Lsn(0),
     383           72 :             commit_lsn: Lsn(0),
     384           72 :             hs_feedback: HotStandbyFeedback::empty(),
     385           72 :             pageserver_feedback: PageserverFeedback::empty(),
     386           72 :         }
     387           72 :     }
     388              : }
     389              : 
     390              : /// Proposer -> Acceptor messages
     391            0 : #[derive(Debug)]
     392              : pub enum ProposerAcceptorMessage {
     393              :     Greeting(ProposerGreeting),
     394              :     VoteRequest(VoteRequest),
     395              :     Elected(ProposerElected),
     396              :     AppendRequest(AppendRequest),
     397              :     NoFlushAppendRequest(AppendRequest),
     398              :     FlushWAL,
     399              : }
     400              : 
     401              : impl ProposerAcceptorMessage {
     402              :     /// Parse proposer message.
     403      2648418 :     pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
     404      2648418 :         // xxx using Reader is inefficient but easy to work with bincode
     405      2648418 :         let mut stream = msg_bytes.reader();
     406              :         // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     407      2648418 :         let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     408      2648418 :         match tag {
     409              :             'g' => {
     410         2105 :                 let msg = ProposerGreeting::des_from(&mut stream)?;
     411         2105 :                 Ok(ProposerAcceptorMessage::Greeting(msg))
     412              :             }
     413              :             'v' => {
     414         2103 :                 let msg = VoteRequest::des_from(&mut stream)?;
     415         2103 :                 Ok(ProposerAcceptorMessage::VoteRequest(msg))
     416              :             }
     417              :             'e' => {
     418          968 :                 let mut msg_bytes = stream.into_inner();
     419          968 :                 if msg_bytes.remaining() < 16 {
     420            0 :                     bail!("ProposerElected message is not complete");
     421          968 :                 }
     422          968 :                 let term = msg_bytes.get_u64_le();
     423          968 :                 let start_streaming_at = msg_bytes.get_u64_le().into();
     424          968 :                 let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     425          968 :                 if msg_bytes.remaining() < 8 {
     426            0 :                     bail!("ProposerElected message is not complete");
     427          968 :                 }
     428          968 :                 let timeline_start_lsn = msg_bytes.get_u64_le().into();
     429          968 :                 let msg = ProposerElected {
     430          968 :                     term,
     431          968 :                     start_streaming_at,
     432          968 :                     timeline_start_lsn,
     433          968 :                     term_history,
     434          968 :                 };
     435          968 :                 Ok(ProposerAcceptorMessage::Elected(msg))
     436              :             }
     437              :             'a' => {
     438              :                 // read header followed by wal data
     439      2643242 :                 let hdr = AppendRequestHeader::des_from(&mut stream)?;
     440      2643242 :                 let rec_size = hdr
     441      2643242 :                     .end_lsn
     442      2643242 :                     .checked_sub(hdr.begin_lsn)
     443      2643242 :                     .context("begin_lsn > end_lsn in AppendRequest")?
     444              :                     .0 as usize;
     445      2643242 :                 if rec_size > MAX_SEND_SIZE {
     446            0 :                     bail!(
     447            0 :                         "AppendRequest is longer than MAX_SEND_SIZE ({})",
     448            0 :                         MAX_SEND_SIZE
     449            0 :                     );
     450      2643242 :                 }
     451      2643242 : 
     452      2643242 :                 let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     453      2643242 :                 stream.read_exact(&mut wal_data_vec)?;
     454      2643242 :                 let wal_data = Bytes::from(wal_data_vec);
     455      2643242 :                 let msg = AppendRequest { h: hdr, wal_data };
     456      2643242 : 
     457      2643242 :                 Ok(ProposerAcceptorMessage::AppendRequest(msg))
     458              :             }
     459            0 :             _ => bail!("unknown proposer-acceptor message tag: {}", tag,),
     460              :         }
     461      2648418 :     }
     462              : }
     463              : 
     464              : /// Acceptor -> Proposer messages
     465            0 : #[derive(Debug)]
     466              : pub enum AcceptorProposerMessage {
     467              :     Greeting(AcceptorGreeting),
     468              :     VoteResponse(VoteResponse),
     469              :     AppendResponse(AppendResponse),
     470              : }
     471              : 
     472              : impl AcceptorProposerMessage {
     473              :     /// Serialize acceptor -> proposer message.
     474      2204482 :     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
     475      2204482 :         match self {
     476         2103 :             AcceptorProposerMessage::Greeting(msg) => {
     477         2103 :                 buf.put_u64_le('g' as u64);
     478         2103 :                 buf.put_u64_le(msg.term);
     479         2103 :                 buf.put_u64_le(msg.node_id.0);
     480         2103 :             }
     481         2099 :             AcceptorProposerMessage::VoteResponse(msg) => {
     482         2099 :                 buf.put_u64_le('v' as u64);
     483         2099 :                 buf.put_u64_le(msg.term);
     484         2099 :                 buf.put_u64_le(msg.vote_given);
     485         2099 :                 buf.put_u64_le(msg.flush_lsn.into());
     486         2099 :                 buf.put_u64_le(msg.truncate_lsn.into());
     487         2099 :                 buf.put_u32_le(msg.term_history.0.len() as u32);
     488         8886 :                 for e in &msg.term_history.0 {
     489         6787 :                     buf.put_u64_le(e.term);
     490         6787 :                     buf.put_u64_le(e.lsn.into());
     491         6787 :                 }
     492         2099 :                 buf.put_u64_le(msg.timeline_start_lsn.into());
     493              :             }
     494      2200280 :             AcceptorProposerMessage::AppendResponse(msg) => {
     495      2200280 :                 buf.put_u64_le('a' as u64);
     496      2200280 :                 buf.put_u64_le(msg.term);
     497      2200280 :                 buf.put_u64_le(msg.flush_lsn.into());
     498      2200280 :                 buf.put_u64_le(msg.commit_lsn.into());
     499      2200280 :                 buf.put_i64_le(msg.hs_feedback.ts);
     500      2200280 :                 buf.put_u64_le(msg.hs_feedback.xmin);
     501      2200280 :                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     502      2200280 : 
     503      2200280 :                 msg.pageserver_feedback.serialize(buf);
     504      2200280 :             }
     505              :         }
     506              : 
     507      2204482 :         Ok(())
     508      2204482 :     }
     509              : }
     510              : 
     511              : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     512              : /// It controls all WAL disk writes and updates of control file.
     513              : ///
     514              : /// Currently safekeeper processes:
     515              : /// - messages from compute (proposers) and provides replies
     516              : /// - messages from broker peers
     517              : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     518              :     /// LSN from which the proposer this safekeeper is currently talking to
     519              :     /// appends WAL; determines epoch switch point.
     520              :     pub epoch_start_lsn: Lsn,
     521              : 
     522              :     pub inmem: SafekeeperMemState, // in memory part
     523              :     pub state: CTRL,               // persistent state storage
     524              : 
     525              :     pub wal_store: WAL,
     526              : 
     527              :     node_id: NodeId, // safekeeper's node id
     528              : }
     529              : 
     530              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     531              : where
     532              :     CTRL: control_file::Storage,
     533              :     WAL: wal_storage::Storage,
     534              : {
     535              :     /// Accepts a control file storage containing the safekeeper state.
     536              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     537              :     /// and `server` (`wal_seg_size` inside it) fields.
     538          607 :     pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
     539          607 :         if state.tenant_id == TenantId::from([0u8; 16])
     540          607 :             || state.timeline_id == TimelineId::from([0u8; 16])
     541              :         {
     542            0 :             bail!(
     543            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     544            0 :                 state.tenant_id,
     545            0 :                 state.timeline_id
     546            0 :             );
     547          607 :         }
     548          607 : 
     549          607 :         Ok(SafeKeeper {
     550          607 :             epoch_start_lsn: Lsn(0),
     551          607 :             inmem: SafekeeperMemState {
     552          607 :                 commit_lsn: state.commit_lsn,
     553          607 :                 backup_lsn: state.backup_lsn,
     554          607 :                 peer_horizon_lsn: state.peer_horizon_lsn,
     555          607 :                 proposer_uuid: state.proposer_uuid,
     556          607 :             },
     557          607 :             state,
     558          607 :             wal_store,
     559          607 :             node_id,
     560          607 :         })
     561          607 :     }
     562              : 
     563              :     /// Get history of term switches for the available WAL
     564         2105 :     fn get_term_history(&self) -> TermHistory {
     565         2105 :         self.state
     566         2105 :             .acceptor_state
     567         2105 :             .term_history
     568         2105 :             .up_to(self.flush_lsn())
     569         2105 :     }
     570              : 
     571              :     /// Get current term.
     572      4848713 :     pub fn get_term(&self) -> Term {
     573      4848713 :         self.state.acceptor_state.term
     574      4848713 :     }
     575              : 
     576        16187 :     pub fn get_epoch(&self) -> Term {
     577        16187 :         self.state.acceptor_state.get_epoch(self.flush_lsn())
     578        16187 :     }
     579              : 
     580              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     581      9725803 :     pub fn flush_lsn(&self) -> Lsn {
     582      9725803 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     583      9725803 :     }
     584              : 
     585              :     /// Process message from proposer and possibly form reply. Concurrent
     586              :     /// callers must exclude each other.
     587      4848637 :     pub async fn process_msg(
     588      4848637 :         &mut self,
     589      4848637 :         msg: &ProposerAcceptorMessage,
     590      4848637 :     ) -> Result<Option<AcceptorProposerMessage>> {
     591      4848637 :         match msg {
     592         2105 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     593         6142 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     594      1736690 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     595            5 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     596           21 :                 self.handle_append_request(msg, true).await
     597              :             }
     598      2643242 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     599      3045045 :                 self.handle_append_request(msg, false).await
     600              :             }
     601      2200208 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     602              :         }
     603      4848637 :     }
     604              : 
     605              :     /// Handle initial message from proposer: check its sanity and send my
     606              :     /// current term.
     607         2105 :     async fn handle_greeting(
     608         2105 :         &mut self,
     609         2105 :         msg: &ProposerGreeting,
     610         2105 :     ) -> Result<Option<AcceptorProposerMessage>> {
     611         2105 :         // Check protocol compatibility
     612         2105 :         if msg.protocol_version != SK_PROTOCOL_VERSION {
     613            0 :             bail!(
     614            0 :                 "incompatible protocol version {}, expected {}",
     615            0 :                 msg.protocol_version,
     616            0 :                 SK_PROTOCOL_VERSION
     617            0 :             );
     618         2105 :         }
     619         2105 :         /* Postgres major version mismatch is treated as fatal error
     620         2105 :          * because safekeepers parse WAL headers and the format
     621         2105 :          * may change between versions.
     622         2105 :          */
     623         2105 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     624            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     625              :         {
     626            0 :             bail!(
     627            0 :                 "incompatible server version {}, expected {}",
     628            0 :                 msg.pg_version,
     629            0 :                 self.state.server.pg_version
     630            0 :             );
     631         2105 :         }
     632         2105 : 
     633         2105 :         if msg.tenant_id != self.state.tenant_id {
     634            0 :             bail!(
     635            0 :                 "invalid tenant ID, got {}, expected {}",
     636            0 :                 msg.tenant_id,
     637            0 :                 self.state.tenant_id
     638            0 :             );
     639         2105 :         }
     640         2105 :         if msg.timeline_id != self.state.timeline_id {
     641            0 :             bail!(
     642            0 :                 "invalid timeline ID, got {}, expected {}",
     643            0 :                 msg.timeline_id,
     644            0 :                 self.state.timeline_id
     645            0 :             );
     646         2105 :         }
     647         2105 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     648            0 :             bail!(
     649            0 :                 "invalid wal_seg_size, got {}, expected {}",
     650            0 :                 msg.wal_seg_size,
     651            0 :                 self.state.server.wal_seg_size
     652            0 :             );
     653         2105 :         }
     654         2105 : 
     655         2105 :         // system_id will be updated on mismatch
     656         2105 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
     657         2105 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
     658          518 :             if self.state.server.system_id != 0 {
     659            0 :                 warn!(
     660            0 :                     "unexpected system ID arrived, got {}, expected {}",
     661            0 :                     msg.system_id, self.state.server.system_id
     662            0 :                 );
     663          518 :             }
     664              : 
     665          518 :             let mut state = self.state.clone();
     666          518 :             state.server.system_id = msg.system_id;
     667          518 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
     668          518 :                 state.server.pg_version = msg.pg_version;
     669          518 :             }
     670         1554 :             self.state.persist(&state).await?;
     671         1587 :         }
     672              : 
     673         2105 :         info!(
     674         2105 :             "processed greeting from walproposer {}, sending term {:?}",
     675        33680 :             msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
     676         2105 :             self.state.acceptor_state.term
     677         2105 :         );
     678         2105 :         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
     679         2105 :             term: self.state.acceptor_state.term,
     680         2105 :             node_id: self.node_id,
     681         2105 :         })))
     682         2105 :     }
     683              : 
     684              :     /// Give vote for the given term, if we haven't done that previously.
     685         2105 :     async fn handle_vote_request(
     686         2105 :         &mut self,
     687         2105 :         msg: &VoteRequest,
     688         2105 :     ) -> Result<Option<AcceptorProposerMessage>> {
     689              :         // Once voted, we won't accept data from older proposers; flush
     690              :         // everything we've already received so that new proposer starts
     691              :         // streaming at end of our WAL, without overlap. Currently we truncate
     692              :         // WAL at streaming point, so this avoids truncating already committed
     693              :         // WAL.
     694              :         //
     695              :         // TODO: it would be smoother to not truncate committed piece at
     696              :         // handle_elected instead. Currently not a big deal, as proposer is the
     697              :         // only source of WAL; with peer2peer recovery it would be more
     698              :         // important.
     699         2105 :         self.wal_store.flush_wal().await?;
     700              :         // initialize with refusal
     701         2105 :         let mut resp = VoteResponse {
     702         2105 :             term: self.state.acceptor_state.term,
     703         2105 :             vote_given: false as u64,
     704         2105 :             flush_lsn: self.flush_lsn(),
     705         2105 :             truncate_lsn: self.inmem.peer_horizon_lsn,
     706         2105 :             term_history: self.get_term_history(),
     707         2105 :             timeline_start_lsn: self.state.timeline_start_lsn,
     708         2105 :         };
     709         2105 :         if self.state.acceptor_state.term < msg.term {
     710         2049 :             let mut state = self.state.clone();
     711         2049 :             state.acceptor_state.term = msg.term;
     712         2049 :             // persist vote before sending it out
     713         6142 :             self.state.persist(&state).await?;
     714              : 
     715         2049 :             resp.term = self.state.acceptor_state.term;
     716         2049 :             resp.vote_given = true as u64;
     717           56 :         }
     718         2103 :         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
     719         2105 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
     720         2105 :     }
     721              : 
     722              :     /// Form AppendResponse from current state.
     723      2200213 :     fn append_response(&self) -> AppendResponse {
     724      2200213 :         let ar = AppendResponse {
     725      2200213 :             term: self.state.acceptor_state.term,
     726      2200213 :             flush_lsn: self.flush_lsn(),
     727      2200213 :             commit_lsn: self.state.commit_lsn,
     728      2200213 :             // will be filled by the upper code to avoid bothering safekeeper
     729      2200213 :             hs_feedback: HotStandbyFeedback::empty(),
     730      2200213 :             pageserver_feedback: PageserverFeedback::empty(),
     731      2200213 :         };
     732      2200213 :         trace!("formed AppendResponse {:?}", ar);
     733      2200213 :         ar
     734      2200213 :     }
     735              : 
     736          972 :     async fn handle_elected(
     737          972 :         &mut self,
     738          972 :         msg: &ProposerElected,
     739          972 :     ) -> Result<Option<AcceptorProposerMessage>> {
     740          971 :         info!("received ProposerElected {:?}", msg);
     741          972 :         if self.state.acceptor_state.term < msg.term {
     742            4 :             let mut state = self.state.clone();
     743            4 :             state.acceptor_state.term = msg.term;
     744            9 :             self.state.persist(&state).await?;
     745          968 :         }
     746              : 
     747              :         // If our term is higher, ignore the message (next feedback will inform the compute)
     748          972 :         if self.state.acceptor_state.term > msg.term {
     749            0 :             return Ok(None);
     750          972 :         }
     751          972 : 
     752          972 :         // This might happen in a rare race when another (old) connection from
     753          972 :         // the same walproposer writes + flushes WAL after this connection
     754          972 :         // already sent flush_lsn in VoteRequest. It is generally safe to
     755          972 :         // proceed, but to prevent commit_lsn surprisingly going down we should
     756          972 :         // either refuse the session (simpler) or skip the part we already have
     757          972 :         // from the stream (can be implemented).
     758          972 :         if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
     759            0 :             bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
     760            0 :                    msg.term, self.flush_lsn(), msg.start_streaming_at)
     761          972 :         }
     762          972 :         // Otherwise this shouldn't happen.
     763          972 :         assert!(
     764          972 :             msg.start_streaming_at >= self.inmem.commit_lsn,
     765            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
     766              :             msg.start_streaming_at,
     767              :             self.inmem.commit_lsn
     768              :         );
     769              : 
     770              :         // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
     771              :         // intersection of our history and history from msg
     772              : 
     773              :         // truncate wal, update the LSNs
     774      1733812 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
     775              : 
     776              :         // and now adopt term history from proposer
     777              :         {
     778          972 :             let mut state = self.state.clone();
     779          972 : 
     780          972 :             // Here we learn initial LSN for the first time, set fields
     781          972 :             // interested in that.
     782          972 : 
     783          972 :             if state.timeline_start_lsn == Lsn(0) {
     784              :                 // Remember point where WAL begins globally.
     785          523 :                 state.timeline_start_lsn = msg.timeline_start_lsn;
     786          522 :                 info!(
     787          522 :                     "setting timeline_start_lsn to {:?}",
     788          522 :                     state.timeline_start_lsn
     789          522 :                 );
     790          449 :             }
     791          972 :             if state.local_start_lsn == Lsn(0) {
     792          514 :                 state.local_start_lsn = msg.start_streaming_at;
     793          513 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
     794          458 :             }
     795              :             // Initializing commit_lsn before acking first flushed record is
     796              :             // important to let find_end_of_wal skip the hole in the beginning
     797              :             // of the first segment.
     798              :             //
     799              :             // NB: on new clusters, this happens at the same time as
     800              :             // timeline_start_lsn initialization, it is taken outside to provide
     801              :             // upgrade.
     802          972 :             self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
     803          972 : 
     804          972 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
     805          972 :             self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
     806          972 : 
     807          972 :             state.acceptor_state.term_history = msg.term_history.clone();
     808         2869 :             self.persist_control_file(state).await?;
     809              :         }
     810              : 
     811          971 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
     812              : 
     813          972 :         Ok(None)
     814          972 :     }
     815              : 
     816              :     /// Advance commit_lsn taking into account what we have locally.
     817              :     ///
     818              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
     819              :     /// already been done outside.
     820      2650125 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
     821      2650125 :         // Both peers and walproposer communicate this value, we might already
     822      2650125 :         // have a fresher (higher) version.
     823      2650125 :         candidate = max(candidate, self.inmem.commit_lsn);
     824      2650125 :         let commit_lsn = min(candidate, self.flush_lsn());
     825      2650125 :         assert!(
     826      2650125 :             commit_lsn >= self.inmem.commit_lsn,
     827            0 :             "commit_lsn monotonicity violated: old={} new={}",
     828              :             self.inmem.commit_lsn,
     829              :             commit_lsn
     830              :         );
     831              : 
     832      2650125 :         self.inmem.commit_lsn = commit_lsn;
     833      2650125 : 
     834      2650125 :         // If new commit_lsn reached epoch switch, force sync of control
     835      2650125 :         // file: walproposer in sync mode is very interested when this
     836      2650125 :         // happens. Note: this is for sync-safekeepers mode only, as
     837      2650125 :         // otherwise commit_lsn might jump over epoch_start_lsn.
     838      2650125 :         // Also note that commit_lsn can reach epoch_start_lsn earlier
     839      2650125 :         // that we receive new epoch_start_lsn, and we still need to sync
     840      2650125 :         // control file in this case.
     841      2650125 :         if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
     842           63 :             self.persist_control_file(self.state.clone()).await?;
     843      2650104 :         }
     844              : 
     845      2650125 :         Ok(())
     846      2650125 :     }
     847              : 
     848              :     /// Persist control file to disk, called only after timeline creation (bootstrap).
     849          523 :     pub async fn persist(&mut self) -> Result<()> {
     850         1687 :         self.persist_control_file(self.state.clone()).await
     851          523 :     }
     852              : 
     853              :     /// Persist in-memory state to the disk, taking other data from state.
     854         2869 :     async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
     855         2869 :         state.commit_lsn = self.inmem.commit_lsn;
     856         2869 :         state.backup_lsn = self.inmem.backup_lsn;
     857         2869 :         state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
     858         2869 :         state.proposer_uuid = self.inmem.proposer_uuid;
     859         8622 :         self.state.persist(&state).await
     860         2869 :     }
     861              : 
     862              :     /// Persist control file if there is something to save and enough time
     863              :     /// passed after the last save.
     864         1315 :     pub async fn maybe_persist_control_file(
     865         1315 :         &mut self,
     866         1315 :         inmem_remote_consistent_lsn: Lsn,
     867         1315 :     ) -> Result<()> {
     868         1315 :         const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
     869         1315 :         if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
     870         1314 :             return Ok(());
     871            1 :         }
     872            1 :         let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
     873            0 :             || self.inmem.backup_lsn > self.state.backup_lsn
     874            0 :             || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
     875            0 :             || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
     876            1 :         if need_persist {
     877            1 :             let mut state = self.state.clone();
     878            1 :             state.remote_consistent_lsn = inmem_remote_consistent_lsn;
     879            3 :             self.persist_control_file(state).await?;
     880            0 :             trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
     881            0 :         }
     882            1 :         Ok(())
     883         1315 :     }
     884              : 
     885              :     /// Handle request to append WAL.
     886              :     #[allow(clippy::comparison_chain)]
     887      2643247 :     async fn handle_append_request(
     888      2643247 :         &mut self,
     889      2643247 :         msg: &AppendRequest,
     890      2643247 :         require_flush: bool,
     891      2643247 :     ) -> Result<Option<AcceptorProposerMessage>> {
     892      2643247 :         if self.state.acceptor_state.term < msg.h.term {
     893            0 :             bail!("got AppendRequest before ProposerElected");
     894      2643247 :         }
     895      2643247 : 
     896      2643247 :         // If our term is higher, immediately refuse the message.
     897      2643247 :         if self.state.acceptor_state.term > msg.h.term {
     898           72 :             let resp = AppendResponse::term_only(self.state.acceptor_state.term);
     899           72 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
     900      2643175 :         }
     901      2643175 : 
     902      2643175 :         // Now we know that we are in the same term as the proposer,
     903      2643175 :         // processing the message.
     904      2643175 : 
     905      2643175 :         self.epoch_start_lsn = msg.h.epoch_start_lsn;
     906      2643175 :         self.inmem.proposer_uuid = msg.h.proposer_uuid;
     907      2643175 : 
     908      2643175 :         // do the job
     909      2643175 :         if !msg.wal_data.is_empty() {
     910      1460404 :             self.wal_store
     911      1460404 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
     912      3041998 :                 .await?;
     913      1182771 :         }
     914              : 
     915              :         // flush wal to the disk, if required
     916      2643175 :         if require_flush {
     917            5 :             self.wal_store.flush_wal().await?;
     918      2643170 :         }
     919              : 
     920              :         // Update commit_lsn.
     921      2643175 :         if msg.h.commit_lsn != Lsn(0) {
     922      2641221 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
     923         1954 :         }
     924              :         // Value calculated by walproposer can always lag:
     925              :         // - safekeepers can forget inmem value and send to proposer lower
     926              :         //   persisted one on restart;
     927              :         // - if we make safekeepers always send persistent value,
     928              :         //   any compute restart would pull it down.
     929              :         // Thus, take max before adopting.
     930      2643175 :         self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
     931      2643175 : 
     932      2643175 :         // Update truncate and commit LSN in control file.
     933      2643175 :         // To avoid negative impact on performance of extra fsync, do it only
     934      2643175 :         // when truncate_lsn delta exceeds WAL segment size.
     935      2643175 :         if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
     936      2643175 :             < self.inmem.peer_horizon_lsn
     937              :         {
     938         3011 :             self.persist_control_file(self.state.clone()).await?;
     939      2642153 :         }
     940              : 
     941            0 :         trace!(
     942            0 :             "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
     943            0 :             msg.wal_data.len(),
     944            0 :             msg.h.end_lsn,
     945            0 :             msg.h.commit_lsn,
     946            0 :             msg.h.truncate_lsn,
     947            0 :             require_flush,
     948            0 :         );
     949              : 
     950              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
     951      2643175 :         if !require_flush {
     952      2643170 :             return Ok(None);
     953            5 :         }
     954            5 : 
     955            5 :         let resp = self.append_response();
     956            5 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
     957      2643247 :     }
     958              : 
     959              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
     960      2200208 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
     961      2200208 :         self.wal_store.flush_wal().await?;
     962      2200208 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
     963      2200208 :             self.append_response(),
     964      2200208 :         )))
     965      2200208 :     }
     966              : 
     967              :     /// Update timeline state with peer safekeeper data.
     968         9077 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     969         9077 :         let mut sync_control_file = false;
     970         9077 : 
     971         9077 :         if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
     972              :             // Note: the check is too restrictive, generally we can update local
     973              :             // commit_lsn if our history matches (is part of) history of advanced
     974              :             // commit_lsn provider.
     975         8913 :             if sk_info.last_log_term == self.get_epoch() {
     976         8904 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
     977            9 :             }
     978          164 :         }
     979              : 
     980         9077 :         let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
     981         9077 :         sync_control_file |=
     982         9077 :             self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
     983         9077 :         self.inmem.backup_lsn = new_backup_lsn;
     984         9077 : 
     985         9077 :         // value in sk_info should be maximized over our local in memory value.
     986         9077 :         let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
     987         9077 :         assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
     988         9077 :         sync_control_file |= self.state.remote_consistent_lsn
     989         9077 :             + (self.state.server.wal_seg_size as u64)
     990         9077 :             < new_remote_consistent_lsn;
     991         9077 : 
     992         9077 :         let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
     993         9077 :         sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
     994         9077 :             < new_peer_horizon_lsn;
     995         9077 :         self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
     996         9077 : 
     997         9077 :         if sync_control_file {
     998          330 :             let mut state = self.state.clone();
     999          330 :             // Note: we could make remote_consistent_lsn update in cf common by
    1000          330 :             // storing Arc to walsenders in Safekeeper.
    1001          330 :             state.remote_consistent_lsn = new_remote_consistent_lsn;
    1002          989 :             self.persist_control_file(state).await?;
    1003         8747 :         }
    1004         9077 :         Ok(())
    1005         9077 :     }
    1006              : 
    1007              :     /// Get oldest segno we still need to keep. We hold WAL till it is consumed
    1008              :     /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
    1009              :     /// offloading.
    1010              :     /// While it is safe to use inmem values for determining horizon,
    1011              :     /// we use persistent to make possible normal states less surprising.
    1012         1315 :     pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
    1013         1315 :         let mut horizon_lsn = min(
    1014         1315 :             self.state.remote_consistent_lsn,
    1015         1315 :             self.state.peer_horizon_lsn,
    1016         1315 :         );
    1017         1315 :         if wal_backup_enabled {
    1018         1315 :             horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
    1019         1315 :         }
    1020         1315 :         horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
    1021         1315 :     }
    1022              : }
    1023              : 
    1024              : #[cfg(test)]
    1025              : mod tests {
    1026              :     use futures::future::BoxFuture;
    1027              :     use postgres_ffi::WAL_SEGMENT_SIZE;
    1028              : 
    1029              :     use super::*;
    1030              :     use crate::wal_storage::Storage;
    1031              :     use std::{ops::Deref, time::Instant};
    1032              : 
    1033              :     // fake storage for tests
    1034              :     struct InMemoryState {
    1035              :         persisted_state: SafeKeeperState,
    1036              :     }
    1037              : 
    1038              :     #[async_trait::async_trait]
    1039              :     impl control_file::Storage for InMemoryState {
    1040            3 :         async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
    1041            3 :             self.persisted_state = s.clone();
    1042            3 :             Ok(())
    1043            3 :         }
    1044              : 
    1045            0 :         fn last_persist_at(&self) -> Instant {
    1046            0 :             Instant::now()
    1047            0 :         }
    1048              :     }
    1049              : 
    1050              :     impl Deref for InMemoryState {
    1051              :         type Target = SafeKeeperState;
    1052              : 
    1053           56 :         fn deref(&self) -> &Self::Target {
    1054           56 :             &self.persisted_state
    1055           56 :         }
    1056              :     }
    1057              : 
    1058            2 :     fn test_sk_state() -> SafeKeeperState {
    1059            2 :         let mut state = SafeKeeperState::empty();
    1060            2 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1061            2 :         state.tenant_id = TenantId::from([1u8; 16]);
    1062            2 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1063            2 :         state
    1064            2 :     }
    1065              : 
    1066              :     struct DummyWalStore {
    1067              :         lsn: Lsn,
    1068              :     }
    1069              : 
    1070              :     #[async_trait::async_trait]
    1071              :     impl wal_storage::Storage for DummyWalStore {
    1072            9 :         fn flush_lsn(&self) -> Lsn {
    1073            9 :             self.lsn
    1074            9 :         }
    1075              : 
    1076            2 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1077            2 :             self.lsn = startpos + buf.len() as u64;
    1078            2 :             Ok(())
    1079            2 :         }
    1080              : 
    1081            2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1082            2 :             self.lsn = end_pos;
    1083            2 :             Ok(())
    1084            2 :         }
    1085              : 
    1086            4 :         async fn flush_wal(&mut self) -> Result<()> {
    1087            4 :             Ok(())
    1088            4 :         }
    1089              : 
    1090            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1091            0 :             Box::pin(async { Ok(()) })
    1092            0 :         }
    1093              : 
    1094            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1095            0 :             crate::metrics::WalStorageMetrics::default()
    1096            0 :         }
    1097              :     }
    1098              : 
    1099            1 :     #[tokio::test]
    1100            1 :     async fn test_voting() {
    1101            1 :         let storage = InMemoryState {
    1102            1 :             persisted_state: test_sk_state(),
    1103            1 :         };
    1104            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1105            1 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1106            1 : 
    1107            1 :         // check voting for 1 is ok
    1108            1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
    1109            1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1110            1 :         match vote_resp.unwrap() {
    1111            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
    1112            0 :             r => panic!("unexpected response: {:?}", r),
    1113              :         }
    1114              : 
    1115              :         // reboot...
    1116            1 :         let state = sk.state.persisted_state.clone();
    1117            1 :         let storage = InMemoryState {
    1118            1 :             persisted_state: state,
    1119            1 :         };
    1120            1 : 
    1121            1 :         sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
    1122              : 
    1123              :         // and ensure voting second time for 1 is not ok
    1124            1 :         vote_resp = sk.process_msg(&vote_request).await;
    1125            1 :         match vote_resp.unwrap() {
    1126            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
    1127            0 :             r => panic!("unexpected response: {:?}", r),
    1128              :         }
    1129              :     }
    1130              : 
    1131            1 :     #[tokio::test]
    1132            1 :     async fn test_epoch_switch() {
    1133            1 :         let storage = InMemoryState {
    1134            1 :             persisted_state: test_sk_state(),
    1135            1 :         };
    1136            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1137            1 : 
    1138            1 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1139            1 : 
    1140            1 :         let mut ar_hdr = AppendRequestHeader {
    1141            1 :             term: 1,
    1142            1 :             epoch_start_lsn: Lsn(3),
    1143            1 :             begin_lsn: Lsn(1),
    1144            1 :             end_lsn: Lsn(2),
    1145            1 :             commit_lsn: Lsn(0),
    1146            1 :             truncate_lsn: Lsn(0),
    1147            1 :             proposer_uuid: [0; 16],
    1148            1 :         };
    1149            1 :         let mut append_request = AppendRequest {
    1150            1 :             h: ar_hdr.clone(),
    1151            1 :             wal_data: Bytes::from_static(b"b"),
    1152            1 :         };
    1153            1 : 
    1154            1 :         let pem = ProposerElected {
    1155            1 :             term: 1,
    1156            1 :             start_streaming_at: Lsn(1),
    1157            1 :             term_history: TermHistory(vec![TermLsn {
    1158            1 :                 term: 1,
    1159            1 :                 lsn: Lsn(3),
    1160            1 :             }]),
    1161            1 :             timeline_start_lsn: Lsn(0),
    1162            1 :         };
    1163            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1164            0 :             .await
    1165            1 :             .unwrap();
    1166              : 
    1167              :         // check that AppendRequest before epochStartLsn doesn't switch epoch
    1168            1 :         let resp = sk
    1169            1 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1170            0 :             .await;
    1171            1 :         assert!(resp.is_ok());
    1172            1 :         assert_eq!(sk.get_epoch(), 0);
    1173              : 
    1174              :         // but record at epochStartLsn does the switch
    1175            1 :         ar_hdr.begin_lsn = Lsn(2);
    1176            1 :         ar_hdr.end_lsn = Lsn(3);
    1177            1 :         append_request = AppendRequest {
    1178            1 :             h: ar_hdr,
    1179            1 :             wal_data: Bytes::from_static(b"b"),
    1180            1 :         };
    1181            1 :         let resp = sk
    1182            1 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1183            0 :             .await;
    1184            1 :         assert!(resp.is_ok());
    1185            1 :         sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
    1186            1 :         assert_eq!(sk.get_epoch(), 1);
    1187              :     }
    1188              : }
        

Generated by: LCOV version 2.1-beta