LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 87.2 % 857 747
Test Date: 2024-02-12 20:26:03 Functions: 57.1 % 287 164

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use anyhow::{bail, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt};
       5              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       6              : 
       7              : use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
       8              : use serde::{Deserialize, Serialize};
       9              : use std::cmp::max;
      10              : use std::cmp::min;
      11              : use std::fmt;
      12              : use std::io::Read;
      13              : use std::time::Duration;
      14              : use storage_broker::proto::SafekeeperTimelineInfo;
      15              : 
      16              : use tracing::*;
      17              : 
      18              : use crate::control_file;
      19              : use crate::send_wal::HotStandbyFeedback;
      20              : 
      21              : use crate::state::TimelineState;
      22              : use crate::wal_storage;
      23              : use pq_proto::SystemId;
      24              : use utils::pageserver_feedback::PageserverFeedback;
      25              : use utils::{
      26              :     bin_ser::LeSer,
      27              :     id::{NodeId, TenantId, TimelineId},
      28              :     lsn::Lsn,
      29              : };
      30              : 
      31              : const SK_PROTOCOL_VERSION: u32 = 2;
      32              : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      33              : 
      34              : /// Consensus logical timestamp.
      35              : pub type Term = u64;
      36              : pub const INVALID_TERM: Term = 0;
      37              : 
      38         8733 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      39              : pub struct TermLsn {
      40              :     pub term: Term,
      41              :     pub lsn: Lsn,
      42              : }
      43              : 
      44              : // Creation from tuple provides less typing (e.g. for unit tests).
      45              : impl From<(Term, Lsn)> for TermLsn {
      46      4337756 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      47      4337756 :         TermLsn {
      48      4337756 :             term: pair.0,
      49      4337756 :             lsn: pair.1,
      50      4337756 :         }
      51      4337756 :     }
      52              : }
      53              : 
      54        13745 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      55              : pub struct TermHistory(pub Vec<TermLsn>);
      56              : 
      57              : impl TermHistory {
      58          535 :     pub fn empty() -> TermHistory {
      59          535 :         TermHistory(Vec::new())
      60          535 :     }
      61              : 
      62              :     // Parse TermHistory as n_entries followed by TermLsn pairs
      63          864 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      64          864 :         if bytes.remaining() < 4 {
      65            0 :             bail!("TermHistory misses len");
      66          864 :         }
      67          864 :         let n_entries = bytes.get_u32_le();
      68          864 :         let mut res = Vec::with_capacity(n_entries as usize);
      69          864 :         for _ in 0..n_entries {
      70         3516 :             if bytes.remaining() < 16 {
      71            0 :                 bail!("TermHistory is incomplete");
      72         3516 :             }
      73         3516 :             res.push(TermLsn {
      74         3516 :                 term: bytes.get_u64_le(),
      75         3516 :                 lsn: bytes.get_u64_le().into(),
      76         3516 :             })
      77              :         }
      78          864 :         Ok(TermHistory(res))
      79          864 :     }
      80              : 
      81              :     /// Return copy of self with switches happening strictly after up_to
      82              :     /// truncated.
      83        21872 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
      84        21872 :         let mut res = Vec::with_capacity(self.0.len());
      85        57506 :         for e in &self.0 {
      86        35659 :             if e.lsn > up_to {
      87           25 :                 break;
      88        35634 :             }
      89        35634 :             res.push(*e);
      90              :         }
      91        21872 :         TermHistory(res)
      92        21872 :     }
      93              : 
      94              :     /// Find point of divergence between leader (walproposer) term history and
      95              :     /// safekeeper. Arguments are not symmetric, as proposer history ends at
      96              :     /// +infinity while safekeeper at flush_lsn.
      97              :     /// C version is at walproposer SendProposerElected.
      98            9 :     pub fn find_highest_common_point(
      99            9 :         prop_th: &TermHistory,
     100            9 :         sk_th: &TermHistory,
     101            9 :         sk_wal_end: Lsn,
     102            9 :     ) -> Option<TermLsn> {
     103            9 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     104              : 
     105            9 :         if let Some(sk_th_last) = sk_th.last() {
     106            9 :             assert!(
     107            9 :                 sk_th_last.lsn <= sk_wal_end,
     108            0 :                 "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
     109              :                 sk_th_last,
     110              :                 sk_wal_end
     111              :             );
     112            0 :         }
     113              : 
     114              :         // find last common term, if any...
     115            9 :         let mut last_common_idx = None;
     116           17 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     117           17 :             if prop_th[i].term != sk_th[i].term {
     118            4 :                 break;
     119           13 :             }
     120           13 :             // If term is the same, LSN must be equal as well.
     121           13 :             assert!(
     122           13 :                 prop_th[i].lsn == sk_th[i].lsn,
     123            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     124            0 :                 prop_th[i].term,
     125            0 :                 prop_th[i].lsn,
     126            0 :                 sk_th[i].lsn
     127              :             );
     128           13 :             last_common_idx = Some(i);
     129              :         }
     130            9 :         let last_common_idx = match last_common_idx {
     131            2 :             None => return None, // no common point
     132            7 :             Some(lci) => lci,
     133            7 :         };
     134            7 :         // Now find where it ends at both prop and sk and take min. End of
     135            7 :         // (common) term is the start of the next, unless it is the last one;
     136            7 :         // there it is flush_lsn in case of safekeeper or, in case of proposer
     137            7 :         // +infinity, so we just take flush_lsn then.
     138            7 :         if last_common_idx == prop_th.len() - 1 {
     139            2 :             Some(TermLsn {
     140            2 :                 term: prop_th[last_common_idx].term,
     141            2 :                 lsn: sk_wal_end,
     142            2 :             })
     143              :         } else {
     144            5 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     145            5 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     146            2 :                 sk_th[last_common_idx + 1].lsn
     147              :             } else {
     148            3 :                 sk_wal_end
     149              :             };
     150            5 :             Some(TermLsn {
     151            5 :                 term: prop_th[last_common_idx].term,
     152            5 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     153            5 :             })
     154              :         }
     155            9 :     }
     156              : }
     157              : 
     158              : /// Display only latest entries for Debug.
     159              : impl fmt::Debug for TermHistory {
     160         2715 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     161         2715 :         let n_printed = 20;
     162         2715 :         write!(
     163         2715 :             fmt,
     164         2715 :             "{}{:?}",
     165         2715 :             if self.0.len() > n_printed { "... " } else { "" },
     166         2715 :             self.0
     167         2715 :                 .iter()
     168         2715 :                 .rev()
     169         2715 :                 .take(n_printed)
     170         6643 :                 .map(|&e| (e.term, e.lsn)) // print as tuples, omitting the TermLsn struct name
     171         2715 :                 .collect::<Vec<_>>()
     172         2715 :         )
     173         2715 :     }
     174              : }
     175              : 
     176              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     177              : pub type PgUuid = [u8; 16];
     178              : 
     179              : /// Persistent consensus state of the acceptor.
     180        12875 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     181              : pub struct AcceptorState {
     182              :     /// acceptor's last term it voted for (advanced in 1 phase)
     183              :     pub term: Term,
     184              :     /// History of term switches for safekeeper's WAL.
     185              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     186              :     /// from the proposer before recovery.
     187              :     pub term_history: TermHistory,
     188              : }
     189              : 
     190              : impl AcceptorState {
     191              :     /// acceptor's epoch is the term of the highest entry in the log
     192        20020 :     pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
     193        20020 :         let th = self.term_history.up_to(flush_lsn);
     194        20020 :         match th.0.last() {
     195        19514 :             Some(e) => e.term,
     196          506 :             None => 0,
     197              :         }
     198        20020 :     }
     199              : }
     200              : 
     201              : /// Information about Postgres. Safekeeper gets it once and then verifies
     202              : /// all further connections from computes match.
     203        12923 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     204              : pub struct ServerInfo {
     205              :     /// Postgres server version
     206              :     pub pg_version: u32,
     207              :     pub system_id: SystemId,
     208              :     pub wal_seg_size: u32,
     209              : }
     210              : 
     211            8 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     212              : pub struct PersistedPeerInfo {
     213              :     /// LSN up to which safekeeper offloaded WAL to s3.
     214              :     pub backup_lsn: Lsn,
     215              :     /// Term of the last entry.
     216              :     pub term: Term,
     217              :     /// LSN of the last record.
     218              :     pub flush_lsn: Lsn,
     219              :     /// Up to which LSN safekeeper regards its WAL as committed.
     220              :     pub commit_lsn: Lsn,
     221              : }
     222              : 
     223              : impl PersistedPeerInfo {
     224            0 :     pub fn new() -> Self {
     225            0 :         Self {
     226            0 :             backup_lsn: Lsn::INVALID,
     227            0 :             term: INVALID_TERM,
     228            0 :             flush_lsn: Lsn(0),
     229            0 :             commit_lsn: Lsn(0),
     230            0 :         }
     231            0 :     }
     232              : }
     233              : 
     234              : // make clippy happy
     235              : impl Default for PersistedPeerInfo {
     236            0 :     fn default() -> Self {
     237            0 :         Self::new()
     238            0 :     }
     239              : }
     240              : 
     241              : // protocol messages
     242              : 
     243              : /// Initial Proposer -> Acceptor message
     244         1848 : #[derive(Debug, Deserialize)]
     245              : pub struct ProposerGreeting {
     246              :     /// proposer-acceptor protocol version
     247              :     pub protocol_version: u32,
     248              :     /// Postgres server version
     249              :     pub pg_version: u32,
     250              :     pub proposer_id: PgUuid,
     251              :     pub system_id: SystemId,
     252              :     pub timeline_id: TimelineId,
     253              :     pub tenant_id: TenantId,
     254              :     pub tli: TimeLineID,
     255              :     pub wal_seg_size: u32,
     256              : }
     257              : 
     258              : /// Acceptor -> Proposer initial response: the highest term known to me
     259              : /// (acceptor voted for).
     260            0 : #[derive(Debug, Serialize)]
     261              : pub struct AcceptorGreeting {
     262              :     term: u64,
     263              :     node_id: NodeId,
     264              : }
     265              : 
     266              : /// Vote request sent from proposer to safekeepers
     267         1844 : #[derive(Debug, Deserialize)]
     268              : pub struct VoteRequest {
     269              :     pub term: Term,
     270              : }
     271              : 
     272              : /// Vote itself, sent from safekeeper to proposer
     273         1845 : #[derive(Debug, Serialize)]
     274              : pub struct VoteResponse {
     275              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     276              :     vote_given: u64, // fixme u64 due to padding
     277              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     278              :     // proposer to choose the most advanced one.
     279              :     pub flush_lsn: Lsn,
     280              :     truncate_lsn: Lsn,
     281              :     pub term_history: TermHistory,
     282              :     timeline_start_lsn: Lsn,
     283              : }
     284              : 
     285              : /*
     286              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     287              :  * term history to it.
     288              :  */
     289          868 : #[derive(Debug)]
     290              : pub struct ProposerElected {
     291              :     pub term: Term,
     292              :     pub start_streaming_at: Lsn,
     293              :     pub term_history: TermHistory,
     294              :     pub timeline_start_lsn: Lsn,
     295              : }
     296              : 
     297              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     298              : /// communicates commit_lsn.
     299            0 : #[derive(Debug)]
     300              : pub struct AppendRequest {
     301              :     pub h: AppendRequestHeader,
     302              :     pub wal_data: Bytes,
     303              : }
     304      2672121 : #[derive(Debug, Clone, Deserialize)]
     305              : pub struct AppendRequestHeader {
     306              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     307              :     pub term: Term,
     308              :     // TODO: remove this field, it is unused -- LSN of term switch can be taken
     309              :     // from ProposerElected (as well as from term history).
     310              :     pub epoch_start_lsn: Lsn,
     311              :     /// start position of message in WAL
     312              :     pub begin_lsn: Lsn,
     313              :     /// end position of message in WAL
     314              :     pub end_lsn: Lsn,
     315              :     /// LSN committed by quorum of safekeepers
     316              :     pub commit_lsn: Lsn,
     317              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     318              :     pub truncate_lsn: Lsn,
     319              :     // only for logging/debugging
     320              :     pub proposer_uuid: PgUuid,
     321              : }
     322              : 
     323              : /// Report safekeeper state to proposer
     324            3 : #[derive(Debug, Serialize)]
     325              : pub struct AppendResponse {
     326              :     // Current term of the safekeeper; if it is higher than proposer's, the
     327              :     // compute is out of date.
     328              :     pub term: Term,
     329              :     // NOTE: this is physical end of wal on safekeeper; currently it doesn't
     330              :     // make much sense without taking epoch into account, as history can be
     331              :     // diverged.
     332              :     pub flush_lsn: Lsn,
     333              :     // We report back our awareness about which WAL is committed, as this is
     334              :     // a criterion for walproposer --sync mode exit
     335              :     pub commit_lsn: Lsn,
     336              :     pub hs_feedback: HotStandbyFeedback,
     337              :     pub pageserver_feedback: PageserverFeedback,
     338              : }
     339              : 
     340              : impl AppendResponse {
     341            0 :     fn term_only(term: Term) -> AppendResponse {
     342            0 :         AppendResponse {
     343            0 :             term,
     344            0 :             flush_lsn: Lsn(0),
     345            0 :             commit_lsn: Lsn(0),
     346            0 :             hs_feedback: HotStandbyFeedback::empty(),
     347            0 :             pageserver_feedback: PageserverFeedback::empty(),
     348            0 :         }
     349            0 :     }
     350              : }
     351              : 
     352              : /// Proposer -> Acceptor messages
     353            0 : #[derive(Debug)]
     354              : pub enum ProposerAcceptorMessage {
     355              :     Greeting(ProposerGreeting),
     356              :     VoteRequest(VoteRequest),
     357              :     Elected(ProposerElected),
     358              :     AppendRequest(AppendRequest),
     359              :     NoFlushAppendRequest(AppendRequest),
     360              :     FlushWAL,
     361              : }
     362              : 
     363              : impl ProposerAcceptorMessage {
     364              :     /// Parse proposer message.
     365      2676677 :     pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
     366      2676677 :         // xxx using Reader is inefficient but easy to work with bincode
     367      2676677 :         let mut stream = msg_bytes.reader();
     368              :         // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     369      2676677 :         let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     370      2676677 :         match tag {
     371              :             'g' => {
     372         1848 :                 let msg = ProposerGreeting::des_from(&mut stream)?;
     373         1848 :                 Ok(ProposerAcceptorMessage::Greeting(msg))
     374              :             }
     375              :             'v' => {
     376         1844 :                 let msg = VoteRequest::des_from(&mut stream)?;
     377         1844 :                 Ok(ProposerAcceptorMessage::VoteRequest(msg))
     378              :             }
     379              :             'e' => {
     380          864 :                 let mut msg_bytes = stream.into_inner();
     381          864 :                 if msg_bytes.remaining() < 16 {
     382            0 :                     bail!("ProposerElected message is not complete");
     383          864 :                 }
     384          864 :                 let term = msg_bytes.get_u64_le();
     385          864 :                 let start_streaming_at = msg_bytes.get_u64_le().into();
     386          864 :                 let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     387          864 :                 if msg_bytes.remaining() < 8 {
     388            0 :                     bail!("ProposerElected message is not complete");
     389          864 :                 }
     390          864 :                 let timeline_start_lsn = msg_bytes.get_u64_le().into();
     391          864 :                 let msg = ProposerElected {
     392          864 :                     term,
     393          864 :                     start_streaming_at,
     394          864 :                     timeline_start_lsn,
     395          864 :                     term_history,
     396          864 :                 };
     397          864 :                 Ok(ProposerAcceptorMessage::Elected(msg))
     398              :             }
     399              :             'a' => {
     400              :                 // read header followed by wal data
     401      2672121 :                 let hdr = AppendRequestHeader::des_from(&mut stream)?;
     402      2672121 :                 let rec_size = hdr
     403      2672121 :                     .end_lsn
     404      2672121 :                     .checked_sub(hdr.begin_lsn)
     405      2672121 :                     .context("begin_lsn > end_lsn in AppendRequest")?
     406              :                     .0 as usize;
     407      2672121 :                 if rec_size > MAX_SEND_SIZE {
     408            0 :                     bail!(
     409            0 :                         "AppendRequest is longer than MAX_SEND_SIZE ({})",
     410            0 :                         MAX_SEND_SIZE
     411            0 :                     );
     412      2672121 :                 }
     413      2672121 : 
     414      2672121 :                 let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     415      2672121 :                 stream.read_exact(&mut wal_data_vec)?;
     416      2672121 :                 let wal_data = Bytes::from(wal_data_vec);
     417      2672121 :                 let msg = AppendRequest { h: hdr, wal_data };
     418      2672121 : 
     419      2672121 :                 Ok(ProposerAcceptorMessage::AppendRequest(msg))
     420              :             }
     421            0 :             _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     422              :         }
     423      2676677 :     }
     424              : }
     425              : 
     426              : /// Acceptor -> Proposer messages
     427            0 : #[derive(Debug)]
     428              : pub enum AcceptorProposerMessage {
     429              :     Greeting(AcceptorGreeting),
     430              :     VoteResponse(VoteResponse),
     431              :     AppendResponse(AppendResponse),
     432              : }
     433              : 
     434              : impl AcceptorProposerMessage {
     435              :     /// Serialize acceptor -> proposer message.
     436      1663975 :     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
     437      1663975 :         match self {
     438         1848 :             AcceptorProposerMessage::Greeting(msg) => {
     439         1848 :                 buf.put_u64_le('g' as u64);
     440         1848 :                 buf.put_u64_le(msg.term);
     441         1848 :                 buf.put_u64_le(msg.node_id.0);
     442         1848 :             }
     443         1843 :             AcceptorProposerMessage::VoteResponse(msg) => {
     444         1843 :                 buf.put_u64_le('v' as u64);
     445         1843 :                 buf.put_u64_le(msg.term);
     446         1843 :                 buf.put_u64_le(msg.vote_given);
     447         1843 :                 buf.put_u64_le(msg.flush_lsn.into());
     448         1843 :                 buf.put_u64_le(msg.truncate_lsn.into());
     449         1843 :                 buf.put_u32_le(msg.term_history.0.len() as u32);
     450         5768 :                 for e in &msg.term_history.0 {
     451         3925 :                     buf.put_u64_le(e.term);
     452         3925 :                     buf.put_u64_le(e.lsn.into());
     453         3925 :                 }
     454         1843 :                 buf.put_u64_le(msg.timeline_start_lsn.into());
     455              :             }
     456      1660284 :             AcceptorProposerMessage::AppendResponse(msg) => {
     457      1660284 :                 buf.put_u64_le('a' as u64);
     458      1660284 :                 buf.put_u64_le(msg.term);
     459      1660284 :                 buf.put_u64_le(msg.flush_lsn.into());
     460      1660284 :                 buf.put_u64_le(msg.commit_lsn.into());
     461      1660284 :                 buf.put_i64_le(msg.hs_feedback.ts);
     462      1660284 :                 buf.put_u64_le(msg.hs_feedback.xmin);
     463      1660284 :                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     464      1660284 : 
     465      1660284 :                 msg.pageserver_feedback.serialize(buf);
     466      1660284 :             }
     467              :         }
     468              : 
     469      1663975 :         Ok(())
     470      1663975 :     }
     471              : }
     472              : 
     473              : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     474              : /// It controls all WAL disk writes and updates of control file.
     475              : ///
     476              : /// Currently safekeeper processes:
     477              : /// - messages from compute (proposers) and provides replies
     478              : /// - messages from broker peers
     479              : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     480              :     /// LSN from which the proposer this safekeeper is currently talking to appends WAL;
     481              :     /// determines epoch switch point.
     482              :     pub epoch_start_lsn: Lsn,
     483              : 
     484              :     pub state: TimelineState<CTRL>, // persistent state storage
     485              :     pub wal_store: WAL,
     486              : 
     487              :     node_id: NodeId, // safekeeper's node id
     488              : }
     489              : 
     490              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     491              : where
     492              :     CTRL: control_file::Storage,
     493              :     WAL: wal_storage::Storage,
     494              : {
     495              :     /// Accepts a control file storage containing the safekeeper state.
     496              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     497              :     /// and `server` (`wal_seg_size` inside it) fields.
     498          618 :     pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
     499          618 :         if state.tenant_id == TenantId::from([0u8; 16])
     500          618 :             || state.timeline_id == TimelineId::from([0u8; 16])
     501              :         {
     502            0 :             bail!(
     503            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     504            0 :                 state.tenant_id,
     505            0 :                 state.timeline_id
     506            0 :             );
     507          618 :         }
     508          618 : 
     509          618 :         Ok(SafeKeeper {
     510          618 :             epoch_start_lsn: Lsn(0),
     511          618 :             state: TimelineState::new(state),
     512          618 :             wal_store,
     513          618 :             node_id,
     514          618 :         })
     515          618 :     }
     516              : 
     517              :     /// Get history of term switches for the available WAL
     518         1849 :     fn get_term_history(&self) -> TermHistory {
     519         1849 :         self.state
     520         1849 :             .acceptor_state
     521         1849 :             .term_history
     522         1849 :             .up_to(self.flush_lsn())
     523         1849 :     }
     524              : 
     525              :     /// Get current term.
     526      4337241 :     pub fn get_term(&self) -> Term {
     527      4337241 :         self.state.acceptor_state.term
     528      4337241 :     }
     529              : 
     530        19765 :     pub fn get_epoch(&self) -> Term {
     531        19765 :         self.state.acceptor_state.get_epoch(self.flush_lsn())
     532        19765 :     }
     533              : 
     534              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     535      8709461 :     pub fn flush_lsn(&self) -> Lsn {
     536      8709461 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     537      8709461 :     }
     538              : 
     539              :     /// Process message from proposer and possibly form reply. Concurrent
     540              :     /// callers must exclude each other.
     541      4337120 :     pub async fn process_msg(
     542      4337120 :         &mut self,
     543      4337120 :         msg: &ProposerAcceptorMessage,
     544      4337120 :     ) -> Result<Option<AcceptorProposerMessage>> {
                                // Pure dispatch on the message variant; the result of the chosen
                                // handler is returned unchanged. AppendRequest and
                                // NoFlushAppendRequest share one handler and differ only in the
                                // require_flush argument (true vs. false).
     545      4337120 :         match msg {
     546         1848 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     547         5379 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     548      1590861 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     549            7 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     550           12 :                 self.handle_append_request(msg, true).await
     551              :             }
     552      2672260 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     553      3149426 :                 self.handle_append_request(msg, false).await
     554              :             }
     555      1660286 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     556              :         }
     557      4337120 :     }
     558              : 
     559              :     /// Handle initial message from proposer: check its sanity and send my
     560              :     /// current term.
     561         1848 :     async fn handle_greeting(
     562         1848 :         &mut self,
     563         1848 :         msg: &ProposerGreeting,
     564         1848 :     ) -> Result<Option<AcceptorProposerMessage>> {
                                // Every compatibility check below is fatal for this request: the
                                // first mismatch bails with an error and no reply is produced.
                                // On success the reply is an AcceptorGreeting with our term and
                                // node id.
     565         1848 :         // Check protocol compatibility
     566         1848 :         if msg.protocol_version != SK_PROTOCOL_VERSION {
     567            0 :             bail!(
     568            0 :                 "incompatible protocol version {}, expected {}",
     569            0 :                 msg.protocol_version,
     570            0 :                 SK_PROTOCOL_VERSION
     571            0 :             );
     572         1848 :         }
     573         1848 :         /* Postgres major version mismatch is treated as fatal error
     574         1848 :          * because safekeepers parse WAL headers and the format
     575         1848 :          * may change between versions.
     576         1848 :          */
                                // Major versions are compared as pg_version / 10000. The check is
                                // skipped while our stored version is still UNKNOWN_SERVER_VERSION.
     577         1848 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     578            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     579              :         {
     580            0 :             bail!(
     581            0 :                 "incompatible server version {}, expected {}",
     582            0 :                 msg.pg_version,
     583            0 :                 self.state.server.pg_version
     584            0 :             );
     585         1848 :         }
     586         1848 : 
     587         1848 :         if msg.tenant_id != self.state.tenant_id {
     588            0 :             bail!(
     589            0 :                 "invalid tenant ID, got {}, expected {}",
     590            0 :                 msg.tenant_id,
     591            0 :                 self.state.tenant_id
     592            0 :             );
     593         1848 :         }
     594         1848 :         if msg.timeline_id != self.state.timeline_id {
     595            0 :             bail!(
     596            0 :                 "invalid timeline ID, got {}, expected {}",
     597            0 :                 msg.timeline_id,
     598            0 :                 self.state.timeline_id
     599            0 :             );
     600         1848 :         }
     601         1848 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     602            0 :             bail!(
     603            0 :                 "invalid wal_seg_size, got {}, expected {}",
     604            0 :                 msg.wal_seg_size,
     605            0 :                 self.state.server.wal_seg_size
     606            0 :             );
     607         1848 :         }
     608         1848 : 
     609         1848 :         // system_id will be updated on mismatch
     610         1848 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
     611         1848 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
                                    // Only warn when we are replacing a non-zero system id;
                                    // going from 0 to a real id is silent.
     612          473 :             if self.state.server.system_id != 0 {
     613            0 :                 warn!(
     614            0 :                     "unexpected system ID arrived, got {}, expected {}",
     615            0 :                     msg.system_id, self.state.server.system_id
     616            0 :                 );
     617          473 :             }
     618              : 
                                    // Store the new system_id (plus pg_version, when the proposer
                                    // reported one) via start_change/finish_change before replying.
     619          473 :             let mut state = self.state.start_change();
     620          473 :             state.server.system_id = msg.system_id;
     621          473 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
     622          473 :                 state.server.pg_version = msg.pg_version;
     623          473 :             }
     624         1419 :             self.state.finish_change(&state).await?;
     625         1375 :         }
     626              : 
                                // proposer_id is logged element by element in upper-case hex,
                                // concatenated without a separator.
                                // NOTE(review): {:X} does not zero-pad, so if the elements are
                                // bytes, values below 0x10 print as a single digit -- confirm
                                // whether {:02X} was intended.
     627         1848 :         info!(
     628         1848 :             "processed greeting from walproposer {}, sending term {:?}",
     629        29568 :             msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
     630         1848 :             self.state.acceptor_state.term
     631         1848 :         );
     632         1848 :         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
     633         1848 :             term: self.state.acceptor_state.term,
     634         1848 :             node_id: self.node_id,
     635         1848 :         })))
     636         1848 :     }
     637              : 
     638              :     /// Give vote for the given term, if we haven't done that previously.
     639         1849 :     async fn handle_vote_request(
     640         1849 :         &mut self,
     641         1849 :         msg: &VoteRequest,
     642         1849 :     ) -> Result<Option<AcceptorProposerMessage>> {
                                // Always answers with a VoteResponse (granted or refused) unless
                                // flushing WAL or persisting the new term fails.
     643         1849 :         // Once voted, we won't accept data from older proposers; flush
     644         1849 :         // everything we've already received so that new proposer starts
     645         1849 :         // streaming at end of our WAL, without overlap. Currently we truncate
     646         1849 :         // WAL at streaming point, so this avoids truncating already committed
     647         1849 :         // WAL.
     648         1849 :         //
     649         1849 :         // TODO: it would be smoother to not truncate committed piece at
     650         1849 :         // handle_elected instead. Currently not a big deal, as proposer is the
     651         1849 :         // only source of WAL; with peer2peer recovery it would be more
     652         1849 :         // important.
     653         1849 :         self.wal_store.flush_wal().await?;
     654              :         // initialize with refusal
                                // vote_given is carried as an integer flag (false as u64 here,
                                // true as u64 when granted).
     655         1849 :         let mut resp = VoteResponse {
     656         1849 :             term: self.state.acceptor_state.term,
     657         1849 :             vote_given: false as u64,
     658         1849 :             flush_lsn: self.flush_lsn(),
     659         1849 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
     660         1849 :             term_history: self.get_term_history(),
     661         1849 :             timeline_start_lsn: self.state.timeline_start_lsn,
     662         1849 :         };
                                // Grant only when msg.term is strictly greater than ours;
                                // otherwise resp stays the refusal built above, carrying our term.
     663         1849 :         if self.state.acceptor_state.term < msg.term {
     664         1796 :             let mut state = self.state.start_change();
     665         1796 :             state.acceptor_state.term = msg.term;
     666         1796 :             // persist vote before sending it out
     667         5379 :             self.state.finish_change(&state).await?;
     668              : 
     669         1796 :             resp.term = self.state.acceptor_state.term;
     670         1796 :             resp.vote_given = true as u64;
     671           53 :         }
     672         1845 :         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
     673         1849 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
     674         1849 :     }
     675              : 
     676              :     /// Form AppendResponse from current state.
     677      1660293 :     fn append_response(&self) -> AppendResponse {
                                // term, flush_lsn and commit_lsn are read from self; both feedback
                                // fields start out empty.
                                // NOTE(review): commit_lsn is taken from self.state.commit_lsn, not
                                // self.state.inmem.commit_lsn -- other methods in this file compare
                                // the two, so they can differ; confirm which one is intended here.
     678      1660293 :         let ar = AppendResponse {
     679      1660293 :             term: self.state.acceptor_state.term,
     680      1660293 :             flush_lsn: self.flush_lsn(),
     681      1660293 :             commit_lsn: self.state.commit_lsn,
     682      1660293 :             // will be filled by the upper code to avoid bothering safekeeper
     683      1660293 :             hs_feedback: HotStandbyFeedback::empty(),
     684      1660293 :             pageserver_feedback: PageserverFeedback::empty(),
     685      1660293 :         };
     686      1660293 :         trace!("formed AppendResponse {:?}", ar);
     687      1660293 :         ar
     688      1660293 :     }
     689              : 
     690          870 :     async fn handle_elected(
     691          870 :         &mut self,
     692          870 :         msg: &ProposerElected,
     693          870 :     ) -> Result<Option<AcceptorProposerMessage>> {
                                // Handle ProposerElected: adopt the proposer's term if it is newer,
                                // truncate local WAL at msg.start_streaming_at, initialize the
                                // start/commit/backup LSNs that are still unset, take over the
                                // proposer's term history and cache epoch_start_lsn.
                                // Never produces a reply: returns Ok(None) both on success and when
                                // our term is higher than the proposer's.
     694          868 :         info!("received ProposerElected {:?}", msg);
     695          870 :         if self.state.acceptor_state.term < msg.term {
     696            5 :             let mut state = self.state.start_change();
     697            5 :             state.acceptor_state.term = msg.term;
     698            9 :             self.state.finish_change(&state).await?;
     699          865 :         }
     700              : 
     701              :         // If our term is higher, ignore the message (next feedback will inform the compute)
     702          870 :         if self.state.acceptor_state.term > msg.term {
     703            0 :             return Ok(None);
     704          870 :         }
     705          870 : 
     706          870 :         // This might happen in a rare race when another (old) connection from
     707          870 :         // the same walproposer writes + flushes WAL after this connection
     708          870 :         // already sent flush_lsn in VoteRequest. It is generally safe to
     709          870 :         // proceed, but to prevent commit_lsn surprisingly going down we should
     710          870 :         // either refuse the session (simpler) or skip the part we already have
     711          870 :         // from the stream (can be implemented).
     712          870 :         if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
     713            0 :             bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
     714            0 :                    msg.term, self.flush_lsn(), msg.start_streaming_at)
     715          870 :         }
     716          870 :         // Otherwise we must never attempt to truncate committed data.
                                // NB: this is an assert!, so a violation panics instead of
                                // returning an error like the checks above.
     717          870 :         assert!(
     718          870 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
     719            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
     720              :             msg.start_streaming_at,
     721              :             self.state.inmem.commit_lsn
     722              :         );
     723              : 
     724              :         // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
     725              :         // intersection of our history and history from msg
     726              : 
     727              :         // truncate wal, update the LSNs
     728      1588293 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
     729              : 
     730              :         // and now adopt term history from proposer
     731              :         {
     732          870 :             let mut state = self.state.start_change();
     733          870 : 
     734          870 :             // Here we learn initial LSN for the first time, set fields
     735          870 :             // interested in that.
     736          870 : 
                                    // Each of the three LSNs below is only set while it is still
                                    // Lsn(0); values that are already set are left untouched.
     737          870 :             if state.timeline_start_lsn == Lsn(0) {
     738              :                 // Remember point where WAL begins globally.
     739          479 :                 state.timeline_start_lsn = msg.timeline_start_lsn;
     740          477 :                 info!(
     741          477 :                     "setting timeline_start_lsn to {:?}",
     742          477 :                     state.timeline_start_lsn
     743          477 :                 );
     744          391 :             }
     745          870 :             if state.peer_horizon_lsn == Lsn(0) {
     746          476 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
     747          476 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
     748          476 :                 state.peer_horizon_lsn = msg.timeline_start_lsn;
     749          476 :             }
     750          870 :             if state.local_start_lsn == Lsn(0) {
     751          476 :                 state.local_start_lsn = msg.start_streaming_at;
     752          474 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
     753          394 :             }
     754              :             // Initializing commit_lsn before acking first flushed record is
     755              :             // important to let find_end_of_wal skip the hole in the beginning
     756              :             // of the first segment.
     757              :             //
     758              :             // NB: on new clusters, this happens at the same time as
     759              :             // timeline_start_lsn initialization, it is taken outside to provide
     760              :             // upgrade.
     761          870 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
     762          870 : 
     763          870 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
     764          870 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
     765          870 : 
     766          870 :             state.acceptor_state.term_history = msg.term_history.clone();
     767         2559 :             self.state.finish_change(&state).await?;
     768              :         }
     769              : 
     770          868 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
     771              : 
     772              :         // Cache LSN where term starts to immediately fsync control file with
     773              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
     774              :         // persisted commit_lsn on majority of safekeepers aligns.
                                // NOTE(review): the empty-history bail! below runs only after the
                                // WAL truncation and finish_change above have already happened --
                                // confirm that rejecting this late is acceptable.
     775          870 :         self.epoch_start_lsn = match msg.term_history.0.last() {
     776            0 :             None => bail!("proposer elected with empty term history"),
     777          870 :             Some(term_lsn_start) => term_lsn_start.lsn,
     778          870 :         };
     779          870 : 
     780          870 :         Ok(None)
     781          870 :     }
     782              : 
     783              :     /// Advance commit_lsn taking into account what we have locally.
     784              :     ///
     785              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
     786              :     /// already been done outside.
     787      2680548 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
     788      2680548 :         // Both peers and walproposer communicate this value, we might already
     789      2680548 :         // have a fresher (higher) version.
     790      2680548 :         candidate = max(candidate, self.state.inmem.commit_lsn);
                                // Cap by what is flushed locally. If flush_lsn() were below the
                                // current in-memory commit_lsn, the min() would yield a smaller
                                // value and the assert below would panic.
     791      2680548 :         let commit_lsn = min(candidate, self.flush_lsn());
     792      2680548 :         assert!(
     793      2680548 :             commit_lsn >= self.state.inmem.commit_lsn,
     794            0 :             "commit_lsn monotonicity violated: old={} new={}",
     795              :             self.state.inmem.commit_lsn,
     796              :             commit_lsn
     797              :         );
     798              : 
                                // Only the in-memory value is updated here; the control file is
                                // flushed just in the epoch-switch case below.
     799      2680548 :         self.state.inmem.commit_lsn = commit_lsn;
     800      2680548 : 
     801      2680548 :         // If new commit_lsn reached epoch switch, force sync of control
     802      2680548 :         // file: walproposer in sync mode is very interested when this
     803      2680548 :         // happens. Note: this is for sync-safekeepers mode only, as
     804      2680548 :         // otherwise commit_lsn might jump over epoch_start_lsn.
                                // The second operand reads self.state.commit_lsn (not inmem) --
                                // presumably the persisted copy, which would keep this flush from
                                // repeating once it has been stored; confirm.
     805      2680548 :         if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
     806          102 :             self.state.flush().await?;
     807      2680514 :         }
     808              : 
     809      2680548 :         Ok(())
     810      2680548 :     }
     811              : 
     812              :     /// Persist control file if there is something to save and enough time
     813              :     /// passed after the last save.
     814         1737 :     pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<()> {
                                // Minimum spacing between saves made through this method: 300 s.
     815         1737 :         const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
     816         1737 :         if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
     817         1737 :             return Ok(());
     818            0 :         }
                                // "Something to save" means at least one of the four in-memory
                                // LSNs is ahead of its self.state.* counterpart.
     819            0 :         let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn
     820            0 :             || self.state.inmem.backup_lsn > self.state.backup_lsn
     821            0 :             || self.state.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
     822            0 :             || self.state.inmem.remote_consistent_lsn > self.state.remote_consistent_lsn;
     823            0 :         if need_persist {
     824            0 :             self.state.flush().await?;
     825            0 :             trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
     826            0 :         }
     827            0 :         Ok(())
     828         1737 :     }
     829              : 
     830              :     /// Handle request to append WAL.
     831              :     #[allow(clippy::comparison_chain)]
     832      2672267 :     async fn handle_append_request(
     833      2672267 :         &mut self,
     834      2672267 :         msg: &AppendRequest,
     835      2672267 :         require_flush: bool,
     836      2672267 :     ) -> Result<Option<AcceptorProposerMessage>> {
                                // Outcomes, in order of the checks below:
                                // - proposer term ahead of ours: error (no ProposerElected seen);
                                // - our term ahead: reply with a term-only AppendResponse;
                                // - same term: write WAL, optionally flush, update LSNs, and reply
                                //   with a full AppendResponse only when require_flush is true
                                //   (Ok(None) otherwise).
     837      2672267 :         if self.state.acceptor_state.term < msg.h.term {
     838            0 :             bail!("got AppendRequest before ProposerElected");
     839      2672267 :         }
     840      2672267 : 
     841      2672267 :         // If our term is higher, immediately refuse the message.
     842      2672267 :         if self.state.acceptor_state.term > msg.h.term {
     843            0 :             let resp = AppendResponse::term_only(self.state.acceptor_state.term);
     844            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
     845      2672267 :         }
     846      2672267 : 
     847      2672267 :         // Now we know that we are in the same term as the proposer,
     848      2672267 :         // processing the message.
     849      2672267 : 
     850      2672267 :         self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
     851      2672267 : 
     852      2672267 :         // do the job
                                // Empty wal_data is allowed; such a request only carries the
                                // header LSNs processed below.
     853      2672267 :         if !msg.wal_data.is_empty() {
     854      1712568 :             self.wal_store
     855      1712568 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
     856      3147876 :                 .await?;
     857       959699 :         }
     858              : 
     859              :         // flush wal to the disk, if required
     860      2672265 :         if require_flush {
     861            7 :             self.wal_store.flush_wal().await?;
     862      2672258 :         }
     863              : 
     864              :         // Update commit_lsn.
                                // A commit_lsn of Lsn(0) in the header is skipped entirely.
     865      2672265 :         if msg.h.commit_lsn != Lsn(0) {
     866      2669584 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
     867         2681 :         }
     868              :         // Value calculated by walproposer can always lag:
     869              :         // - safekeepers can forget inmem value and send to proposer lower
     870              :         //   persisted one on restart;
     871              :         // - if we make safekeepers always send persistent value,
     872              :         //   any compute restart would pull it down.
     873              :         // Thus, take max before adopting.
     874      2672265 :         self.state.inmem.peer_horizon_lsn =
     875      2672265 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
     876      2672265 : 
     877      2672265 :         // Update truncate and commit LSN in control file.
     878      2672265 :         // To avoid negative impact on performance of extra fsync, do it only
     879      2672265 :         // when commit_lsn delta exceeds WAL segment size.
     880      2672265 :         if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
     881      2672265 :             < self.state.inmem.commit_lsn
     882              :         {
     883         1469 :             self.state.flush().await?;
     884      2671765 :         }
     885              : 
     886            0 :         trace!(
     887            0 :             "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
     888            0 :             msg.wal_data.len(),
     889            0 :             msg.h.end_lsn,
     890            0 :             msg.h.commit_lsn,
     891            0 :             msg.h.truncate_lsn,
     892            0 :             require_flush,
     893            0 :         );
     894              : 
     895              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
     896      2672265 :         if !require_flush {
     897      2672258 :             return Ok(None);
     898            7 :         }
     899            7 : 
     900            7 :         let resp = self.append_response();
     901            7 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
     902      2672267 :     }
     903              : 
     904              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
     905      1660286 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
                                // The response is built after the flush completes, so its
                                // flush_lsn is read post-flush. A flush error is propagated and
                                // no reply is formed.
     906      1660286 :         self.wal_store.flush_wal().await?;
     907      1660286 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
     908      1660286 :             self.append_response(),
     909      1660286 :         )))
     910      1660286 :     }
     911              : 
     912              :     /// Update timeline state with peer safekeeper data.
     913        11104 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
                                // Set to true when any of the three LSNs below runs more than one
                                // WAL segment ahead of its self.state.* counterpart; a single
                                // flush at the end then covers all of them.
     914        11104 :         let mut sync_control_file = false;
     915        11104 : 
                                // The peer's commit_lsn is considered only when both its
                                // commit_lsn and last_log_term are valid, and adopted only when
                                // its last_log_term equals our epoch.
     916        11104 :         if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
     917              :             // Note: the check is too restrictive, generally we can update local
     918              :             // commit_lsn if our history matches (is part of) history of advanced
     919              :             // commit_lsn provider.
     920        11026 :             if sk_info.last_log_term == self.get_epoch() {
     921        10964 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
     922           62 :             }
     923           78 :         }
     924              : 
                                // The three in-memory LSNs below only ever move forward (max with
                                // the peer's value).
     925        11104 :         self.state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), self.state.inmem.backup_lsn);
     926        11104 :         sync_control_file |= self.state.backup_lsn + (self.state.server.wal_seg_size as u64)
     927        11104 :             < self.state.inmem.backup_lsn;
     928        11104 : 
     929        11104 :         self.state.inmem.remote_consistent_lsn = max(
     930        11104 :             Lsn(sk_info.remote_consistent_lsn),
     931        11104 :             self.state.inmem.remote_consistent_lsn,
     932        11104 :         );
     933        11104 :         sync_control_file |= self.state.remote_consistent_lsn
     934        11104 :             + (self.state.server.wal_seg_size as u64)
     935        11104 :             < self.state.inmem.remote_consistent_lsn;
     936        11104 : 
     937        11104 :         self.state.inmem.peer_horizon_lsn = max(
     938        11104 :             Lsn(sk_info.peer_horizon_lsn),
     939        11104 :             self.state.inmem.peer_horizon_lsn,
     940        11104 :         );
     941        11104 :         sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
     942        11104 :             < self.state.inmem.peer_horizon_lsn;
     943        11104 : 
     944        11104 :         if sync_control_file {
     945          894 :             self.state.flush().await?;
     946        10806 :         }
     947        11104 :         Ok(())
     948        11104 :     }
     949              : 
     950              :     /// Get oldest segno we still need to keep. We hold WAL till it is consumed
     951              :     /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
     952              :     /// offloading.
     953              :     /// While it is safe to use inmem values for determining horizon,
     954              :     /// we use persistent to make possible normal states less surprising.
     955         1737 :     pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
                                // Horizon is the minimum of remote_consistent_lsn and
                                // peer_horizon_lsn; backup_lsn takes part in the minimum only when
                                // wal_backup_enabled is true.
     956         1737 :         let mut horizon_lsn = min(
     957         1737 :             self.state.remote_consistent_lsn,
     958         1737 :             self.state.peer_horizon_lsn,
     959         1737 :         );
     960         1737 :         if wal_backup_enabled {
     961         1737 :             horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
     962         1737 :         }
                                // Convert the LSN to a segment number using the timeline's
                                // wal_seg_size.
     963         1737 :         horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
     964         1737 :     }
     965              : }
     966              : 
     967              : #[cfg(test)]
     968              : mod tests {
     969              :     use futures::future::BoxFuture;
     970              :     use postgres_ffi::WAL_SEGMENT_SIZE;
     971              : 
     972              :     use super::*;
     973              :     use crate::{
     974              :         state::{PersistedPeers, TimelinePersistentState},
     975              :         wal_storage::Storage,
     976              :     };
     977              :     use std::{ops::Deref, str::FromStr, time::Instant};
     978              : 
     979              :     // fake storage for tests
                            // Stands in for the control-file storage: "persisting" just keeps the
                            // latest state in this struct.
     980              :     struct InMemoryState {
     981              :         persisted_state: TimelinePersistentState,
     982              :     }
     983              : 
     984              :     #[async_trait::async_trait]
     985              :     impl control_file::Storage for InMemoryState {
                                // Replaces the stored state with a clone of `s`; never fails.
     986            6 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     987            6 :             self.persisted_state = s.clone();
     988            6 :             Ok(())
     989           12 :         }
     990              : 
                                // No persist time is tracked: every call reports the current
                                // instant.
     991            0 :         fn last_persist_at(&self) -> Instant {
     992            0 :             Instant::now()
     993            0 :         }
     994              :     }
     995              : 
     996              :     impl Deref for InMemoryState {
                                // Dereferences to the stored state, so its fields can be read
                                // directly through an InMemoryState.
     997              :         type Target = TimelinePersistentState;
     998              : 
     999          120 :         fn deref(&self) -> &Self::Target {
    1000          120 :             &self.persisted_state
    1001          120 :         }
    1002              :     }
    1003              : 
    1004            4 :     fn test_sk_state() -> TimelinePersistentState {
                                // Baseline state for the tests: empty() with the default WAL
                                // segment size and fixed tenant / timeline ids (all bytes 1).
    1005            4 :         let mut state = TimelinePersistentState::empty();
    1006            4 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1007            4 :         state.tenant_id = TenantId::from([1u8; 16]);
    1008            4 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1009            4 :         state
    1010            4 :     }
    1011              : 
    1012              :     struct DummyWalStore {
                                // The only state kept: the position reported as flush_lsn.
    1013              :         lsn: Lsn,
    1014              :     }
    1015              : 
    1016              :     #[async_trait::async_trait]
    1017              :     impl wal_storage::Storage for DummyWalStore {
                                // Test double: no WAL bytes are stored, only the end position is
                                // tracked in `lsn`.
    1018           18 :         fn flush_lsn(&self) -> Lsn {
    1019           18 :             self.lsn
    1020           18 :         }
    1021              : 
                                // Discards the data and moves `lsn` to the end of the written
                                // range, so writes count as flushed immediately.
    1022            4 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1023            4 :             self.lsn = startpos + buf.len() as u64;
    1024            4 :             Ok(())
    1025            8 :         }
    1026              : 
                                // Sets `lsn` to end_pos unconditionally.
    1027            4 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1028            4 :             self.lsn = end_pos;
    1029            4 :             Ok(())
    1030            8 :         }
    1031              : 
                                // Nothing to flush.
    1032            8 :         async fn flush_wal(&mut self) -> Result<()> {
    1033            8 :             Ok(())
    1034           16 :         }
    1035              : 
                                // No segments exist; returns an already-successful future.
    1036            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1037            0 :             Box::pin(async { Ok(()) })
    1038            0 :         }
    1039              : 
    1040            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1041            0 :             crate::metrics::WalStorageMetrics::default()
    1042            0 :         }
    1043              :     }
    1044              : 
    1045            2 :     #[tokio::test]
    1046            2 :     async fn test_voting() {
                                // A vote for term 1 is granted once; after rebuilding the
                                // SafeKeeper from the same state, a second request for term 1 must
                                // be refused.
    1047            2 :         let storage = InMemoryState {
    1048            2 :             persisted_state: test_sk_state(),
    1049            2 :         };
    1050            2 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1051            2 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1052            2 : 
    1053            2 :         // check voting for 1 is ok
    1054            2 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
    1055            2 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1056            2 :         match vote_resp.unwrap() {
    1057            2 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
    1058            2 :             r => panic!("unexpected response: {:?}", r),
    1059            2 :         }
    1060            2 : 
    1061            2 :         // reboot...
                                // A "reboot" here means constructing a fresh SafeKeeper over a
                                // clone of the current state, reusing the same wal_store.
    1062            2 :         let state = sk.state.deref().clone();
    1063            2 :         let storage = InMemoryState {
    1064            2 :             persisted_state: state,
    1065            2 :         };
    1066            2 : 
    1067            2 :         sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
    1068            2 : 
    1069            2 :         // and ensure voting second time for 1 is not ok
    1070            2 :         vote_resp = sk.process_msg(&vote_request).await;
    1071            2 :         match vote_resp.unwrap() {
    1072            2 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
    1073            2 :             r => panic!("unexpected response: {:?}", r),
    1074            2 :         }
    1075            2 :     }
    1076              : 
    1077            2 :     #[tokio::test]
    1078            2 :     async fn test_epoch_switch() {
    1079            2 :         let storage = InMemoryState {
    1080            2 :             persisted_state: test_sk_state(),
    1081            2 :         };
    1082            2 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1083            2 : 
    1084            2 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1085            2 : 
    1086            2 :         let mut ar_hdr = AppendRequestHeader {
    1087            2 :             term: 1,
    1088            2 :             epoch_start_lsn: Lsn(3),
    1089            2 :             begin_lsn: Lsn(1),
    1090            2 :             end_lsn: Lsn(2),
    1091            2 :             commit_lsn: Lsn(0),
    1092            2 :             truncate_lsn: Lsn(0),
    1093            2 :             proposer_uuid: [0; 16],
    1094            2 :         };
    1095            2 :         let mut append_request = AppendRequest {
    1096            2 :             h: ar_hdr.clone(),
    1097            2 :             wal_data: Bytes::from_static(b"b"),
    1098            2 :         };
    1099            2 : 
    1100            2 :         let pem = ProposerElected {
    1101            2 :             term: 1,
    1102            2 :             start_streaming_at: Lsn(1),
    1103            2 :             term_history: TermHistory(vec![TermLsn {
    1104            2 :                 term: 1,
    1105            2 :                 lsn: Lsn(3),
    1106            2 :             }]),
    1107            2 :             timeline_start_lsn: Lsn(0),
    1108            2 :         };
    1109            2 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1110            2 :             .await
    1111            2 :             .unwrap();
    1112            2 : 
    1113            2 :         // check that AppendRequest before epochStartLsn doesn't switch epoch
    1114            2 :         let resp = sk
    1115            2 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1116            2 :             .await;
    1117            2 :         assert!(resp.is_ok());
    1118            2 :         assert_eq!(sk.get_epoch(), 0);
    1119            2 : 
    1120            2 :         // but record at epochStartLsn does the switch
    1121            2 :         ar_hdr.begin_lsn = Lsn(2);
    1122            2 :         ar_hdr.end_lsn = Lsn(3);
    1123            2 :         append_request = AppendRequest {
    1124            2 :             h: ar_hdr,
    1125            2 :             wal_data: Bytes::from_static(b"b"),
    1126            2 :         };
    1127            2 :         let resp = sk
    1128            2 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1129            2 :             .await;
    1130            2 :         assert!(resp.is_ok());
    1131            2 :         sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
    1132            2 :         assert_eq!(sk.get_epoch(), 1);
    1133            2 :     }
    1134              : 
    1135            2 :     #[test]
    1136            2 :     fn test_find_highest_common_point_none() {
    1137            2 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1138            2 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1139            2 :         assert_eq!(
    1140            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
    1141            2 :             None
    1142            2 :         );
    1143            2 :     }
    1144              : 
    1145            2 :     #[test]
    1146            2 :     fn test_find_highest_common_point_middle() {
    1147            2 :         let prop_th = TermHistory(vec![
    1148            2 :             (1, Lsn(10)).into(),
    1149            2 :             (2, Lsn(20)).into(),
    1150            2 :             (4, Lsn(40)).into(),
    1151            2 :         ]);
    1152            2 :         let sk_th = TermHistory(vec![
    1153            2 :             (1, Lsn(10)).into(),
    1154            2 :             (2, Lsn(20)).into(),
    1155            2 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1156            2 :         ]);
    1157            2 :         assert_eq!(
    1158            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1159            2 :             Some(TermLsn {
    1160            2 :                 term: 2,
    1161            2 :                 lsn: Lsn(30),
    1162            2 :             })
    1163            2 :         );
    1164            2 :     }
    1165              : 
    1166            2 :     #[test]
    1167            2 :     fn test_find_highest_common_point_sk_end() {
    1168            2 :         let prop_th = TermHistory(vec![
    1169            2 :             (1, Lsn(10)).into(),
    1170            2 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1171            2 :             (4, Lsn(40)).into(),
    1172            2 :         ]);
    1173            2 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1174            2 :         assert_eq!(
    1175            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1176            2 :             Some(TermLsn {
    1177            2 :                 term: 2,
    1178            2 :                 lsn: Lsn(32),
    1179            2 :             })
    1180            2 :         );
    1181            2 :     }
    1182              : 
    1183            2 :     #[test]
    1184            2 :     fn test_find_highest_common_point_walprop() {
    1185            2 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1186            2 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1187            2 :         assert_eq!(
    1188            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1189            2 :             Some(TermLsn {
    1190            2 :                 term: 2,
    1191            2 :                 lsn: Lsn(32),
    1192            2 :             })
    1193            2 :         );
    1194            2 :     }
    1195              : 
    1196            2 :     #[test]
    1197            2 :     fn test_sk_state_bincode_serde_roundtrip() {
    1198            2 :         use utils::Hex;
    1199            2 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1200            2 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1201            2 :         let state = TimelinePersistentState {
    1202            2 :             tenant_id,
    1203            2 :             timeline_id,
    1204            2 :             acceptor_state: AcceptorState {
    1205            2 :                 term: 42,
    1206            2 :                 term_history: TermHistory(vec![TermLsn {
    1207            2 :                     lsn: Lsn(0x1),
    1208            2 :                     term: 41,
    1209            2 :                 }]),
    1210            2 :             },
    1211            2 :             server: ServerInfo {
    1212            2 :                 pg_version: 14,
    1213            2 :                 system_id: 0x1234567887654321,
    1214            2 :                 wal_seg_size: 0x12345678,
    1215            2 :             },
    1216            2 :             proposer_uuid: {
    1217            2 :                 let mut arr = timeline_id.as_arr();
    1218            2 :                 arr.reverse();
    1219            2 :                 arr
    1220            2 :             },
    1221            2 :             timeline_start_lsn: Lsn(0x12345600),
    1222            2 :             local_start_lsn: Lsn(0x12),
    1223            2 :             commit_lsn: Lsn(1234567800),
    1224            2 :             backup_lsn: Lsn(1234567300),
    1225            2 :             peer_horizon_lsn: Lsn(9999999),
    1226            2 :             remote_consistent_lsn: Lsn(1234560000),
    1227            2 :             peers: PersistedPeers(vec![(
    1228            2 :                 NodeId(1),
    1229            2 :                 PersistedPeerInfo {
    1230            2 :                     backup_lsn: Lsn(1234567000),
    1231            2 :                     term: 42,
    1232            2 :                     flush_lsn: Lsn(1234567800 - 8),
    1233            2 :                     commit_lsn: Lsn(1234567600),
    1234            2 :                 },
    1235            2 :             )]),
    1236            2 :         };
    1237            2 : 
    1238            2 :         let ser = state.ser().unwrap();
    1239            2 : 
    1240            2 :         #[rustfmt::skip]
    1241            2 :         let expected = [
    1242            2 :             // tenant_id as length prefixed hex
    1243            2 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1244            2 :             0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
    1245            2 :             // timeline_id as length prefixed hex
    1246            2 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1247            2 :             0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
    1248            2 :             // term
    1249            2 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1250            2 :             // length prefix
    1251            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1252            2 :             // unsure why this order is swapped
    1253            2 :             0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1254            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1255            2 :             // pg_version
    1256            2 :             0x0e, 0x00, 0x00, 0x00,
    1257            2 :             // systemid
    1258            2 :             0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
    1259            2 :             // wal_seg_size
    1260            2 :             0x78, 0x56, 0x34, 0x12,
    1261            2 :             // pguuid as length prefixed hex
    1262            2 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1263            2 :             0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
    1264            2 : 
    1265            2 :             // timeline_start_lsn
    1266            2 :             0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
    1267            2 :             0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1268            2 :             0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1269            2 :             0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1270            2 :             0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
    1271            2 :             0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1272            2 :             // length prefix for persistentpeers
    1273            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1274            2 :             // nodeid
    1275            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1276            2 :             // backuplsn
    1277            2 :             0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1278            2 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1279            2 :             0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1280            2 :             0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1281            2 :         ];
    1282            2 : 
    1283            2 :         assert_eq!(Hex(&ser), Hex(&expected));
    1284              : 
    1285            2 :         let deser = TimelinePersistentState::des(&ser).unwrap();
    1286            2 : 
    1287            2 :         assert_eq!(deser, state);
    1288            2 :     }
    1289              : }
        

Generated by: LCOV version 2.1-beta