LCOV - differential code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 87.7 % 699 613 86 613
Current Date: 2023-10-19 02:04:12 Functions: 49.8 % 299 149 150 149
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Acceptor part of proposer-acceptor consensus algorithm.
       2                 : 
       3                 : use anyhow::{bail, Context, Result};
       4                 : use byteorder::{LittleEndian, ReadBytesExt};
       5                 : use bytes::{Buf, BufMut, Bytes, BytesMut};
       6                 : 
       7                 : use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
       8                 : use serde::{Deserialize, Serialize};
       9                 : use std::cmp::max;
      10                 : use std::cmp::min;
      11                 : use std::fmt;
      12                 : use std::io::Read;
      13                 : use std::time::Duration;
      14                 : use storage_broker::proto::SafekeeperTimelineInfo;
      15                 : 
      16                 : use tracing::*;
      17                 : 
      18                 : use crate::control_file;
      19                 : use crate::send_wal::HotStandbyFeedback;
      20                 : 
      21                 : use crate::wal_storage;
      22                 : use pq_proto::SystemId;
      23                 : use utils::pageserver_feedback::PageserverFeedback;
      24                 : use utils::{
      25                 :     bin_ser::LeSer,
      26                 :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      27                 :     lsn::Lsn,
      28                 : };
      29                 : 
      30                 : pub const SK_MAGIC: u32 = 0xcafeceefu32;
      31                 : pub const SK_FORMAT_VERSION: u32 = 7;
      32                 : const SK_PROTOCOL_VERSION: u32 = 2;
      33                 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      34                 : 
      35                 : /// Consensus logical timestamp.
      36                 : pub type Term = u64;
      37                 : pub const INVALID_TERM: Term = 0;
      38                 : 
      39 CBC       16470 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      40                 : pub struct TermLsn {
      41                 :     pub term: Term,
      42                 :     pub lsn: Lsn,
      43                 : }
      44                 : 
      45                 : // Creation from tuple provides less typing (e.g. for unit tests).
      46                 : impl From<(Term, Lsn)> for TermLsn {
      47         5378031 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      48         5378031 :         TermLsn {
      49         5378031 :             term: pair.0,
      50         5378031 :             lsn: pair.1,
      51         5378031 :         }
      52         5378031 :     }
      53                 : }
      54                 : 
      55           14592 : #[derive(Clone, Serialize, Deserialize)]
      56                 : pub struct TermHistory(pub Vec<TermLsn>);
      57                 : 
      58                 : impl TermHistory {
      59             485 :     pub fn empty() -> TermHistory {
      60             485 :         TermHistory(Vec::new())
      61             485 :     }
      62                 : 
      63                 :     // Parse TermHistory as n_entries followed by TermLsn pairs
      64            1016 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      65            1016 :         if bytes.remaining() < 4 {
      66 UBC           0 :             bail!("TermHistory misses len");
      67 CBC        1016 :         }
      68            1016 :         let n_entries = bytes.get_u32_le();
      69            1016 :         let mut res = Vec::with_capacity(n_entries as usize);
      70            1016 :         for _ in 0..n_entries {
      71            7108 :             if bytes.remaining() < 16 {
      72 UBC           0 :                 bail!("TermHistory is incomplete");
      73 CBC        7108 :             }
      74            7108 :             res.push(TermLsn {
      75            7108 :                 term: bytes.get_u64_le(),
      76            7108 :                 lsn: bytes.get_u64_le().into(),
      77            7108 :             })
      78                 :         }
      79            1016 :         Ok(TermHistory(res))
      80            1016 :     }
      81                 : 
      82                 :     /// Return copy of self with switches happening strictly after up_to
      83                 :     /// truncated.
      84           19491 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
      85           19491 :         let mut res = Vec::with_capacity(self.0.len());
      86           82324 :         for e in &self.0 {
      87           62837 :             if e.lsn > up_to {
      88               4 :                 break;
      89           62833 :             }
      90           62833 :             res.push(*e);
      91                 :         }
      92           19491 :         TermHistory(res)
      93           19491 :     }
      94                 : }
      95                 : 
      96                 : /// Display only latest entries for Debug.
      97                 : impl fmt::Debug for TermHistory {
      98            3028 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
      99            3028 :         let n_printed = 20;
     100            3028 :         write!(
     101            3028 :             fmt,
     102            3028 :             "{}{:?}",
     103            3028 :             if self.0.len() > n_printed { "... " } else { "" },
     104            3028 :             self.0
     105            3028 :                 .iter()
     106            3028 :                 .rev()
     107            3028 :                 .take(n_printed)
     108           12246 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     109            3028 :                 .collect::<Vec<_>>()
     110            3028 :         )
     111            3028 :     }
     112                 : }
     113                 : 
     114                 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     115                 : pub type PgUuid = [u8; 16];
     116                 : 
     117                 : /// Persistent consensus state of the acceptor.
     118           13572 : #[derive(Debug, Clone, Serialize, Deserialize)]
     119                 : pub struct AcceptorState {
     120                 :     /// acceptor's last term it voted for (advanced in 1 phase)
     121                 :     pub term: Term,
     122                 :     /// History of term switches for safekeeper's WAL.
     123                 :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     124                 :     /// from the proposer before recovery.
     125                 :     pub term_history: TermHistory,
     126                 : }
     127                 : 
     128                 : impl AcceptorState {
     129                 :     /// acceptor's epoch is the term of the highest entry in the log
     130           17477 :     pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
     131           17477 :         let th = self.term_history.up_to(flush_lsn);
     132           17477 :         match th.0.last() {
     133           16941 :             Some(e) => e.term,
     134             536 :             None => 0,
     135                 :         }
     136           17477 :     }
     137                 : }
     138                 : 
     139                 : /// Information about Postgres. Safekeeper gets it once and then verifies
     140                 : /// all further connections from computes match.
     141           13572 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     142                 : pub struct ServerInfo {
     143                 :     /// Postgres server version
     144                 :     pub pg_version: u32,
     145                 :     pub system_id: SystemId,
     146                 :     pub wal_seg_size: u32,
     147                 : }
     148                 : 
     149 UBC           0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     150                 : pub struct PersistedPeerInfo {
     151                 :     /// LSN up to which safekeeper offloaded WAL to s3.
     152                 :     backup_lsn: Lsn,
     153                 :     /// Term of the last entry.
     154                 :     term: Term,
     155                 :     /// LSN of the last record.
     156                 :     flush_lsn: Lsn,
     157                 :     /// Up to which LSN safekeeper regards its WAL as committed.
     158                 :     commit_lsn: Lsn,
     159                 : }
     160                 : 
     161                 : impl PersistedPeerInfo {
     162               0 :     fn new() -> Self {
     163               0 :         Self {
     164               0 :             backup_lsn: Lsn::INVALID,
     165               0 :             term: INVALID_TERM,
     166               0 :             flush_lsn: Lsn(0),
     167               0 :             commit_lsn: Lsn(0),
     168               0 :         }
     169               0 :     }
     170                 : }
     171                 : 
     172 CBC       13572 : #[derive(Debug, Clone, Serialize, Deserialize)]
     173                 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
     174                 : 
     175                 : /// Persistent information stored on safekeeper node
     176                 : /// On disk data is prefixed by magic and format version and followed by checksum.
     177           13572 : #[derive(Debug, Clone, Serialize, Deserialize)]
     178                 : pub struct SafeKeeperState {
     179                 :     #[serde(with = "hex")]
     180                 :     pub tenant_id: TenantId,
     181                 :     #[serde(with = "hex")]
     182                 :     pub timeline_id: TimelineId,
     183                 :     /// persistent acceptor state
     184                 :     pub acceptor_state: AcceptorState,
     185                 :     /// information about server
     186                 :     pub server: ServerInfo,
     187                 :     /// Unique id of the last *elected* proposer we dealt with. Not needed
     188                 :     /// for correctness, exists for monitoring purposes.
     189                 :     #[serde(with = "hex")]
     190                 :     pub proposer_uuid: PgUuid,
     191                 :     /// Since which LSN this timeline generally starts. Safekeeper might have
     192                 :     /// joined later.
     193                 :     pub timeline_start_lsn: Lsn,
     194                 :     /// Since which LSN safekeeper has (had) WAL for this timeline.
     195                 :     /// All WAL segments next to one containing local_start_lsn are
     196                 :     /// filled with data from the beginning.
     197                 :     pub local_start_lsn: Lsn,
     198                 :     /// Part of WAL acknowledged by quorum *and available locally*. Always points
     199                 :     /// to record boundary.
     200                 :     pub commit_lsn: Lsn,
     201                 :     /// LSN that points to the end of the last backed up segment. Useful to
     202                 :     /// persist to avoid finding out offloading progress on boot.
     203                 :     pub backup_lsn: Lsn,
     204                 :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
     205                 :     /// of last record streamed to everyone). Persisting it helps skipping
     206                 :     /// recovery in walproposer, generally we compute it from peers. In
     207                 :     /// walproposer proto called 'truncate_lsn'. Updates are currently drived
     208                 :     /// only by walproposer.
     209                 :     pub peer_horizon_lsn: Lsn,
     210                 :     /// LSN of the oldest known checkpoint made by pageserver and successfully
     211                 :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
     212                 :     /// informational purposes, we receive it from pageserver (or broker).
     213                 :     pub remote_consistent_lsn: Lsn,
     214                 :     // Peers and their state as we remember it. Knowing peers themselves is
     215                 :     // fundamental; but state is saved here only for informational purposes and
     216                 :     // obviously can be stale. (Currently not saved at all, but let's provision
     217                 :     // place to have less file version upgrades).
     218                 :     pub peers: PersistedPeers,
     219                 : }
     220                 : 
     221            2734 : #[derive(Debug, Clone, Serialize, Deserialize)]
     222                 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
     223                 : // are not flushed yet.
     224                 : pub struct SafekeeperMemState {
     225                 :     pub commit_lsn: Lsn,
     226                 :     pub backup_lsn: Lsn,
     227                 :     pub peer_horizon_lsn: Lsn,
     228                 :     #[serde(with = "hex")]
     229                 :     pub proposer_uuid: PgUuid,
     230                 : }
     231                 : 
     232                 : impl SafeKeeperState {
     233             485 :     pub fn new(
     234             485 :         ttid: &TenantTimelineId,
     235             485 :         server_info: ServerInfo,
     236             485 :         peers: Vec<NodeId>,
     237             485 :         commit_lsn: Lsn,
     238             485 :         local_start_lsn: Lsn,
     239             485 :     ) -> SafeKeeperState {
     240             485 :         SafeKeeperState {
     241             485 :             tenant_id: ttid.tenant_id,
     242             485 :             timeline_id: ttid.timeline_id,
     243             485 :             acceptor_state: AcceptorState {
     244             485 :                 term: 0,
     245             485 :                 term_history: TermHistory::empty(),
     246             485 :             },
     247             485 :             server: server_info,
     248             485 :             proposer_uuid: [0; 16],
     249             485 :             timeline_start_lsn: Lsn(0),
     250             485 :             local_start_lsn,
     251             485 :             commit_lsn,
     252             485 :             backup_lsn: local_start_lsn,
     253             485 :             peer_horizon_lsn: local_start_lsn,
     254             485 :             remote_consistent_lsn: Lsn(0),
     255             485 :             peers: PersistedPeers(
     256             485 :                 peers
     257             485 :                     .iter()
     258             485 :                     .map(|p| (*p, PersistedPeerInfo::new()))
     259             485 :                     .collect(),
     260             485 :             ),
     261             485 :         }
     262             485 :     }
     263                 : 
     264                 :     #[cfg(test)]
     265               4 :     pub fn empty() -> Self {
     266               4 :         SafeKeeperState::new(
     267               4 :             &TenantTimelineId::empty(),
     268               4 :             ServerInfo {
     269               4 :                 pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
     270               4 :                 system_id: 0,                       /* Postgres system identifier */
     271               4 :                 wal_seg_size: 0,
     272               4 :             },
     273               4 :             vec![],
     274               4 :             Lsn::INVALID,
     275               4 :             Lsn::INVALID,
     276               4 :         )
     277               4 :     }
     278                 : }
     279                 : 
     280                 : // protocol messages
     281                 : 
     282                 : /// Initial Proposer -> Acceptor message
     283            2012 : #[derive(Debug, Deserialize)]
     284                 : pub struct ProposerGreeting {
     285                 :     /// proposer-acceptor protocol version
     286                 :     pub protocol_version: u32,
     287                 :     /// Postgres server version
     288                 :     pub pg_version: u32,
     289                 :     pub proposer_id: PgUuid,
     290                 :     pub system_id: SystemId,
     291                 :     pub timeline_id: TimelineId,
     292                 :     pub tenant_id: TenantId,
     293                 :     pub tli: TimeLineID,
     294                 :     pub wal_seg_size: u32,
     295                 : }
     296                 : 
     297                 : /// Acceptor -> Proposer initial response: the highest term known to me
     298                 : /// (acceptor voted for).
     299 UBC           0 : #[derive(Debug, Serialize)]
     300                 : pub struct AcceptorGreeting {
     301                 :     term: u64,
     302                 :     node_id: NodeId,
     303                 : }
     304                 : 
     305                 : /// Vote request sent from proposer to safekeepers
     306 CBC        2009 : #[derive(Debug, Deserialize)]
     307                 : pub struct VoteRequest {
     308                 :     term: Term,
     309                 : }
     310                 : 
     311                 : /// Vote itself, sent from safekeeper to proposer
     312            2009 : #[derive(Debug, Serialize)]
     313                 : pub struct VoteResponse {
     314                 :     term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     315                 :     vote_given: u64, // fixme u64 due to padding
     316                 :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     317                 :     // proposer to choose the most advanced one.
     318                 :     flush_lsn: Lsn,
     319                 :     truncate_lsn: Lsn,
     320                 :     term_history: TermHistory,
     321                 :     timeline_start_lsn: Lsn,
     322                 : }
     323                 : 
     324                 : /*
     325                 :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     326                 :  * term history to it.
     327                 :  */
     328            1019 : #[derive(Debug)]
     329                 : pub struct ProposerElected {
     330                 :     pub term: Term,
     331                 :     pub start_streaming_at: Lsn,
     332                 :     pub term_history: TermHistory,
     333                 :     pub timeline_start_lsn: Lsn,
     334                 : }
     335                 : 
     336                 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     337                 : /// communicates commit_lsn.
     338 UBC           0 : #[derive(Debug)]
     339                 : pub struct AppendRequest {
     340                 :     pub h: AppendRequestHeader,
     341                 :     pub wal_data: Bytes,
     342                 : }
     343 CBC     2912723 : #[derive(Debug, Clone, Deserialize)]
     344                 : pub struct AppendRequestHeader {
     345                 :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     346                 :     pub term: Term,
     347                 :     // LSN since the proposer appends WAL; determines epoch switch point.
     348                 :     pub epoch_start_lsn: Lsn,
     349                 :     /// start position of message in WAL
     350                 :     pub begin_lsn: Lsn,
     351                 :     /// end position of message in WAL
     352                 :     pub end_lsn: Lsn,
     353                 :     /// LSN committed by quorum of safekeepers
     354                 :     pub commit_lsn: Lsn,
     355                 :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     356                 :     pub truncate_lsn: Lsn,
     357                 :     // only for logging/debugging
     358                 :     pub proposer_uuid: PgUuid,
     359                 : }
     360                 : 
     361                 : /// Report safekeeper state to proposer
     362               3 : #[derive(Debug, Serialize)]
     363                 : pub struct AppendResponse {
     364                 :     // Current term of the safekeeper; if it is higher than proposer's, the
     365                 :     // compute is out of date.
     366                 :     pub term: Term,
     367                 :     // NOTE: this is physical end of wal on safekeeper; currently it doesn't
     368                 :     // make much sense without taking epoch into account, as history can be
     369                 :     // diverged.
     370                 :     pub flush_lsn: Lsn,
     371                 :     // We report back our awareness about which WAL is committed, as this is
     372                 :     // a criterion for walproposer --sync mode exit
     373                 :     pub commit_lsn: Lsn,
     374                 :     pub hs_feedback: HotStandbyFeedback,
     375                 :     pub pageserver_feedback: PageserverFeedback,
     376                 : }
     377                 : 
     378                 : impl AppendResponse {
     379              57 :     fn term_only(term: Term) -> AppendResponse {
     380              57 :         AppendResponse {
     381              57 :             term,
     382              57 :             flush_lsn: Lsn(0),
     383              57 :             commit_lsn: Lsn(0),
     384              57 :             hs_feedback: HotStandbyFeedback::empty(),
     385              57 :             pageserver_feedback: PageserverFeedback::empty(),
     386              57 :         }
     387              57 :     }
     388                 : }
     389                 : 
     390                 : /// Proposer -> Acceptor messages
     391 UBC           0 : #[derive(Debug)]
     392                 : pub enum ProposerAcceptorMessage {
     393                 :     Greeting(ProposerGreeting),
     394                 :     VoteRequest(VoteRequest),
     395                 :     Elected(ProposerElected),
     396                 :     AppendRequest(AppendRequest),
     397                 :     NoFlushAppendRequest(AppendRequest),
     398                 :     FlushWAL,
     399                 : }
     400                 : 
     401                 : impl ProposerAcceptorMessage {
     402                 :     /// Parse proposer message.
     403 CBC     2917760 :     pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
     404         2917760 :         // xxx using Reader is inefficient but easy to work with bincode
     405         2917760 :         let mut stream = msg_bytes.reader();
     406                 :         // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     407         2917760 :         let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     408         2917760 :         match tag {
     409                 :             'g' => {
     410            2012 :                 let msg = ProposerGreeting::des_from(&mut stream)?;
     411            2012 :                 Ok(ProposerAcceptorMessage::Greeting(msg))
     412                 :             }
     413                 :             'v' => {
     414            2009 :                 let msg = VoteRequest::des_from(&mut stream)?;
     415            2009 :                 Ok(ProposerAcceptorMessage::VoteRequest(msg))
     416                 :             }
     417                 :             'e' => {
     418            1016 :                 let mut msg_bytes = stream.into_inner();
     419            1016 :                 if msg_bytes.remaining() < 16 {
     420 UBC           0 :                     bail!("ProposerElected message is not complete");
     421 CBC        1016 :                 }
     422            1016 :                 let term = msg_bytes.get_u64_le();
     423            1016 :                 let start_streaming_at = msg_bytes.get_u64_le().into();
     424            1016 :                 let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     425            1016 :                 if msg_bytes.remaining() < 8 {
     426 UBC           0 :                     bail!("ProposerElected message is not complete");
     427 CBC        1016 :                 }
     428            1016 :                 let timeline_start_lsn = msg_bytes.get_u64_le().into();
     429            1016 :                 let msg = ProposerElected {
     430            1016 :                     term,
     431            1016 :                     start_streaming_at,
     432            1016 :                     timeline_start_lsn,
     433            1016 :                     term_history,
     434            1016 :                 };
     435            1016 :                 Ok(ProposerAcceptorMessage::Elected(msg))
     436                 :             }
     437                 :             'a' => {
     438                 :                 // read header followed by wal data
     439         2912723 :                 let hdr = AppendRequestHeader::des_from(&mut stream)?;
     440         2912723 :                 let rec_size = hdr
     441         2912723 :                     .end_lsn
     442         2912723 :                     .checked_sub(hdr.begin_lsn)
     443         2912723 :                     .context("begin_lsn > end_lsn in AppendRequest")?
     444                 :                     .0 as usize;
     445         2912723 :                 if rec_size > MAX_SEND_SIZE {
     446 UBC           0 :                     bail!(
     447               0 :                         "AppendRequest is longer than MAX_SEND_SIZE ({})",
     448               0 :                         MAX_SEND_SIZE
     449               0 :                     );
     450 CBC     2912723 :                 }
     451         2912723 : 
     452         2912723 :                 let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     453         2912723 :                 stream.read_exact(&mut wal_data_vec)?;
     454         2912723 :                 let wal_data = Bytes::from(wal_data_vec);
     455         2912723 :                 let msg = AppendRequest { h: hdr, wal_data };
     456         2912723 : 
     457         2912723 :                 Ok(ProposerAcceptorMessage::AppendRequest(msg))
     458                 :             }
     459 UBC           0 :             _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     460                 :         }
     461 CBC     2917760 :     }
     462                 : }
     463                 : 
     464                 : /// Acceptor -> Proposer messages
     465 UBC           0 : #[derive(Debug)]
     466                 : pub enum AcceptorProposerMessage {
     467                 :     Greeting(AcceptorGreeting),
     468                 :     VoteResponse(VoteResponse),
     469                 :     AppendResponse(AppendResponse),
     470                 : }
     471                 : 
     472                 : impl AcceptorProposerMessage {
     473                 :     /// Serialize acceptor -> proposer message.
     474 CBC     2463777 :     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
     475         2463777 :         match self {
     476            2012 :             AcceptorProposerMessage::Greeting(msg) => {
     477            2012 :                 buf.put_u64_le('g' as u64);
     478            2012 :                 buf.put_u64_le(msg.term);
     479            2012 :                 buf.put_u64_le(msg.node_id.0);
     480            2012 :             }
     481            2008 :             AcceptorProposerMessage::VoteResponse(msg) => {
     482            2008 :                 buf.put_u64_le('v' as u64);
     483            2008 :                 buf.put_u64_le(msg.term);
     484            2008 :                 buf.put_u64_le(msg.vote_given);
     485            2008 :                 buf.put_u64_le(msg.flush_lsn.into());
     486            2008 :                 buf.put_u64_le(msg.truncate_lsn.into());
     487            2008 :                 buf.put_u32_le(msg.term_history.0.len() as u32);
     488            9713 :                 for e in &msg.term_history.0 {
     489            7705 :                     buf.put_u64_le(e.term);
     490            7705 :                     buf.put_u64_le(e.lsn.into());
     491            7705 :                 }
     492            2008 :                 buf.put_u64_le(msg.timeline_start_lsn.into());
     493                 :             }
     494         2459757 :             AcceptorProposerMessage::AppendResponse(msg) => {
     495         2459757 :                 buf.put_u64_le('a' as u64);
     496         2459757 :                 buf.put_u64_le(msg.term);
     497         2459757 :                 buf.put_u64_le(msg.flush_lsn.into());
     498         2459757 :                 buf.put_u64_le(msg.commit_lsn.into());
     499         2459757 :                 buf.put_i64_le(msg.hs_feedback.ts);
     500         2459757 :                 buf.put_u64_le(msg.hs_feedback.xmin);
     501         2459757 :                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     502         2459757 : 
     503         2459757 :                 msg.pageserver_feedback.serialize(buf);
     504         2459757 :             }
     505                 :         }
     506                 : 
     507         2463777 :         Ok(())
     508         2463777 :     }
     509                 : }
     510                 : 
     511                 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     512                 : /// It controls all WAL disk writes and updates of control file.
     513                 : ///
     514                 : /// Currently safekeeper processes:
     515                 : /// - messages from compute (proposers) and provides replies
     516                 : /// - messages from broker peers
     517                 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     518                 :     /// LSN since the proposer safekeeper currently talking to appends WAL;
     519                 :     /// determines epoch switch point.
     520                 :     pub epoch_start_lsn: Lsn,
     521                 : 
     522                 :     pub inmem: SafekeeperMemState, // in memory part
     523                 :     pub state: CTRL,               // persistent state storage
     524                 : 
     525                 :     pub wal_store: WAL,
     526                 : 
     527                 :     node_id: NodeId, // safekeeper's node id
     528                 : }
     529                 : 
     530                 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     531                 : where
     532                 :     CTRL: control_file::Storage,
     533                 :     WAL: wal_storage::Storage,
     534                 : {
     535                 :     /// Accepts a control file storage containing the safekeeper state.
     536                 :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     537                 :     /// and `server` (`wal_seg_size` inside it) fields.
     538             568 :     pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
     539             568 :         if state.tenant_id == TenantId::from([0u8; 16])
     540             568 :             || state.timeline_id == TimelineId::from([0u8; 16])
     541                 :         {
     542 UBC           0 :             bail!(
     543               0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     544               0 :                 state.tenant_id,
     545               0 :                 state.timeline_id
     546               0 :             );
     547 CBC         568 :         }
     548             568 : 
     549             568 :         Ok(SafeKeeper {
     550             568 :             epoch_start_lsn: Lsn(0),
     551             568 :             inmem: SafekeeperMemState {
     552             568 :                 commit_lsn: state.commit_lsn,
     553             568 :                 backup_lsn: state.backup_lsn,
     554             568 :                 peer_horizon_lsn: state.peer_horizon_lsn,
     555             568 :                 proposer_uuid: state.proposer_uuid,
     556             568 :             },
     557             568 :             state,
     558             568 :             wal_store,
     559             568 :             node_id,
     560             568 :         })
     561             568 :     }
     562                 : 
     563                 :     /// Get history of term switches for the available WAL
     564            2011 :     fn get_term_history(&self) -> TermHistory {
     565            2011 :         self.state
     566            2011 :             .acceptor_state
     567            2011 :             .term_history
     568            2011 :             .up_to(self.flush_lsn())
     569            2011 :     }
     570                 : 
     571                 :     /// Get current term.
     572         5377550 :     pub fn get_term(&self) -> Term {
     573         5377550 :         self.state.acceptor_state.term
     574         5377550 :     }
     575                 : 
     576           17265 :     pub fn get_epoch(&self) -> Term {
     577           17265 :         self.state.acceptor_state.get_epoch(self.flush_lsn())
     578           17265 :     }
     579                 : 
     580                 :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     581        10784999 :     pub fn flush_lsn(&self) -> Lsn {
     582        10784999 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     583        10784999 :     }
     584                 : 
     585                 :     /// Process message from proposer and possibly form reply. Concurrent
     586                 :     /// callers must exclude each other.
     587         5377471 :     pub async fn process_msg(
     588         5377471 :         &mut self,
     589         5377471 :         msg: &ProposerAcceptorMessage,
     590         5377471 :     ) -> Result<Option<AcceptorProposerMessage>> {
     591         5377471 :         match msg {
     592            2012 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     593            5846 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     594         1624569 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     595               5 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     596              18 :                 self.handle_append_request(msg, true).await
     597                 :             }
     598         2912723 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     599         3134402 :                 self.handle_append_request(msg, false).await
     600                 :             }
     601         2459700 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     602                 :         }
     603         5377471 :     }
     604                 : 
     605                 :     /// Handle initial message from proposer: check its sanity and send my
     606                 :     /// current term.
     607            2012 :     async fn handle_greeting(
     608            2012 :         &mut self,
     609            2012 :         msg: &ProposerGreeting,
     610            2012 :     ) -> Result<Option<AcceptorProposerMessage>> {
     611            2012 :         // Check protocol compatibility
     612            2012 :         if msg.protocol_version != SK_PROTOCOL_VERSION {
     613 UBC           0 :             bail!(
     614               0 :                 "incompatible protocol version {}, expected {}",
     615               0 :                 msg.protocol_version,
     616               0 :                 SK_PROTOCOL_VERSION
     617               0 :             );
     618 CBC        2012 :         }
     619            2012 :         /* Postgres major version mismatch is treated as fatal error
     620            2012 :          * because safekeepers parse WAL headers and the format
     621            2012 :          * may change between versions.
     622            2012 :          */
     623            2012 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     624 UBC           0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     625                 :         {
     626               0 :             bail!(
     627               0 :                 "incompatible server version {}, expected {}",
     628               0 :                 msg.pg_version,
     629               0 :                 self.state.server.pg_version
     630               0 :             );
     631 CBC        2012 :         }
     632            2012 : 
     633            2012 :         if msg.tenant_id != self.state.tenant_id {
     634 UBC           0 :             bail!(
     635               0 :                 "invalid tenant ID, got {}, expected {}",
     636               0 :                 msg.tenant_id,
     637               0 :                 self.state.tenant_id
     638               0 :             );
     639 CBC        2012 :         }
     640            2012 :         if msg.timeline_id != self.state.timeline_id {
     641 UBC           0 :             bail!(
     642               0 :                 "invalid timeline ID, got {}, expected {}",
     643               0 :                 msg.timeline_id,
     644               0 :                 self.state.timeline_id
     645               0 :             );
     646 CBC        2012 :         }
     647            2012 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     648 UBC           0 :             bail!(
     649               0 :                 "invalid wal_seg_size, got {}, expected {}",
     650               0 :                 msg.wal_seg_size,
     651               0 :                 self.state.server.wal_seg_size
     652               0 :             );
     653 CBC        2012 :         }
     654            2012 : 
     655            2012 :         // system_id will be updated on mismatch
     656            2012 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
     657            2012 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
     658             475 :             if self.state.server.system_id != 0 {
     659 UBC           0 :                 warn!(
     660               0 :                     "unexpected system ID arrived, got {}, expected {}",
     661               0 :                     msg.system_id, self.state.server.system_id
     662               0 :                 );
     663 CBC         475 :             }
     664                 : 
     665             475 :             let mut state = self.state.clone();
     666             475 :             state.server.system_id = msg.system_id;
     667             475 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
     668             475 :                 state.server.pg_version = msg.pg_version;
     669             475 :             }
     670            1425 :             self.state.persist(&state).await?;
     671            1537 :         }
     672                 : 
     673            2012 :         info!(
     674            2012 :             "processed greeting from walproposer {}, sending term {:?}",
     675           32192 :             msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
     676            2012 :             self.state.acceptor_state.term
     677            2012 :         );
     678            2012 :         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
     679            2012 :             term: self.state.acceptor_state.term,
     680            2012 :             node_id: self.node_id,
     681            2012 :         })))
     682            2012 :     }
     683                 : 
     684                 :     /// Give vote for the given term, if we haven't done that previously.
     685            2011 :     async fn handle_vote_request(
     686            2011 :         &mut self,
     687            2011 :         msg: &VoteRequest,
     688            2011 :     ) -> Result<Option<AcceptorProposerMessage>> {
     689                 :         // Once voted, we won't accept data from older proposers; flush
     690                 :         // everything we've already received so that new proposer starts
     691                 :         // streaming at end of our WAL, without overlap. Currently we truncate
     692                 :         // WAL at streaming point, so this avoids truncating already committed
     693                 :         // WAL.
     694                 :         //
     695                 :         // TODO: it would be smoother to not truncate committed piece at
     696                 :         // handle_elected instead. Currently not a big deal, as proposer is the
     697                 :         // only source of WAL; with peer2peer recovery it would be more
     698                 :         // important.
     699            2011 :         self.wal_store.flush_wal().await?;
     700                 :         // initialize with refusal
     701            2011 :         let mut resp = VoteResponse {
     702            2011 :             term: self.state.acceptor_state.term,
     703            2011 :             vote_given: false as u64,
     704            2011 :             flush_lsn: self.flush_lsn(),
     705            2011 :             truncate_lsn: self.inmem.peer_horizon_lsn,
     706            2011 :             term_history: self.get_term_history(),
     707            2011 :             timeline_start_lsn: self.state.timeline_start_lsn,
     708            2011 :         };
     709            2011 :         if self.state.acceptor_state.term < msg.term {
     710            1950 :             let mut state = self.state.clone();
     711            1950 :             state.acceptor_state.term = msg.term;
     712            1950 :             // persist vote before sending it out
     713            5846 :             self.state.persist(&state).await?;
     714                 : 
     715            1950 :             resp.term = self.state.acceptor_state.term;
     716            1950 :             resp.vote_given = true as u64;
     717              61 :         }
     718            2009 :         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
     719            2011 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
     720            2011 :     }
     721                 : 
     722                 :     /// Form AppendResponse from current state.
     723         2459705 :     fn append_response(&self) -> AppendResponse {
     724         2459705 :         let ar = AppendResponse {
     725         2459705 :             term: self.state.acceptor_state.term,
     726         2459705 :             flush_lsn: self.flush_lsn(),
     727         2459705 :             commit_lsn: self.state.commit_lsn,
     728         2459705 :             // will be filled by the upper code to avoid bothering safekeeper
     729         2459705 :             hs_feedback: HotStandbyFeedback::empty(),
     730         2459705 :             pageserver_feedback: PageserverFeedback::empty(),
     731         2459705 :         };
     732         2459705 :         trace!("formed AppendResponse {:?}", ar);
     733         2459705 :         ar
     734         2459705 :     }
     735                 : 
     736            1020 :     async fn handle_elected(
     737            1020 :         &mut self,
     738            1020 :         msg: &ProposerElected,
     739            1020 :     ) -> Result<Option<AcceptorProposerMessage>> {
     740            1019 :         info!("received ProposerElected {:?}", msg);
     741            1020 :         if self.state.acceptor_state.term < msg.term {
     742               4 :             let mut state = self.state.clone();
     743               4 :             state.acceptor_state.term = msg.term;
     744              10 :             self.state.persist(&state).await?;
     745            1016 :         }
     746                 : 
     747                 :         // If our term is higher, ignore the message (next feedback will inform the compute)
     748            1020 :         if self.state.acceptor_state.term > msg.term {
     749 UBC           0 :             return Ok(None);
     750 CBC        1020 :         }
     751            1020 : 
     752            1020 :         // This might happen in a rare race when another (old) connection from
     753            1020 :         // the same walproposer writes + flushes WAL after this connection
     754            1020 :         // already sent flush_lsn in VoteRequest. It is generally safe to
     755            1020 :         // proceed, but to prevent commit_lsn surprisingly going down we should
     756            1020 :         // either refuse the session (simpler) or skip the part we already have
     757            1020 :         // from the stream (can be implemented).
     758            1020 :         if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
     759 UBC           0 :             bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
     760               0 :                    msg.term, self.flush_lsn(), msg.start_streaming_at)
     761 CBC        1020 :         }
     762            1020 :         // Otherwise this shouldn't happen.
     763            1020 :         assert!(
     764            1020 :             msg.start_streaming_at >= self.inmem.commit_lsn,
     765 UBC           0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
     766                 :             msg.start_streaming_at,
     767                 :             self.inmem.commit_lsn
     768                 :         );
     769                 : 
     770                 :         // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
     771                 :         // intersection of our history and history from msg
     772                 : 
     773                 :         // truncate wal, update the LSNs
     774 CBC     1621532 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
     775                 : 
     776                 :         // and now adopt term history from proposer
     777                 :         {
     778            1020 :             let mut state = self.state.clone();
     779            1020 : 
     780            1020 :             // Here we learn initial LSN for the first time, set fields
     781            1020 :             // interested in that.
     782            1020 : 
     783            1020 :             if state.timeline_start_lsn == Lsn(0) {
     784                 :                 // Remember point where WAL begins globally.
     785             480 :                 state.timeline_start_lsn = msg.timeline_start_lsn;
     786             479 :                 info!(
     787             479 :                     "setting timeline_start_lsn to {:?}",
     788             479 :                     state.timeline_start_lsn
     789             479 :                 );
     790             540 :             }
     791            1020 :             if state.local_start_lsn == Lsn(0) {
     792             471 :                 state.local_start_lsn = msg.start_streaming_at;
     793             470 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
     794             549 :             }
     795                 :             // Initializing commit_lsn before acking first flushed record is
     796                 :             // important to let find_end_of_wal skip the hole in the beginning
     797                 :             // of the first segment.
     798                 :             //
     799                 :             // NB: on new clusters, this happens at the same time as
     800                 :             // timeline_start_lsn initialization, it is taken outside to provide
     801                 :             // upgrade.
     802            1020 :             self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
     803            1020 : 
     804            1020 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
     805            1020 :             self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
     806            1020 : 
     807            1020 :             state.acceptor_state.term_history = msg.term_history.clone();
     808            3027 :             self.persist_control_file(state).await?;
     809                 :         }
     810                 : 
     811            1019 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
     812                 : 
     813            1020 :         Ok(None)
     814            1020 :     }
     815                 : 
     816                 :     /// Advance commit_lsn taking into account what we have locally.
     817                 :     ///
     818                 :     /// Note: it is assumed that 'WAL we have is from the right term' check has
     819                 :     /// already been done outside.
     820         2919732 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
     821         2919732 :         // Both peers and walproposer communicate this value, we might already
     822         2919732 :         // have a fresher (higher) version.
     823         2919732 :         candidate = max(candidate, self.inmem.commit_lsn);
     824         2919732 :         let commit_lsn = min(candidate, self.flush_lsn());
     825         2919732 :         assert!(
     826         2919732 :             commit_lsn >= self.inmem.commit_lsn,
     827 UBC           0 :             "commit_lsn monotonicity violated: old={} new={}",
     828                 :             self.inmem.commit_lsn,
     829                 :             commit_lsn
     830                 :         );
     831                 : 
     832 CBC     2919732 :         self.inmem.commit_lsn = commit_lsn;
     833         2919732 : 
     834         2919732 :         // If new commit_lsn reached epoch switch, force sync of control
     835         2919732 :         // file: walproposer in sync mode is very interested when this
     836         2919732 :         // happens. Note: this is for sync-safekeepers mode only, as
     837         2919732 :         // otherwise commit_lsn might jump over epoch_start_lsn.
     838         2919732 :         // Also note that commit_lsn can reach epoch_start_lsn earlier
     839         2919732 :         // that we receive new epoch_start_lsn, and we still need to sync
     840         2919732 :         // control file in this case.
     841         2919732 :         if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
     842              75 :             self.persist_control_file(self.state.clone()).await?;
     843         2919707 :         }
     844                 : 
     845         2919732 :         Ok(())
     846         2919732 :     }
     847                 : 
     848                 :     /// Persist control file to disk, called only after timeline creation (bootstrap).
     849             481 :     pub async fn persist(&mut self) -> Result<()> {
     850            1559 :         self.persist_control_file(self.state.clone()).await
     851             481 :     }
     852                 : 
     853                 :     /// Persist in-memory state to the disk, taking other data from state.
     854            2990 :     async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
     855            2990 :         state.commit_lsn = self.inmem.commit_lsn;
     856            2990 :         state.backup_lsn = self.inmem.backup_lsn;
     857            2990 :         state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
     858            2990 :         state.proposer_uuid = self.inmem.proposer_uuid;
     859            9008 :         self.state.persist(&state).await
     860            2990 :     }
     861                 : 
     862                 :     /// Persist control file if there is something to save and enough time
     863                 :     /// passed after the last save.
     864            1363 :     pub async fn maybe_persist_control_file(
     865            1363 :         &mut self,
     866            1363 :         inmem_remote_consistent_lsn: Lsn,
     867            1363 :     ) -> Result<()> {
     868            1363 :         const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
     869            1363 :         if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
     870            1325 :             return Ok(());
     871              38 :         }
     872              38 :         let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
     873              12 :             || self.inmem.backup_lsn > self.state.backup_lsn
     874              12 :             || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
     875              12 :             || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
     876              38 :         if need_persist {
     877              27 :             let mut state = self.state.clone();
     878              27 :             state.remote_consistent_lsn = inmem_remote_consistent_lsn;
     879              81 :             self.persist_control_file(state).await?;
     880 UBC           0 :             trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
     881 CBC          11 :         }
     882              38 :         Ok(())
     883            1363 :     }
     884                 : 
     885                 :     /// Handle request to append WAL.
     886                 :     #[allow(clippy::comparison_chain)]
     887         2912728 :     async fn handle_append_request(
     888         2912728 :         &mut self,
     889         2912728 :         msg: &AppendRequest,
     890         2912728 :         require_flush: bool,
     891         2912728 :     ) -> Result<Option<AcceptorProposerMessage>> {
     892         2912728 :         if self.state.acceptor_state.term < msg.h.term {
     893 UBC           0 :             bail!("got AppendRequest before ProposerElected");
     894 CBC     2912728 :         }
     895         2912728 : 
     896         2912728 :         // If our term is higher, immediately refuse the message.
     897         2912728 :         if self.state.acceptor_state.term > msg.h.term {
     898              57 :             let resp = AppendResponse::term_only(self.state.acceptor_state.term);
     899              57 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
     900         2912671 :         }
     901         2912671 : 
     902         2912671 :         // Now we know that we are in the same term as the proposer,
     903         2912671 :         // processing the message.
     904         2912671 : 
     905         2912671 :         self.epoch_start_lsn = msg.h.epoch_start_lsn;
     906         2912671 :         self.inmem.proposer_uuid = msg.h.proposer_uuid;
     907         2912671 : 
     908         2912671 :         // do the job
     909         2912671 :         if !msg.wal_data.is_empty() {
     910         1597354 :             self.wal_store
     911         1597354 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
     912         3131744 :                 .await?;
     913         1315317 :         }
     914                 : 
     915                 :         // flush wal to the disk, if required
     916         2912671 :         if require_flush {
     917               5 :             self.wal_store.flush_wal().await?;
     918         2912666 :         }
     919                 : 
     920                 :         // Update commit_lsn.
     921         2912671 :         if msg.h.commit_lsn != Lsn(0) {
     922         2910168 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
     923            2503 :         }
     924                 :         // Value calculated by walproposer can always lag:
     925                 :         // - safekeepers can forget inmem value and send to proposer lower
     926                 :         //   persisted one on restart;
     927                 :         // - if we make safekeepers always send persistent value,
     928                 :         //   any compute restart would pull it down.
     929                 :         // Thus, take max before adopting.
     930         2912671 :         self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
     931         2912671 : 
     932         2912671 :         // Update truncate and commit LSN in control file.
     933         2912671 :         // To avoid negative impact on performance of extra fsync, do it only
     934         2912671 :         // when truncate_lsn delta exceeds WAL segment size.
     935         2912671 :         if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
     936         2912671 :             < self.inmem.peer_horizon_lsn
     937                 :         {
     938            2610 :             self.persist_control_file(self.state.clone()).await?;
     939         2911786 :         }
     940                 : 
     941 UBC           0 :         trace!(
     942               0 :             "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
     943               0 :             msg.wal_data.len(),
     944               0 :             msg.h.end_lsn,
     945               0 :             msg.h.commit_lsn,
     946               0 :             msg.h.truncate_lsn,
     947               0 :             require_flush,
     948               0 :         );
     949                 : 
     950                 :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
     951 CBC     2912671 :         if !require_flush {
     952         2912666 :             return Ok(None);
     953               5 :         }
     954               5 : 
     955               5 :         let resp = self.append_response();
     956               5 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
     957         2912728 :     }
     958                 : 
     959                 :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
     960         2459700 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
     961         2459700 :         self.wal_store.flush_wal().await?;
     962         2459700 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
     963         2459700 :             self.append_response(),
     964         2459700 :         )))
     965         2459700 :     }
     966                 : 
     967                 :     /// Update timeline state with peer safekeeper data.
     968            9742 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     969            9742 :         let mut sync_control_file = false;
     970            9742 : 
     971            9742 :         if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
     972                 :             // Note: the check is too restrictive, generally we can update local
     973                 :             // commit_lsn if our history matches (is part of) history of advanced
     974                 :             // commit_lsn provider.
     975            9578 :             if sk_info.last_log_term == self.get_epoch() {
     976            9564 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
     977              14 :             }
     978             164 :         }
     979                 : 
     980            9742 :         let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
     981            9742 :         sync_control_file |=
     982            9742 :             self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
     983            9742 :         self.inmem.backup_lsn = new_backup_lsn;
     984            9742 : 
     985            9742 :         // value in sk_info should be maximized over our local in memory value.
     986            9742 :         let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
     987            9742 :         assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
     988            9742 :         sync_control_file |= self.state.remote_consistent_lsn
     989            9742 :             + (self.state.server.wal_seg_size as u64)
     990            9742 :             < new_remote_consistent_lsn;
     991            9742 : 
     992            9742 :         let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
     993            9742 :         sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
     994            9742 :             < new_peer_horizon_lsn;
     995            9742 :         self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
     996            9742 : 
     997            9742 :         if sync_control_file {
     998             552 :             let mut state = self.state.clone();
     999             552 :             // Note: we could make remote_consistent_lsn update in cf common by
    1000             552 :             // storing Arc to walsenders in Safekeeper.
    1001             552 :             state.remote_consistent_lsn = new_remote_consistent_lsn;
    1002            1656 :             self.persist_control_file(state).await?;
    1003            9190 :         }
    1004            9742 :         Ok(())
    1005            9742 :     }
    1006                 : 
    1007                 :     /// Get oldest segno we still need to keep. We hold WAL till it is consumed
    1008                 :     /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
    1009                 :     /// offloading.
    1010                 :     /// While it is safe to use inmem values for determining horizon,
    1011                 :     /// we use persistent to make possible normal states less surprising.
    1012            1363 :     pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
    1013            1363 :         let mut horizon_lsn = min(
    1014            1363 :             self.state.remote_consistent_lsn,
    1015            1363 :             self.state.peer_horizon_lsn,
    1016            1363 :         );
    1017            1363 :         if wal_backup_enabled {
    1018            1363 :             horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
    1019            1363 :         }
    1020            1363 :         horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
    1021            1363 :     }
    1022                 : }
    1023                 : 
    1024                 : #[cfg(test)]
    1025                 : mod tests {
    1026                 :     use futures::future::BoxFuture;
    1027                 :     use postgres_ffi::WAL_SEGMENT_SIZE;
    1028                 : 
    1029                 :     use super::*;
    1030                 :     use crate::wal_storage::Storage;
    1031                 :     use std::{ops::Deref, time::Instant};
    1032                 : 
    1033                 :     // fake storage for tests
    1034                 :     struct InMemoryState {
    1035                 :         persisted_state: SafeKeeperState,
    1036                 :     }
    1037                 : 
    1038                 :     #[async_trait::async_trait]
    1039                 :     impl control_file::Storage for InMemoryState {
    1040               3 :         async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
    1041               3 :             self.persisted_state = s.clone();
    1042               3 :             Ok(())
    1043               3 :         }
    1044                 : 
    1045 UBC           0 :         fn last_persist_at(&self) -> Instant {
    1046               0 :             Instant::now()
    1047               0 :         }
    1048                 :     }
    1049                 : 
    1050                 :     impl Deref for InMemoryState {
    1051                 :         type Target = SafeKeeperState;
    1052                 : 
    1053 CBC          56 :         fn deref(&self) -> &Self::Target {
    1054              56 :             &self.persisted_state
    1055              56 :         }
    1056                 :     }
    1057                 : 
    1058               2 :     fn test_sk_state() -> SafeKeeperState {
    1059               2 :         let mut state = SafeKeeperState::empty();
    1060               2 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1061               2 :         state.tenant_id = TenantId::from([1u8; 16]);
    1062               2 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1063               2 :         state
    1064               2 :     }
    1065                 : 
    1066                 :     struct DummyWalStore {
    1067                 :         lsn: Lsn,
    1068                 :     }
    1069                 : 
    1070                 :     #[async_trait::async_trait]
    1071                 :     impl wal_storage::Storage for DummyWalStore {
    1072               9 :         fn flush_lsn(&self) -> Lsn {
    1073               9 :             self.lsn
    1074               9 :         }
    1075                 : 
    1076               2 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1077               2 :             self.lsn = startpos + buf.len() as u64;
    1078               2 :             Ok(())
    1079               2 :         }
    1080                 : 
    1081               2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1082               2 :             self.lsn = end_pos;
    1083               2 :             Ok(())
    1084               2 :         }
    1085                 : 
    1086               4 :         async fn flush_wal(&mut self) -> Result<()> {
    1087               4 :             Ok(())
    1088               4 :         }
    1089                 : 
    1090 UBC           0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1091               0 :             Box::pin(async { Ok(()) })
    1092               0 :         }
    1093                 : 
    1094               0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1095               0 :             crate::metrics::WalStorageMetrics::default()
    1096               0 :         }
    1097                 :     }
    1098                 : 
    1099 CBC           1 :     #[tokio::test]
    1100               1 :     async fn test_voting() {
    1101               1 :         let storage = InMemoryState {
    1102               1 :             persisted_state: test_sk_state(),
    1103               1 :         };
    1104               1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1105               1 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1106               1 : 
    1107               1 :         // check voting for 1 is ok
    1108               1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
    1109               1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1110               1 :         match vote_resp.unwrap() {
    1111               1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
    1112 UBC           0 :             r => panic!("unexpected response: {:?}", r),
    1113                 :         }
    1114                 : 
    1115                 :         // reboot...
    1116 CBC           1 :         let state = sk.state.persisted_state.clone();
    1117               1 :         let storage = InMemoryState {
    1118               1 :             persisted_state: state,
    1119               1 :         };
    1120               1 : 
    1121               1 :         sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
    1122                 : 
    1123                 :         // and ensure voting second time for 1 is not ok
    1124               1 :         vote_resp = sk.process_msg(&vote_request).await;
    1125               1 :         match vote_resp.unwrap() {
    1126               1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
    1127 UBC           0 :             r => panic!("unexpected response: {:?}", r),
    1128                 :         }
    1129                 :     }
    1130                 : 
    1131 CBC           1 :     #[tokio::test]
    1132               1 :     async fn test_epoch_switch() {
    1133               1 :         let storage = InMemoryState {
    1134               1 :             persisted_state: test_sk_state(),
    1135               1 :         };
    1136               1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1137               1 : 
    1138               1 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1139               1 : 
    1140               1 :         let mut ar_hdr = AppendRequestHeader {
    1141               1 :             term: 1,
    1142               1 :             epoch_start_lsn: Lsn(3),
    1143               1 :             begin_lsn: Lsn(1),
    1144               1 :             end_lsn: Lsn(2),
    1145               1 :             commit_lsn: Lsn(0),
    1146               1 :             truncate_lsn: Lsn(0),
    1147               1 :             proposer_uuid: [0; 16],
    1148               1 :         };
    1149               1 :         let mut append_request = AppendRequest {
    1150               1 :             h: ar_hdr.clone(),
    1151               1 :             wal_data: Bytes::from_static(b"b"),
    1152               1 :         };
    1153               1 : 
    1154               1 :         let pem = ProposerElected {
    1155               1 :             term: 1,
    1156               1 :             start_streaming_at: Lsn(1),
    1157               1 :             term_history: TermHistory(vec![TermLsn {
    1158               1 :                 term: 1,
    1159               1 :                 lsn: Lsn(3),
    1160               1 :             }]),
    1161               1 :             timeline_start_lsn: Lsn(0),
    1162               1 :         };
    1163               1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1164 UBC           0 :             .await
    1165 CBC           1 :             .unwrap();
    1166                 : 
    1167                 :         // check that AppendRequest before epochStartLsn doesn't switch epoch
    1168               1 :         let resp = sk
    1169               1 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1170 UBC           0 :             .await;
    1171 CBC           1 :         assert!(resp.is_ok());
    1172               1 :         assert_eq!(sk.get_epoch(), 0);
    1173                 : 
    1174                 :         // but record at epochStartLsn does the switch
    1175               1 :         ar_hdr.begin_lsn = Lsn(2);
    1176               1 :         ar_hdr.end_lsn = Lsn(3);
    1177               1 :         append_request = AppendRequest {
    1178               1 :             h: ar_hdr,
    1179               1 :             wal_data: Bytes::from_static(b"b"),
    1180               1 :         };
    1181               1 :         let resp = sk
    1182               1 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1183 UBC           0 :             .await;
    1184 CBC           1 :         assert!(resp.is_ok());
    1185               1 :         sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
    1186               1 :         assert_eq!(sk.get_epoch(), 1);
    1187                 :     }
    1188                 : }
        

Generated by: LCOV version 2.1-beta