LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 7eb96e224e685167ad85f58f858387d8cf253f63.info Lines: 86.2 % 905 780
Test Date: 2024-09-23 21:23:07 Functions: 52.8 % 216 114

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use anyhow::{bail, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt};
       5              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       6              : 
       7              : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
       8              : use serde::{Deserialize, Serialize};
       9              : use std::cmp::max;
      10              : use std::cmp::min;
      11              : use std::fmt;
      12              : use std::io::Read;
      13              : use storage_broker::proto::SafekeeperTimelineInfo;
      14              : 
      15              : use tracing::*;
      16              : 
      17              : use crate::control_file;
      18              : use crate::metrics::MISC_OPERATION_SECONDS;
      19              : use crate::send_wal::HotStandbyFeedback;
      20              : 
      21              : use crate::state::TimelineState;
      22              : use crate::wal_storage;
      23              : use pq_proto::SystemId;
      24              : use utils::pageserver_feedback::PageserverFeedback;
      25              : use utils::{
      26              :     bin_ser::LeSer,
      27              :     id::{NodeId, TenantId, TimelineId},
      28              :     lsn::Lsn,
      29              : };
      30              : 
      31              : const SK_PROTOCOL_VERSION: u32 = 2;
      32              : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      33              : 
      34              : /// Consensus logical timestamp.
      35              : pub type Term = u64;
      36              : pub const INVALID_TERM: Term = 0;
      37              : 
      38            4 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      39              : pub struct TermLsn {
      40              :     pub term: Term,
      41              :     pub lsn: Lsn,
      42              : }
      43              : 
      44              : // Creation from tuple provides less typing (e.g. for unit tests).
      45              : impl From<(Term, Lsn)> for TermLsn {
      46           18 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      47           18 :         TermLsn {
      48           18 :             term: pair.0,
      49           18 :             lsn: pair.1,
      50           18 :         }
      51           18 :     }
      52              : }
      53              : 
      54            6 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      55              : pub struct TermHistory(pub Vec<TermLsn>);
      56              : 
      57              : impl TermHistory {
      58         5719 :     pub fn empty() -> TermHistory {
      59         5719 :         TermHistory(Vec::new())
      60         5719 :     }
      61              : 
      62              :     // Parse TermHistory as n_entries followed by TermLsn pairs
      63         3913 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      64         3913 :         if bytes.remaining() < 4 {
      65            0 :             bail!("TermHistory misses len");
      66         3913 :         }
      67         3913 :         let n_entries = bytes.get_u32_le();
      68         3913 :         let mut res = Vec::with_capacity(n_entries as usize);
      69         3913 :         for _ in 0..n_entries {
      70        37816 :             if bytes.remaining() < 16 {
      71            0 :                 bail!("TermHistory is incomplete");
      72        37816 :             }
      73        37816 :             res.push(TermLsn {
      74        37816 :                 term: bytes.get_u64_le(),
      75        37816 :                 lsn: bytes.get_u64_le().into(),
      76        37816 :             })
      77              :         }
      78         3913 :         Ok(TermHistory(res))
      79         3913 :     }
      80              : 
      81              :     /// Return copy of self with switches happening strictly after up_to
      82              :     /// truncated.
      83        16888 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
      84        16888 :         let mut res = Vec::with_capacity(self.0.len());
      85       109521 :         for e in &self.0 {
      86        92648 :             if e.lsn > up_to {
      87           15 :                 break;
      88        92633 :             }
      89        92633 :             res.push(*e);
      90              :         }
      91        16888 :         TermHistory(res)
      92        16888 :     }
      93              : 
      94              :     /// Find point of divergence between leader (walproposer) term history and
      95              :     /// safekeeper. Arguments are not symmetric as proposer history ends at
      96              :     /// +infinity while safekeeper at flush_lsn.
      97              :     /// C version is at walproposer SendProposerElected.
      98         3919 :     pub fn find_highest_common_point(
      99         3919 :         prop_th: &TermHistory,
     100         3919 :         sk_th: &TermHistory,
     101         3919 :         sk_wal_end: Lsn,
     102         3919 :     ) -> Option<TermLsn> {
     103         3919 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     104              : 
     105         3919 :         if let Some(sk_th_last) = sk_th.last() {
     106         3354 :             assert!(
     107         3354 :                 sk_th_last.lsn <= sk_wal_end,
     108            0 :                 "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
     109              :                 sk_th_last,
     110              :                 sk_wal_end
     111              :             );
     112          565 :         }
     113              : 
     114              :         // find last common term, if any...
     115         3919 :         let mut last_common_idx = None;
     116        33592 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     117        33592 :             if prop_th[i].term != sk_th[i].term {
     118           19 :                 break;
     119        33573 :             }
     120        33573 :             // If term is the same, LSN must be equal as well.
     121        33573 :             assert!(
     122        33573 :                 prop_th[i].lsn == sk_th[i].lsn,
     123            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     124            0 :                 prop_th[i].term,
     125            0 :                 prop_th[i].lsn,
     126            0 :                 sk_th[i].lsn
     127              :             );
     128        33573 :             last_common_idx = Some(i);
     129              :         }
     130         3919 :         let last_common_idx = match last_common_idx {
     131          567 :             None => return None, // no common point
     132         3352 :             Some(lci) => lci,
     133         3352 :         };
     134         3352 :         // Now find where it ends at both prop and sk and take min. End of
     135         3352 :         // (common) term is the start of the next except it is the last one;
     136         3352 :         // there it is flush_lsn in case of safekeeper or, in case of proposer
     137         3352 :         // +infinity, so we just take flush_lsn then.
     138         3352 :         if last_common_idx == prop_th.len() - 1 {
     139          264 :             Some(TermLsn {
     140          264 :                 term: prop_th[last_common_idx].term,
     141          264 :                 lsn: sk_wal_end,
     142          264 :             })
     143              :         } else {
     144         3088 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     145         3088 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     146           17 :                 sk_th[last_common_idx + 1].lsn
     147              :             } else {
     148         3071 :                 sk_wal_end
     149              :             };
     150         3088 :             Some(TermLsn {
     151         3088 :                 term: prop_th[last_common_idx].term,
     152         3088 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     153         3088 :             })
     154              :         }
     155         3919 :     }
     156              : }
     157              : 
     158              : /// Display only latest entries for Debug.
     159              : impl fmt::Debug for TermHistory {
     160          766 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     161          766 :         let n_printed = 20;
     162          766 :         write!(
     163          766 :             fmt,
     164          766 :             "{}{:?}",
     165          766 :             if self.0.len() > n_printed { "... " } else { "" },
     166          766 :             self.0
     167          766 :                 .iter()
     168          766 :                 .rev()
     169          766 :                 .take(n_printed)
     170         1787 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     171          766 :                 .collect::<Vec<_>>()
     172          766 :         )
     173          766 :     }
     174              : }
     175              : 
     176              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     177              : pub type PgUuid = [u8; 16];
     178              : 
     179              : /// Persistent consensus state of the acceptor.
     180            6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     181              : pub struct AcceptorState {
     182              :     /// acceptor's last term it voted for (advanced in 1 phase)
     183              :     pub term: Term,
     184              :     /// History of term switches for safekeeper's WAL.
     185              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     186              :     /// from the proposer before recovery.
     187              :     pub term_history: TermHistory,
     188              : }
     189              : 
     190              : impl AcceptorState {
     191              :     /// acceptor's last_log_term is the term of the highest entry in the log
     192           43 :     pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
     193           43 :         let th = self.term_history.up_to(flush_lsn);
     194           43 :         match th.0.last() {
     195           37 :             Some(e) => e.term,
     196            6 :             None => 0,
     197              :         }
     198           43 :     }
     199              : }
     200              : 
     201              : /// Information about Postgres. Safekeeper gets it once and then verifies
     202              : /// all further connections from computes match.
     203            4 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     204              : pub struct ServerInfo {
     205              :     /// Postgres server version
     206              :     pub pg_version: u32,
     207              :     pub system_id: SystemId,
     208              :     pub wal_seg_size: u32,
     209              : }
     210              : 
     211            2 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     212              : pub struct PersistedPeerInfo {
     213              :     /// LSN up to which safekeeper offloaded WAL to s3.
     214              :     pub backup_lsn: Lsn,
     215              :     /// Term of the last entry.
     216              :     pub term: Term,
     217              :     /// LSN of the last record.
     218              :     pub flush_lsn: Lsn,
     219              :     /// Up to which LSN safekeeper regards its WAL as committed.
     220              :     pub commit_lsn: Lsn,
     221              : }
     222              : 
     223              : impl PersistedPeerInfo {
     224            0 :     pub fn new() -> Self {
     225            0 :         Self {
     226            0 :             backup_lsn: Lsn::INVALID,
     227            0 :             term: INVALID_TERM,
     228            0 :             flush_lsn: Lsn(0),
     229            0 :             commit_lsn: Lsn(0),
     230            0 :         }
     231            0 :     }
     232              : }
     233              : 
     234              : // make clippy happy
     235              : impl Default for PersistedPeerInfo {
     236            0 :     fn default() -> Self {
     237            0 :         Self::new()
     238            0 :     }
     239              : }
     240              : 
     241              : // protocol messages
     242              : 
     243              : /// Initial Proposer -> Acceptor message
     244        77950 : #[derive(Debug, Deserialize)]
     245              : pub struct ProposerGreeting {
     246              :     /// proposer-acceptor protocol version
     247              :     pub protocol_version: u32,
     248              :     /// Postgres server version
     249              :     pub pg_version: u32,
     250              :     pub proposer_id: PgUuid,
     251              :     pub system_id: SystemId,
     252              :     pub timeline_id: TimelineId,
     253              :     pub tenant_id: TenantId,
     254              :     pub tli: TimeLineID,
     255              :     pub wal_seg_size: u32,
     256              : }
     257              : 
     258              : /// Acceptor -> Proposer initial response: the highest term known to me
     259              : /// (acceptor voted for).
     260              : #[derive(Debug, Serialize)]
     261              : pub struct AcceptorGreeting {
     262              :     term: u64,
     263              :     node_id: NodeId,
     264              : }
     265              : 
     266              : /// Vote request sent from proposer to safekeepers
     267        12928 : #[derive(Debug, Deserialize)]
     268              : pub struct VoteRequest {
     269              :     pub term: Term,
     270              : }
     271              : 
     272              : /// Vote itself, sent from safekeeper to proposer
     273              : #[derive(Debug, Serialize)]
     274              : pub struct VoteResponse {
     275              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     276              :     vote_given: u64, // fixme u64 due to padding
     277              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     278              :     // proposer to choose the most advanced one.
     279              :     pub flush_lsn: Lsn,
     280              :     truncate_lsn: Lsn,
     281              :     pub term_history: TermHistory,
     282              :     timeline_start_lsn: Lsn,
     283              : }
     284              : 
     285              : /*
     286              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     287              :  * term history to it.
     288              :  */
     289              : #[derive(Debug)]
     290              : pub struct ProposerElected {
     291              :     pub term: Term,
     292              :     pub start_streaming_at: Lsn,
     293              :     pub term_history: TermHistory,
     294              :     pub timeline_start_lsn: Lsn,
     295              : }
     296              : 
     297              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     298              : /// communicates commit_lsn.
     299              : #[derive(Debug)]
     300              : pub struct AppendRequest {
     301              :     pub h: AppendRequestHeader,
     302              :     pub wal_data: Bytes,
     303              : }
     304        13151 : #[derive(Debug, Clone, Deserialize)]
     305              : pub struct AppendRequestHeader {
     306              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     307              :     pub term: Term,
     308              :     // TODO: remove this field from the protocol, it in unused -- LSN of term
     309              :     // switch can be taken from ProposerElected (as well as from term history).
     310              :     pub term_start_lsn: Lsn,
     311              :     /// start position of message in WAL
     312              :     pub begin_lsn: Lsn,
     313              :     /// end position of message in WAL
     314              :     pub end_lsn: Lsn,
     315              :     /// LSN committed by quorum of safekeepers
     316              :     pub commit_lsn: Lsn,
     317              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     318              :     pub truncate_lsn: Lsn,
     319              :     // only for logging/debugging
     320              :     pub proposer_uuid: PgUuid,
     321              : }
     322              : 
     323              : /// Report safekeeper state to proposer
     324              : #[derive(Debug, Serialize, Clone)]
     325              : pub struct AppendResponse {
     326              :     // Current term of the safekeeper; if it is higher than proposer's, the
     327              :     // compute is out of date.
     328              :     pub term: Term,
     329              :     // Flushed end of wal on safekeeper; one should be always mindful from what
     330              :     // term history this value comes, either checking history directly or
     331              :     // observing term being set to one for which WAL truncation is known to have
     332              :     // happened.
     333              :     pub flush_lsn: Lsn,
     334              :     // We report back our awareness about which WAL is committed, as this is
     335              :     // a criterion for walproposer --sync mode exit
     336              :     pub commit_lsn: Lsn,
     337              :     pub hs_feedback: HotStandbyFeedback,
     338              :     pub pageserver_feedback: Option<PageserverFeedback>,
     339              : }
     340              : 
     341              : impl AppendResponse {
     342            0 :     fn term_only(term: Term) -> AppendResponse {
     343            0 :         AppendResponse {
     344            0 :             term,
     345            0 :             flush_lsn: Lsn(0),
     346            0 :             commit_lsn: Lsn(0),
     347            0 :             hs_feedback: HotStandbyFeedback::empty(),
     348            0 :             pageserver_feedback: None,
     349            0 :         }
     350            0 :     }
     351              : }
     352              : 
     353              : /// Proposer -> Acceptor messages
     354              : #[derive(Debug)]
     355              : pub enum ProposerAcceptorMessage {
     356              :     Greeting(ProposerGreeting),
     357              :     VoteRequest(VoteRequest),
     358              :     Elected(ProposerElected),
     359              :     AppendRequest(AppendRequest),
     360              :     NoFlushAppendRequest(AppendRequest),
     361              :     FlushWAL,
     362              : }
     363              : 
     364              : impl ProposerAcceptorMessage {
     365              :     /// Parse proposer message.
     366       107942 :     pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
     367       107942 :         // xxx using Reader is inefficient but easy to work with bincode
     368       107942 :         let mut stream = msg_bytes.reader();
     369              :         // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     370       107942 :         let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     371       107942 :         match tag {
     372              :             'g' => {
     373        77950 :                 let msg = ProposerGreeting::des_from(&mut stream)?;
     374        77950 :                 Ok(ProposerAcceptorMessage::Greeting(msg))
     375              :             }
     376              :             'v' => {
     377        12928 :                 let msg = VoteRequest::des_from(&mut stream)?;
     378        12928 :                 Ok(ProposerAcceptorMessage::VoteRequest(msg))
     379              :             }
     380              :             'e' => {
     381         3913 :                 let mut msg_bytes = stream.into_inner();
     382         3913 :                 if msg_bytes.remaining() < 16 {
     383            0 :                     bail!("ProposerElected message is not complete");
     384         3913 :                 }
     385         3913 :                 let term = msg_bytes.get_u64_le();
     386         3913 :                 let start_streaming_at = msg_bytes.get_u64_le().into();
     387         3913 :                 let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     388         3913 :                 if msg_bytes.remaining() < 8 {
     389            0 :                     bail!("ProposerElected message is not complete");
     390         3913 :                 }
     391         3913 :                 let timeline_start_lsn = msg_bytes.get_u64_le().into();
     392         3913 :                 let msg = ProposerElected {
     393         3913 :                     term,
     394         3913 :                     start_streaming_at,
     395         3913 :                     timeline_start_lsn,
     396         3913 :                     term_history,
     397         3913 :                 };
     398         3913 :                 Ok(ProposerAcceptorMessage::Elected(msg))
     399              :             }
     400              :             'a' => {
     401              :                 // read header followed by wal data
     402        13151 :                 let hdr = AppendRequestHeader::des_from(&mut stream)?;
     403        13151 :                 let rec_size = hdr
     404        13151 :                     .end_lsn
     405        13151 :                     .checked_sub(hdr.begin_lsn)
     406        13151 :                     .context("begin_lsn > end_lsn in AppendRequest")?
     407              :                     .0 as usize;
     408        13151 :                 if rec_size > MAX_SEND_SIZE {
     409            0 :                     bail!(
     410            0 :                         "AppendRequest is longer than MAX_SEND_SIZE ({})",
     411            0 :                         MAX_SEND_SIZE
     412            0 :                     );
     413        13151 :                 }
     414        13151 : 
     415        13151 :                 let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     416        13151 :                 stream.read_exact(&mut wal_data_vec)?;
     417        13151 :                 let wal_data = Bytes::from(wal_data_vec);
     418        13151 :                 let msg = AppendRequest { h: hdr, wal_data };
     419        13151 : 
     420        13151 :                 Ok(ProposerAcceptorMessage::AppendRequest(msg))
     421              :             }
     422            0 :             _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     423              :         }
     424       107942 :     }
     425              : }
     426              : 
     427              : /// Acceptor -> Proposer messages
     428              : #[derive(Debug)]
     429              : pub enum AcceptorProposerMessage {
     430              :     Greeting(AcceptorGreeting),
     431              :     VoteResponse(VoteResponse),
     432              :     AppendResponse(AppendResponse),
     433              : }
     434              : 
     435              : impl AcceptorProposerMessage {
     436              :     /// Serialize acceptor -> proposer message.
     437       101772 :     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
     438       101772 :         match self {
     439        77950 :             AcceptorProposerMessage::Greeting(msg) => {
     440        77950 :                 buf.put_u64_le('g' as u64);
     441        77950 :                 buf.put_u64_le(msg.term);
     442        77950 :                 buf.put_u64_le(msg.node_id.0);
     443        77950 :             }
     444        12928 :             AcceptorProposerMessage::VoteResponse(msg) => {
     445        12928 :                 buf.put_u64_le('v' as u64);
     446        12928 :                 buf.put_u64_le(msg.term);
     447        12928 :                 buf.put_u64_le(msg.vote_given);
     448        12928 :                 buf.put_u64_le(msg.flush_lsn.into());
     449        12928 :                 buf.put_u64_le(msg.truncate_lsn.into());
     450        12928 :                 buf.put_u32_le(msg.term_history.0.len() as u32);
     451        71743 :                 for e in &msg.term_history.0 {
     452        58815 :                     buf.put_u64_le(e.term);
     453        58815 :                     buf.put_u64_le(e.lsn.into());
     454        58815 :                 }
     455        12928 :                 buf.put_u64_le(msg.timeline_start_lsn.into());
     456              :             }
     457        10894 :             AcceptorProposerMessage::AppendResponse(msg) => {
     458        10894 :                 buf.put_u64_le('a' as u64);
     459        10894 :                 buf.put_u64_le(msg.term);
     460        10894 :                 buf.put_u64_le(msg.flush_lsn.into());
     461        10894 :                 buf.put_u64_le(msg.commit_lsn.into());
     462        10894 :                 buf.put_i64_le(msg.hs_feedback.ts);
     463        10894 :                 buf.put_u64_le(msg.hs_feedback.xmin);
     464        10894 :                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     465              : 
     466              :                 // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     467              :                 // if it is not present.
     468        10894 :                 if let Some(ref msg) = msg.pageserver_feedback {
     469            0 :                     msg.serialize(buf);
     470        10894 :                 }
     471              :             }
     472              :         }
     473              : 
     474       101772 :         Ok(())
     475       101772 :     }
     476              : }
     477              : 
     478              : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     479              : /// It controls all WAL disk writes and updates of control file.
     480              : ///
     481              : /// Currently safekeeper processes:
     482              : /// - messages from compute (proposers) and provides replies
     483              : /// - messages from broker peers
     484              : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     485              :     /// LSN since the proposer safekeeper currently talking to appends WAL;
     486              :     /// determines last_log_term switch point.
     487              :     pub term_start_lsn: Lsn,
     488              : 
     489              :     pub state: TimelineState<CTRL>, // persistent state storage
     490              :     pub wal_store: WAL,
     491              : 
     492              :     node_id: NodeId, // safekeeper's node id
     493              : }
     494              : 
     495              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     496              : where
     497              :     CTRL: control_file::Storage,
     498              :     WAL: wal_storage::Storage,
     499              : {
     500              :     /// Accepts a control file storage containing the safekeeper state.
     501              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     502              :     /// and `server` (`wal_seg_size` inside it) fields.
     503        34774 :     pub fn new(
     504        34774 :         state: TimelineState<CTRL>,
     505        34774 :         wal_store: WAL,
     506        34774 :         node_id: NodeId,
     507        34774 :     ) -> Result<SafeKeeper<CTRL, WAL>> {
     508        34774 :         if state.tenant_id == TenantId::from([0u8; 16])
     509        34774 :             || state.timeline_id == TimelineId::from([0u8; 16])
     510              :         {
     511            0 :             bail!(
     512            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     513            0 :                 state.tenant_id,
     514            0 :                 state.timeline_id
     515            0 :             );
     516        34774 :         }
     517        34774 : 
     518        34774 :         Ok(SafeKeeper {
     519        34774 :             term_start_lsn: Lsn(0),
     520        34774 :             state,
     521        34774 :             wal_store,
     522        34774 :             node_id,
     523        34774 :         })
     524        34774 :     }
     525              : 
     526              :     /// Get history of term switches for the available WAL
     527        16845 :     fn get_term_history(&self) -> TermHistory {
     528        16845 :         self.state
     529        16845 :             .acceptor_state
     530        16845 :             .term_history
     531        16845 :             .up_to(self.flush_lsn())
     532        16845 :     }
     533              : 
     534           43 :     pub fn get_last_log_term(&self) -> Term {
     535           43 :         self.state
     536           43 :             .acceptor_state
     537           43 :             .get_last_log_term(self.flush_lsn())
     538           43 :     }
     539              : 
     540              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     541        51399 :     pub fn flush_lsn(&self) -> Lsn {
     542        51399 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     543        51399 :     }
     544              : 
     545              :     /// Process message from proposer and possibly form reply. Concurrent
     546              :     /// callers must exclude each other.
     547       118844 :     pub async fn process_msg(
     548       118844 :         &mut self,
     549       118844 :         msg: &ProposerAcceptorMessage,
     550       118844 :     ) -> Result<Option<AcceptorProposerMessage>> {
     551       118844 :         match msg {
     552        77950 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     553        12930 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     554         3915 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     555            4 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     556            4 :                 self.handle_append_request(msg, true).await
     557              :             }
     558        13151 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     559        13151 :                 self.handle_append_request(msg, false).await
     560              :             }
     561        10894 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     562              :         }
     563       118844 :     }
     564              : 
     565              :     /// Handle initial message from proposer: check its sanity and send my
     566              :     /// current term.
     567        77950 :     async fn handle_greeting(
     568        77950 :         &mut self,
     569        77950 :         msg: &ProposerGreeting,
     570        77950 :     ) -> Result<Option<AcceptorProposerMessage>> {
     571        77950 :         // Check protocol compatibility
     572        77950 :         if msg.protocol_version != SK_PROTOCOL_VERSION {
     573            0 :             bail!(
     574            0 :                 "incompatible protocol version {}, expected {}",
     575            0 :                 msg.protocol_version,
     576            0 :                 SK_PROTOCOL_VERSION
     577            0 :             );
     578        77950 :         }
     579        77950 :         /* Postgres major version mismatch is treated as fatal error
     580        77950 :          * because safekeepers parse WAL headers and the format
     581        77950 :          * may change between versions.
     582        77950 :          */
     583        77950 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     584            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     585              :         {
     586            0 :             bail!(
     587            0 :                 "incompatible server version {}, expected {}",
     588            0 :                 msg.pg_version,
     589            0 :                 self.state.server.pg_version
     590            0 :             );
     591        77950 :         }
     592        77950 : 
     593        77950 :         if msg.tenant_id != self.state.tenant_id {
     594            0 :             bail!(
     595            0 :                 "invalid tenant ID, got {}, expected {}",
     596            0 :                 msg.tenant_id,
     597            0 :                 self.state.tenant_id
     598            0 :             );
     599        77950 :         }
     600        77950 :         if msg.timeline_id != self.state.timeline_id {
     601            0 :             bail!(
     602            0 :                 "invalid timeline ID, got {}, expected {}",
     603            0 :                 msg.timeline_id,
     604            0 :                 self.state.timeline_id
     605            0 :             );
     606        77950 :         }
     607        77950 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     608            0 :             bail!(
     609            0 :                 "invalid wal_seg_size, got {}, expected {}",
     610            0 :                 msg.wal_seg_size,
     611            0 :                 self.state.server.wal_seg_size
     612            0 :             );
     613        77950 :         }
     614        77950 : 
     615        77950 :         // system_id will be updated on mismatch
     616        77950 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
     617        77950 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
     618            0 :             if self.state.server.system_id != 0 {
     619            0 :                 warn!(
     620            0 :                     "unexpected system ID arrived, got {}, expected {}",
     621            0 :                     msg.system_id, self.state.server.system_id
     622              :                 );
     623            0 :             }
     624              : 
     625            0 :             let mut state = self.state.start_change();
     626            0 :             state.server.system_id = msg.system_id;
     627            0 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
     628            0 :                 state.server.pg_version = msg.pg_version;
     629            0 :             }
     630            0 :             self.state.finish_change(&state).await?;
     631        77950 :         }
     632              : 
     633        77950 :         info!(
     634            0 :             "processed greeting from walproposer {}, sending term {:?}",
     635         2416 :             msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
     636            0 :             self.state.acceptor_state.term
     637              :         );
     638        77950 :         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
     639        77950 :             term: self.state.acceptor_state.term,
     640        77950 :             node_id: self.node_id,
     641        77950 :         })))
     642        77950 :     }
     643              : 
     644              :     /// Give vote for the given term, if we haven't done that previously.
     645        12930 :     async fn handle_vote_request(
     646        12930 :         &mut self,
     647        12930 :         msg: &VoteRequest,
     648        12930 :     ) -> Result<Option<AcceptorProposerMessage>> {
     649        12930 :         // Once voted, we won't accept data from older proposers; flush
     650        12930 :         // everything we've already received so that new proposer starts
     651        12930 :         // streaming at end of our WAL, without overlap. Currently we truncate
     652        12930 :         // WAL at streaming point, so this avoids truncating already committed
     653        12930 :         // WAL.
     654        12930 :         //
     655        12930 :         // TODO: it would be smoother to not truncate committed piece at
     656        12930 :         // handle_elected instead. Currently not a big deal, as proposer is the
     657        12930 :         // only source of WAL; with peer2peer recovery it would be more
     658        12930 :         // important.
     659        12930 :         self.wal_store.flush_wal().await?;
     660              :         // initialize with refusal
     661        12930 :         let mut resp = VoteResponse {
     662        12930 :             term: self.state.acceptor_state.term,
     663        12930 :             vote_given: false as u64,
     664        12930 :             flush_lsn: self.flush_lsn(),
     665        12930 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
     666        12930 :             term_history: self.get_term_history(),
     667        12930 :             timeline_start_lsn: self.state.timeline_start_lsn,
     668        12930 :         };
     669        12930 :         if self.state.acceptor_state.term < msg.term {
     670        12281 :             let mut state = self.state.start_change();
     671        12281 :             state.acceptor_state.term = msg.term;
     672        12281 :             // persist vote before sending it out
     673        12281 :             self.state.finish_change(&state).await?;
     674              : 
     675        12281 :             resp.term = self.state.acceptor_state.term;
     676        12281 :             resp.vote_given = true as u64;
     677          649 :         }
     678        12930 :         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
     679        12930 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
     680        12930 :     }
     681              : 
     682              :     /// Form AppendResponse from current state.
     683        10897 :     fn append_response(&self) -> AppendResponse {
     684        10897 :         let ar = AppendResponse {
     685        10897 :             term: self.state.acceptor_state.term,
     686        10897 :             flush_lsn: self.flush_lsn(),
     687        10897 :             commit_lsn: self.state.commit_lsn,
     688        10897 :             // will be filled by the upper code to avoid bothering safekeeper
     689        10897 :             hs_feedback: HotStandbyFeedback::empty(),
     690        10897 :             pageserver_feedback: None,
     691        10897 :         };
     692        10897 :         trace!("formed AppendResponse {:?}", ar);
     693        10897 :         ar
     694        10897 :     }
     695              : 
     696         3915 :     async fn handle_elected(
     697         3915 :         &mut self,
     698         3915 :         msg: &ProposerElected,
     699         3915 :     ) -> Result<Option<AcceptorProposerMessage>> {
     700         3915 :         let _timer = MISC_OPERATION_SECONDS
     701         3915 :             .with_label_values(&["handle_elected"])
     702         3915 :             .start_timer();
     703         3915 : 
     704         3915 :         info!(
     705            0 :             "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
     706            0 :             msg,
     707            0 :             self.state.acceptor_state.term,
     708            0 :             self.get_last_log_term(),
     709            0 :             self.flush_lsn()
     710              :         );
     711         3915 :         if self.state.acceptor_state.term < msg.term {
     712            2 :             let mut state = self.state.start_change();
     713            2 :             state.acceptor_state.term = msg.term;
     714            2 :             self.state.finish_change(&state).await?;
     715         3913 :         }
     716              : 
     717              :         // If our term is higher, ignore the message (next feedback will inform the compute)
     718         3915 :         if self.state.acceptor_state.term > msg.term {
     719            0 :             return Ok(None);
     720         3915 :         }
     721         3915 : 
     722         3915 :         // Before truncating WAL check-cross the check divergence point received
     723         3915 :         // from the walproposer.
     724         3915 :         let sk_th = self.get_term_history();
     725         3915 :         let last_common_point = match TermHistory::find_highest_common_point(
     726         3915 :             &msg.term_history,
     727         3915 :             &sk_th,
     728         3915 :             self.flush_lsn(),
     729         3915 :         ) {
     730              :             // No common point. Expect streaming from the beginning of the
     731              :             // history like walproposer while we don't have proper init.
     732          566 :             None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
     733          566 :                 "empty walproposer term history {:?}",
     734          566 :                 msg.term_history
     735          566 :             ))?,
     736         3349 :             Some(lcp) => lcp,
     737              :         };
     738              :         // This is expected to happen in a rare race when another connection
     739              :         // from the same walproposer writes + flushes WAL after this connection
     740              :         // sent flush_lsn in VoteRequest; for instance, very late
     741              :         // ProposerElected message delivery after another connection was
     742              :         // established and wrote WAL. In such cases error is transient;
     743              :         // reconnection makes safekeeper send newest term history and flush_lsn
     744              :         // and walproposer recalculates the streaming point. OTOH repeating
     745              :         // error indicates a serious bug.
     746         3915 :         if last_common_point.lsn != msg.start_streaming_at {
     747            0 :             bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
     748            0 :                     last_common_point, msg.start_streaming_at,
     749            0 :                     self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
     750            0 :             );
     751         3915 :         }
     752         3915 : 
     753         3915 :         // We are also expected to never attempt to truncate committed data.
     754         3915 :         assert!(
     755         3915 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
     756            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
     757            0 :             msg.start_streaming_at, self.state.inmem.commit_lsn,
     758            0 :             self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
     759              :         );
     760              : 
     761              :         // Before first WAL write initialize its segment. It makes first segment
     762              :         // pg_waldump'able because stream from compute doesn't include its
     763              :         // segment and page headers.
     764              :         //
     765              :         // If we fail before first WAL write flush this action would be
     766              :         // repeated, that's ok because it is idempotent.
     767         3915 :         if self.wal_store.flush_lsn() == Lsn::INVALID {
     768          565 :             self.wal_store
     769          565 :                 .initialize_first_segment(msg.start_streaming_at)
     770            0 :                 .await?;
     771         3350 :         }
     772              : 
     773              :         // truncate wal, update the LSNs
     774         3915 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
     775              : 
     776              :         // and now adopt term history from proposer
     777              :         {
     778         3915 :             let mut state = self.state.start_change();
     779         3915 : 
     780         3915 :             // Here we learn initial LSN for the first time, set fields
     781         3915 :             // interested in that.
     782         3915 : 
     783         3915 :             if state.timeline_start_lsn == Lsn(0) {
     784              :                 // Remember point where WAL begins globally.
     785          565 :                 state.timeline_start_lsn = msg.timeline_start_lsn;
     786          565 :                 info!(
     787            0 :                     "setting timeline_start_lsn to {:?}",
     788              :                     state.timeline_start_lsn
     789              :                 );
     790         3350 :             }
     791         3915 :             if state.peer_horizon_lsn == Lsn(0) {
     792          565 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
     793          565 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
     794          565 :                 state.peer_horizon_lsn = msg.timeline_start_lsn;
     795         3350 :             }
     796         3915 :             if state.local_start_lsn == Lsn(0) {
     797          565 :                 state.local_start_lsn = msg.start_streaming_at;
     798          565 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
     799         3350 :             }
     800              :             // Initializing commit_lsn before acking first flushed record is
     801              :             // important to let find_end_of_wal skip the hole in the beginning
     802              :             // of the first segment.
     803              :             //
     804              :             // NB: on new clusters, this happens at the same time as
     805              :             // timeline_start_lsn initialization, it is taken outside to provide
     806              :             // upgrade.
     807         3915 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
     808         3915 : 
     809         3915 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
     810         3915 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
     811         3915 :             // similar for remote_consistent_lsn
     812         3915 :             state.remote_consistent_lsn =
     813         3915 :                 max(state.remote_consistent_lsn, state.timeline_start_lsn);
     814         3915 : 
     815         3915 :             state.acceptor_state.term_history = msg.term_history.clone();
     816         3915 :             self.state.finish_change(&state).await?;
     817              :         }
     818              : 
     819         3915 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
     820              : 
     821              :         // Cache LSN where term starts to immediately fsync control file with
     822              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
     823              :         // persisted commit_lsn on majority of safekeepers aligns.
     824         3915 :         self.term_start_lsn = match msg.term_history.0.last() {
     825            0 :             None => bail!("proposer elected with empty term history"),
     826         3915 :             Some(term_lsn_start) => term_lsn_start.lsn,
     827         3915 :         };
     828         3915 : 
     829         3915 :         Ok(None)
     830         3915 :     }
     831              : 
     832              :     /// Advance commit_lsn taking into account what we have locally.
     833              :     ///
     834              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
     835              :     /// already been done outside.
     836         6728 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
     837         6728 :         // Both peers and walproposer communicate this value, we might already
     838         6728 :         // have a fresher (higher) version.
     839         6728 :         candidate = max(candidate, self.state.inmem.commit_lsn);
     840         6728 :         let commit_lsn = min(candidate, self.flush_lsn());
     841         6728 :         assert!(
     842         6728 :             commit_lsn >= self.state.inmem.commit_lsn,
     843            0 :             "commit_lsn monotonicity violated: old={} new={}",
     844              :             self.state.inmem.commit_lsn,
     845              :             commit_lsn
     846              :         );
     847              : 
     848         6728 :         self.state.inmem.commit_lsn = commit_lsn;
     849         6728 : 
     850         6728 :         // If new commit_lsn reached term switch, force sync of control
     851         6728 :         // file: walproposer in sync mode is very interested when this
     852         6728 :         // happens. Note: this is for sync-safekeepers mode only, as
     853         6728 :         // otherwise commit_lsn might jump over term_start_lsn.
     854         6728 :         if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
     855          461 :             self.state.flush().await?;
     856         6267 :         }
     857              : 
     858         6728 :         Ok(())
     859         6728 :     }
     860              : 
     861              :     /// Handle request to append WAL.
     862              :     #[allow(clippy::comparison_chain)]
     863        13155 :     async fn handle_append_request(
     864        13155 :         &mut self,
     865        13155 :         msg: &AppendRequest,
     866        13155 :         require_flush: bool,
     867        13155 :     ) -> Result<Option<AcceptorProposerMessage>> {
     868        13155 :         if self.state.acceptor_state.term < msg.h.term {
     869            0 :             bail!("got AppendRequest before ProposerElected");
     870        13155 :         }
     871        13155 : 
     872        13155 :         // If our term is higher, immediately refuse the message.
     873        13155 :         if self.state.acceptor_state.term > msg.h.term {
     874            0 :             let resp = AppendResponse::term_only(self.state.acceptor_state.term);
     875            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
     876        13155 :         }
     877        13155 : 
     878        13155 :         // Disallow any non-sequential writes, which can result in gaps or
     879        13155 :         // overwrites. If we need to move the pointer, ProposerElected message
     880        13155 :         // should have truncated WAL first accordingly. Note that the first
     881        13155 :         // condition (WAL rewrite) is quite expected in real world; it happens
     882        13155 :         // when walproposer reconnects to safekeeper and writes some more data
     883        13155 :         // while first connection still gets some packets later. It might be
     884        13155 :         // better to not log this as error! above.
     885        13155 :         let write_lsn = self.wal_store.write_lsn();
     886        13155 :         if write_lsn > msg.h.begin_lsn {
     887            1 :             bail!(
     888            1 :                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
     889            1 :                 write_lsn,
     890            1 :                 msg.h.begin_lsn
     891            1 :             );
     892        13154 :         }
     893        13154 :         if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
     894            0 :             bail!(
     895            0 :                 "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
     896            0 :                 write_lsn,
     897            0 :                 msg.h.begin_lsn,
     898            0 :             );
     899        13154 :         }
     900        13154 : 
     901        13154 :         // Now we know that we are in the same term as the proposer,
     902        13154 :         // processing the message.
     903        13154 : 
     904        13154 :         self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
     905        13154 : 
     906        13154 :         // do the job
     907        13154 :         if !msg.wal_data.is_empty() {
     908         2313 :             self.wal_store
     909         2313 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
     910            0 :                 .await?;
     911        10841 :         }
     912              : 
     913              :         // flush wal to the disk, if required
     914        13154 :         if require_flush {
     915            3 :             self.wal_store.flush_wal().await?;
     916        13151 :         }
     917              : 
     918              :         // Update commit_lsn.
     919        13154 :         if msg.h.commit_lsn != Lsn(0) {
     920         6728 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
     921         6426 :         }
     922              :         // Value calculated by walproposer can always lag:
     923              :         // - safekeepers can forget inmem value and send to proposer lower
     924              :         //   persisted one on restart;
     925              :         // - if we make safekeepers always send persistent value,
     926              :         //   any compute restart would pull it down.
     927              :         // Thus, take max before adopting.
     928        13154 :         self.state.inmem.peer_horizon_lsn =
     929        13154 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
     930        13154 : 
     931        13154 :         // Update truncate and commit LSN in control file.
     932        13154 :         // To avoid negative impact on performance of extra fsync, do it only
     933        13154 :         // when commit_lsn delta exceeds WAL segment size.
     934        13154 :         if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
     935        13154 :             < self.state.inmem.commit_lsn
     936              :         {
     937            0 :             self.state.flush().await?;
     938        13154 :         }
     939              : 
     940        13154 :         trace!(
     941            0 :             "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
     942            0 :             msg.wal_data.len(),
     943              :             msg.h.begin_lsn,
     944              :             msg.h.end_lsn,
     945              :             msg.h.commit_lsn,
     946              :             msg.h.truncate_lsn,
     947              :             require_flush,
     948              :         );
     949              : 
     950              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
     951        13154 :         if !require_flush {
     952        13151 :             return Ok(None);
     953            3 :         }
     954            3 : 
     955            3 :         let resp = self.append_response();
     956            3 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
     957        13155 :     }
     958              : 
     959              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
     960        10894 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
     961        10894 :         self.wal_store.flush_wal().await?;
     962        10894 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
     963        10894 :             self.append_response(),
     964        10894 :         )))
     965        10894 :     }
     966              : 
     967              :     /// Update commit_lsn from peer safekeeper data.
     968            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     969            0 :         if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
     970              :             // Note: the check is too restrictive, generally we can update local
     971              :             // commit_lsn if our history matches (is part of) history of advanced
     972              :             // commit_lsn provider.
     973            0 :             if sk_info.last_log_term == self.get_last_log_term() {
     974            0 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
     975            0 :             }
     976            0 :         }
     977            0 :         Ok(())
     978            0 :     }
     979              : }
     980              : 
     981              : #[cfg(test)]
     982              : mod tests {
     983              :     use futures::future::BoxFuture;
     984              :     use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
     985              : 
     986              :     use super::*;
     987              :     use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};
     988              :     use std::{ops::Deref, str::FromStr, time::Instant};
     989              : 
     990              :     // fake storage for tests
     991              :     struct InMemoryState {
     992              :         persisted_state: TimelinePersistentState,
     993              :     }
     994              : 
     995              :     impl control_file::Storage for InMemoryState {
     996            5 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     997            5 :             self.persisted_state = s.clone();
     998            5 :             Ok(())
     999            5 :         }
    1000              : 
    1001            0 :         fn last_persist_at(&self) -> Instant {
    1002            0 :             Instant::now()
    1003            0 :         }
    1004              :     }
    1005              : 
    1006              :     impl Deref for InMemoryState {
    1007              :         type Target = TimelinePersistentState;
    1008              : 
    1009           89 :         fn deref(&self) -> &Self::Target {
    1010           89 :             &self.persisted_state
    1011           89 :         }
    1012              :     }
    1013              : 
    1014            3 :     fn test_sk_state() -> TimelinePersistentState {
    1015            3 :         let mut state = TimelinePersistentState::empty();
    1016            3 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1017            3 :         state.tenant_id = TenantId::from([1u8; 16]);
    1018            3 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1019            3 :         state
    1020            3 :     }
    1021              : 
    1022              :     struct DummyWalStore {
    1023              :         lsn: Lsn,
    1024              :     }
    1025              : 
    1026              :     impl wal_storage::Storage for DummyWalStore {
    1027            4 :         fn write_lsn(&self) -> Lsn {
    1028            4 :             self.lsn
    1029            4 :         }
    1030              : 
    1031           15 :         fn flush_lsn(&self) -> Lsn {
    1032           15 :             self.lsn
    1033           15 :         }
    1034              : 
    1035            2 :         async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
    1036            2 :             Ok(())
    1037            2 :         }
    1038              : 
    1039            3 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1040            3 :             self.lsn = startpos + buf.len() as u64;
    1041            3 :             Ok(())
    1042            3 :         }
    1043              : 
    1044            2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1045            2 :             self.lsn = end_pos;
    1046            2 :             Ok(())
    1047            2 :         }
    1048              : 
    1049            5 :         async fn flush_wal(&mut self) -> Result<()> {
    1050            5 :             Ok(())
    1051            5 :         }
    1052              : 
    1053            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1054            0 :             Box::pin(async { Ok(()) })
    1055            0 :         }
    1056              : 
    1057            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1058            0 :             crate::metrics::WalStorageMetrics::default()
    1059            0 :         }
    1060              :     }
    1061              : 
    1062              :     #[tokio::test]
    1063            1 :     async fn test_voting() {
    1064            1 :         let storage = InMemoryState {
    1065            1 :             persisted_state: test_sk_state(),
    1066            1 :         };
    1067            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1068            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1069            1 : 
    1070            1 :         // check voting for 1 is ok
    1071            1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
    1072            1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1073            1 :         match vote_resp.unwrap() {
    1074            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
    1075            1 :             r => panic!("unexpected response: {:?}", r),
    1076            1 :         }
    1077            1 : 
    1078            1 :         // reboot...
    1079            1 :         let state = sk.state.deref().clone();
    1080            1 :         let storage = InMemoryState {
    1081            1 :             persisted_state: state,
    1082            1 :         };
    1083            1 : 
    1084            1 :         sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
    1085            1 : 
    1086            1 :         // and ensure voting second time for 1 is not ok
    1087            1 :         vote_resp = sk.process_msg(&vote_request).await;
    1088            1 :         match vote_resp.unwrap() {
    1089            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
    1090            1 :             r => panic!("unexpected response: {:?}", r),
    1091            1 :         }
    1092            1 :     }
    1093              : 
    1094              :     #[tokio::test]
    1095            1 :     async fn test_last_log_term_switch() {
    1096            1 :         let storage = InMemoryState {
    1097            1 :             persisted_state: test_sk_state(),
    1098            1 :         };
    1099            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1100            1 : 
    1101            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1102            1 : 
    1103            1 :         let mut ar_hdr = AppendRequestHeader {
    1104            1 :             term: 2,
    1105            1 :             term_start_lsn: Lsn(3),
    1106            1 :             begin_lsn: Lsn(1),
    1107            1 :             end_lsn: Lsn(2),
    1108            1 :             commit_lsn: Lsn(0),
    1109            1 :             truncate_lsn: Lsn(0),
    1110            1 :             proposer_uuid: [0; 16],
    1111            1 :         };
    1112            1 :         let mut append_request = AppendRequest {
    1113            1 :             h: ar_hdr.clone(),
    1114            1 :             wal_data: Bytes::from_static(b"b"),
    1115            1 :         };
    1116            1 : 
    1117            1 :         let pem = ProposerElected {
    1118            1 :             term: 2,
    1119            1 :             start_streaming_at: Lsn(1),
    1120            1 :             term_history: TermHistory(vec![
    1121            1 :                 TermLsn {
    1122            1 :                     term: 1,
    1123            1 :                     lsn: Lsn(1),
    1124            1 :                 },
    1125            1 :                 TermLsn {
    1126            1 :                     term: 2,
    1127            1 :                     lsn: Lsn(3),
    1128            1 :                 },
    1129            1 :             ]),
    1130            1 :             timeline_start_lsn: Lsn(1),
    1131            1 :         };
    1132            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1133            1 :             .await
    1134            1 :             .unwrap();
    1135            1 : 
    1136            1 :         // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
    1137            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1138            1 :             .await
    1139            1 :             .unwrap();
    1140            1 :         assert_eq!(sk.get_last_log_term(), 1);
    1141            1 : 
    1142            1 :         // but record at term_start_lsn does the switch
    1143            1 :         ar_hdr.begin_lsn = Lsn(2);
    1144            1 :         ar_hdr.end_lsn = Lsn(3);
    1145            1 :         append_request = AppendRequest {
    1146            1 :             h: ar_hdr,
    1147            1 :             wal_data: Bytes::from_static(b"b"),
    1148            1 :         };
    1149            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1150            1 :             .await
    1151            1 :             .unwrap();
    1152            1 :         assert_eq!(sk.get_last_log_term(), 2);
    1153            1 :     }
    1154              : 
    1155              :     #[tokio::test]
    1156            1 :     async fn test_non_consecutive_write() {
    1157            1 :         let storage = InMemoryState {
    1158            1 :             persisted_state: test_sk_state(),
    1159            1 :         };
    1160            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1161            1 : 
    1162            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1163            1 : 
    1164            1 :         let pem = ProposerElected {
    1165            1 :             term: 1,
    1166            1 :             start_streaming_at: Lsn(1),
    1167            1 :             term_history: TermHistory(vec![TermLsn {
    1168            1 :                 term: 1,
    1169            1 :                 lsn: Lsn(1),
    1170            1 :             }]),
    1171            1 :             timeline_start_lsn: Lsn(1),
    1172            1 :         };
    1173            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1174            1 :             .await
    1175            1 :             .unwrap();
    1176            1 : 
    1177            1 :         let ar_hdr = AppendRequestHeader {
    1178            1 :             term: 1,
    1179            1 :             term_start_lsn: Lsn(3),
    1180            1 :             begin_lsn: Lsn(1),
    1181            1 :             end_lsn: Lsn(2),
    1182            1 :             commit_lsn: Lsn(0),
    1183            1 :             truncate_lsn: Lsn(0),
    1184            1 :             proposer_uuid: [0; 16],
    1185            1 :         };
    1186            1 :         let append_request = AppendRequest {
    1187            1 :             h: ar_hdr.clone(),
    1188            1 :             wal_data: Bytes::from_static(b"b"),
    1189            1 :         };
    1190            1 : 
    1191            1 :         // do write ending at 2, it should be ok
    1192            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1193            1 :             .await
    1194            1 :             .unwrap();
    1195            1 :         let mut ar_hrd2 = ar_hdr.clone();
    1196            1 :         ar_hrd2.begin_lsn = Lsn(4);
    1197            1 :         ar_hrd2.end_lsn = Lsn(5);
    1198            1 :         let append_request = AppendRequest {
    1199            1 :             h: ar_hdr,
    1200            1 :             wal_data: Bytes::from_static(b"b"),
    1201            1 :         };
    1202            1 :         // and now starting at 4, it must fail
    1203            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1204            1 :             .await
    1205            1 :             .unwrap_err();
    1206            1 :     }
    1207              : 
    1208              :     #[test]
    1209            1 :     fn test_find_highest_common_point_none() {
    1210            1 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1211            1 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1212            1 :         assert_eq!(
    1213            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
    1214            1 :             None
    1215            1 :         );
    1216            1 :     }
    1217              : 
    1218              :     #[test]
    1219            1 :     fn test_find_highest_common_point_middle() {
    1220            1 :         let prop_th = TermHistory(vec![
    1221            1 :             (1, Lsn(10)).into(),
    1222            1 :             (2, Lsn(20)).into(),
    1223            1 :             (4, Lsn(40)).into(),
    1224            1 :         ]);
    1225            1 :         let sk_th = TermHistory(vec![
    1226            1 :             (1, Lsn(10)).into(),
    1227            1 :             (2, Lsn(20)).into(),
    1228            1 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1229            1 :         ]);
    1230            1 :         assert_eq!(
    1231            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1232            1 :             Some(TermLsn {
    1233            1 :                 term: 2,
    1234            1 :                 lsn: Lsn(30),
    1235            1 :             })
    1236            1 :         );
    1237            1 :     }
    1238              : 
    1239              :     #[test]
    1240            1 :     fn test_find_highest_common_point_sk_end() {
    1241            1 :         let prop_th = TermHistory(vec![
    1242            1 :             (1, Lsn(10)).into(),
    1243            1 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1244            1 :             (4, Lsn(40)).into(),
    1245            1 :         ]);
    1246            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1247            1 :         assert_eq!(
    1248            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1249            1 :             Some(TermLsn {
    1250            1 :                 term: 2,
    1251            1 :                 lsn: Lsn(32),
    1252            1 :             })
    1253            1 :         );
    1254            1 :     }
    1255              : 
    1256              :     #[test]
    1257            1 :     fn test_find_highest_common_point_walprop() {
    1258            1 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1259            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1260            1 :         assert_eq!(
    1261            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1262            1 :             Some(TermLsn {
    1263            1 :                 term: 2,
    1264            1 :                 lsn: Lsn(32),
    1265            1 :             })
    1266            1 :         );
    1267            1 :     }
    1268              : 
    1269              :     #[test]
    1270            1 :     fn test_sk_state_bincode_serde_roundtrip() {
    1271              :         use utils::Hex;
    1272            1 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1273            1 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1274            1 :         let state = TimelinePersistentState {
    1275            1 :             tenant_id,
    1276            1 :             timeline_id,
    1277            1 :             acceptor_state: AcceptorState {
    1278            1 :                 term: 42,
    1279            1 :                 term_history: TermHistory(vec![TermLsn {
    1280            1 :                     lsn: Lsn(0x1),
    1281            1 :                     term: 41,
    1282            1 :                 }]),
    1283            1 :             },
    1284            1 :             server: ServerInfo {
    1285            1 :                 pg_version: 14,
    1286            1 :                 system_id: 0x1234567887654321,
    1287            1 :                 wal_seg_size: 0x12345678,
    1288            1 :             },
    1289            1 :             proposer_uuid: {
    1290            1 :                 let mut arr = timeline_id.as_arr();
    1291            1 :                 arr.reverse();
    1292            1 :                 arr
    1293            1 :             },
    1294            1 :             timeline_start_lsn: Lsn(0x12345600),
    1295            1 :             local_start_lsn: Lsn(0x12),
    1296            1 :             commit_lsn: Lsn(1234567800),
    1297            1 :             backup_lsn: Lsn(1234567300),
    1298            1 :             peer_horizon_lsn: Lsn(9999999),
    1299            1 :             remote_consistent_lsn: Lsn(1234560000),
    1300            1 :             peers: PersistedPeers(vec![(
    1301            1 :                 NodeId(1),
    1302            1 :                 PersistedPeerInfo {
    1303            1 :                     backup_lsn: Lsn(1234567000),
    1304            1 :                     term: 42,
    1305            1 :                     flush_lsn: Lsn(1234567800 - 8),
    1306            1 :                     commit_lsn: Lsn(1234567600),
    1307            1 :                 },
    1308            1 :             )]),
    1309            1 :             partial_backup: crate::wal_backup_partial::State::default(),
    1310            1 :             eviction_state: EvictionState::Present,
    1311            1 :         };
    1312            1 : 
    1313            1 :         let ser = state.ser().unwrap();
    1314            1 : 
    1315            1 :         #[rustfmt::skip]
    1316            1 :         let expected = [
    1317            1 :             // tenant_id as length prefixed hex
    1318            1 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1319            1 :             0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
    1320            1 :             // timeline_id as length prefixed hex
    1321            1 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1322            1 :             0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
    1323            1 :             // term
    1324            1 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1325            1 :             // length prefix
    1326            1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1327            1 :             // unsure why this order is swapped
    1328            1 :             0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1329            1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1330            1 :             // pg_version
    1331            1 :             0x0e, 0x00, 0x00, 0x00,
    1332            1 :             // systemid
    1333            1 :             0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
    1334            1 :             // wal_seg_size
    1335            1 :             0x78, 0x56, 0x34, 0x12,
    1336            1 :             // pguuid as length prefixed hex
    1337            1 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1338            1 :             0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
    1339            1 : 
    1340            1 :             // timeline_start_lsn
    1341            1 :             0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
    1342            1 :             0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1343            1 :             0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1344            1 :             0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1345            1 :             0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
    1346            1 :             0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1347            1 :             // length prefix for persistentpeers
    1348            1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1349            1 :             // nodeid
    1350            1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1351            1 :             // backuplsn
    1352            1 :             0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1353            1 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1354            1 :             0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1355            1 :             0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1356            1 :             // partial_backup
    1357            1 :             0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1358            1 :             // eviction_state
    1359            1 :             0x00, 0x00, 0x00, 0x00,
    1360            1 :         ];
    1361            1 : 
    1362            1 :         assert_eq!(Hex(&ser), Hex(&expected));
    1363              : 
    1364            1 :         let deser = TimelinePersistentState::des(&ser).unwrap();
    1365            1 : 
    1366            1 :         assert_eq!(deser, state);
    1367            1 :     }
    1368              : }
        

Generated by: LCOV version 2.1-beta