LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 5187d4b6d9cfe1c429baf0147b0578521d04e1ed.info Lines: 86.1 % 805 693
Test Date: 2024-06-26 21:48:01 Functions: 52.6 % 213 112

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use anyhow::{bail, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt};
       5              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       6              : 
       7              : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
       8              : use serde::{Deserialize, Serialize};
       9              : use std::cmp::max;
      10              : use std::cmp::min;
      11              : use std::fmt;
      12              : use std::io::Read;
      13              : use storage_broker::proto::SafekeeperTimelineInfo;
      14              : 
      15              : use tracing::*;
      16              : 
      17              : use crate::control_file;
      18              : use crate::send_wal::HotStandbyFeedback;
      19              : 
      20              : use crate::state::TimelineState;
      21              : use crate::wal_storage;
      22              : use pq_proto::SystemId;
      23              : use utils::pageserver_feedback::PageserverFeedback;
      24              : use utils::{
      25              :     bin_ser::LeSer,
      26              :     id::{NodeId, TenantId, TimelineId},
      27              :     lsn::Lsn,
      28              : };
      29              : 
      30              : const SK_PROTOCOL_VERSION: u32 = 2;
      31              : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      32              : 
      33              : /// Consensus logical timestamp.
      34              : pub type Term = u64;
      35              : pub const INVALID_TERM: Term = 0;
      36              : 
      37            8 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      38              : pub struct TermLsn {
      39              :     pub term: Term,
      40              :     pub lsn: Lsn,
      41              : }
      42              : 
      43              : // Creation from tuple provides less typing (e.g. for unit tests).
      44              : impl From<(Term, Lsn)> for TermLsn {
      45           36 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      46           36 :         TermLsn {
      47           36 :             term: pair.0,
      48           36 :             lsn: pair.1,
      49           36 :         }
      50           36 :     }
      51              : }
      52              : 
      53           12 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      54              : pub struct TermHistory(pub Vec<TermLsn>);
      55              : 
      56              : impl TermHistory {
      57        11413 :     pub fn empty() -> TermHistory {
      58        11413 :         TermHistory(Vec::new())
      59        11413 :     }
      60              : 
      61              :     // Parse TermHistory as n_entries followed by TermLsn pairs
      62         6492 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      63         6492 :         if bytes.remaining() < 4 {
      64            0 :             bail!("TermHistory misses len");
      65         6492 :         }
      66         6492 :         let n_entries = bytes.get_u32_le();
      67         6492 :         let mut res = Vec::with_capacity(n_entries as usize);
      68         6492 :         for _ in 0..n_entries {
      69        55263 :             if bytes.remaining() < 16 {
      70            0 :                 bail!("TermHistory is incomplete");
      71        55263 :             }
      72        55263 :             res.push(TermLsn {
      73        55263 :                 term: bytes.get_u64_le(),
      74        55263 :                 lsn: bytes.get_u64_le().into(),
      75        55263 :             })
      76              :         }
      77         6492 :         Ok(TermHistory(res))
      78         6492 :     }
      79              : 
      80              :     /// Return copy of self with switches happening strictly after up_to
      81              :     /// truncated.
      82        30951 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
      83        30951 :         let mut res = Vec::with_capacity(self.0.len());
      84       169997 :         for e in &self.0 {
      85       139102 :             if e.lsn > up_to {
      86           56 :                 break;
      87       139046 :             }
      88       139046 :             res.push(*e);
      89              :         }
      90        30951 :         TermHistory(res)
      91        30951 :     }
      92              : 
      93              :     /// Find point of divergence between leader (walproposer) term history and
      94              :     /// safekeeper. Arguments are not symmetric, as proposer history ends at
      95              :     /// +infinity while safekeeper at flush_lsn.
      96              :     /// C version is at walproposer SendProposerElected.
      97            8 :     pub fn find_highest_common_point(
      98            8 :         prop_th: &TermHistory,
      99            8 :         sk_th: &TermHistory,
     100            8 :         sk_wal_end: Lsn,
     101            8 :     ) -> Option<TermLsn> {
     102            8 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     103              : 
     104            8 :         if let Some(sk_th_last) = sk_th.last() {
     105            8 :             assert!(
     106            8 :                 sk_th_last.lsn <= sk_wal_end,
     107            0 :                 "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
     108              :                 sk_th_last,
     109              :                 sk_wal_end
     110              :             );
     111            0 :         }
     112              : 
     113              :         // find last common term, if any...
     114            8 :         let mut last_common_idx = None;
     115           16 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     116           16 :             if prop_th[i].term != sk_th[i].term {
     117            4 :                 break;
     118           12 :             }
     119           12 :             // If term is the same, LSN must be equal as well.
     120           12 :             assert!(
     121           12 :                 prop_th[i].lsn == sk_th[i].lsn,
     122            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     123            0 :                 prop_th[i].term,
     124            0 :                 prop_th[i].lsn,
     125            0 :                 sk_th[i].lsn
     126              :             );
     127           12 :             last_common_idx = Some(i);
     128              :         }
     129            8 :         let last_common_idx = match last_common_idx {
     130            2 :             None => return None, // no common point
     131            6 :             Some(lci) => lci,
     132            6 :         };
     133            6 :         // Now find where it ends at both prop and sk and take min. End of
     134            6 :         // (common) term is the start of the next, unless it is the last one;
     135            6 :         // there it is flush_lsn in case of safekeeper or, in case of proposer
     136            6 :         // +infinity, so we just take flush_lsn then.
     137            6 :         if last_common_idx == prop_th.len() - 1 {
     138            2 :             Some(TermLsn {
     139            2 :                 term: prop_th[last_common_idx].term,
     140            2 :                 lsn: sk_wal_end,
     141            2 :             })
     142              :         } else {
     143            4 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     144            4 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     145            2 :                 sk_th[last_common_idx + 1].lsn
     146              :             } else {
     147            2 :                 sk_wal_end
     148              :             };
     149            4 :             Some(TermLsn {
     150            4 :                 term: prop_th[last_common_idx].term,
     151            4 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     152            4 :             })
     153              :         }
     154            8 :     }
     155              : }
     156              : 
     157              : /// Display only latest entries for Debug.
     158              : impl fmt::Debug for TermHistory {
     159          400 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     160          400 :         let n_printed = 20;
     161          400 :         write!(
     162          400 :             fmt,
     163          400 :             "{}{:?}",
     164          400 :             if self.0.len() > n_printed { "... " } else { "" },
     165          400 :             self.0
     166          400 :                 .iter()
     167          400 :                 .rev()
     168          400 :                 .take(n_printed)
     169         2216 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     170          400 :                 .collect::<Vec<_>>()
     171          400 :         )
     172          400 :     }
     173              : }
     174              : 
     175              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     176              : pub type PgUuid = [u8; 16];
     177              : 
     178              : /// Persistent consensus state of the acceptor.
     179           12 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     180              : pub struct AcceptorState {
     181              :     /// acceptor's last term it voted for (advanced in 1 phase)
     182              :     pub term: Term,
     183              :     /// History of term switches for safekeeper's WAL.
     184              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     185              :     /// from the proposer before recovery.
     186              :     pub term_history: TermHistory,
     187              : }
     188              : 
     189              : impl AcceptorState {
     190              :     /// acceptor's last_log_term is the term of the highest entry in the log
     191         6498 :     pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
     192         6498 :         let th = self.term_history.up_to(flush_lsn);
     193         6498 :         match th.0.last() {
     194         5433 :             Some(e) => e.term,
     195         1065 :             None => 0,
     196              :         }
     197         6498 :     }
     198              : }
     199              : 
     200              : /// Information about Postgres. Safekeeper gets it once and then verifies
     201              : /// all further connections from computes match.
     202            8 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     203              : pub struct ServerInfo {
     204              :     /// Postgres server version
     205              :     pub pg_version: u32,
     206              :     pub system_id: SystemId,
     207              :     pub wal_seg_size: u32,
     208              : }
     209              : 
     210            4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     211              : pub struct PersistedPeerInfo {
     212              :     /// LSN up to which safekeeper offloaded WAL to s3.
     213              :     pub backup_lsn: Lsn,
     214              :     /// Term of the last entry.
     215              :     pub term: Term,
     216              :     /// LSN of the last record.
     217              :     pub flush_lsn: Lsn,
     218              :     /// Up to which LSN safekeeper regards its WAL as committed.
     219              :     pub commit_lsn: Lsn,
     220              : }
     221              : 
     222              : impl PersistedPeerInfo {
     223            0 :     pub fn new() -> Self {
     224            0 :         Self {
     225            0 :             backup_lsn: Lsn::INVALID,
     226            0 :             term: INVALID_TERM,
     227            0 :             flush_lsn: Lsn(0),
     228            0 :             commit_lsn: Lsn(0),
     229            0 :         }
     230            0 :     }
     231              : }
     232              : 
     233              : // make clippy happy
     234              : impl Default for PersistedPeerInfo {
     235            0 :     fn default() -> Self {
     236            0 :         Self::new()
     237            0 :     }
     238              : }
     239              : 
     240              : // protocol messages
     241              : 
     242              : /// Initial Proposer -> Acceptor message
     243       157810 : #[derive(Debug, Deserialize)]
     244              : pub struct ProposerGreeting {
     245              :     /// proposer-acceptor protocol version
     246              :     pub protocol_version: u32,
     247              :     /// Postgres server version
     248              :     pub pg_version: u32,
     249              :     pub proposer_id: PgUuid,
     250              :     pub system_id: SystemId,
     251              :     pub timeline_id: TimelineId,
     252              :     pub tenant_id: TenantId,
     253              :     pub tli: TimeLineID,
     254              :     pub wal_seg_size: u32,
     255              : }
     256              : 
     257              : /// Acceptor -> Proposer initial response: the highest term known to me
     258              : /// (acceptor voted for).
     259              : #[derive(Debug, Serialize)]
     260              : pub struct AcceptorGreeting {
     261              :     term: u64,
     262              :     node_id: NodeId,
     263              : }
     264              : 
     265              : /// Vote request sent from proposer to safekeepers
     266        24449 : #[derive(Debug, Deserialize)]
     267              : pub struct VoteRequest {
     268              :     pub term: Term,
     269              : }
     270              : 
     271              : /// Vote itself, sent from safekeeper to proposer
     272              : #[derive(Debug, Serialize)]
     273              : pub struct VoteResponse {
     274              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     275              :     vote_given: u64, // fixme u64 due to padding
     276              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     277              :     // proposer to choose the most advanced one.
     278              :     pub flush_lsn: Lsn,
     279              :     truncate_lsn: Lsn,
     280              :     pub term_history: TermHistory,
     281              :     timeline_start_lsn: Lsn,
     282              : }
     283              : 
     284              : /*
     285              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     286              :  * term history to it.
     287              :  */
     288              : #[derive(Debug)]
     289              : pub struct ProposerElected {
     290              :     pub term: Term,
     291              :     pub start_streaming_at: Lsn,
     292              :     pub term_history: TermHistory,
     293              :     pub timeline_start_lsn: Lsn,
     294              : }
     295              : 
     296              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     297              : /// communicates commit_lsn.
     298              : #[derive(Debug)]
     299              : pub struct AppendRequest {
     300              :     pub h: AppendRequestHeader,
     301              :     pub wal_data: Bytes,
     302              : }
     303        21672 : #[derive(Debug, Clone, Deserialize)]
     304              : pub struct AppendRequestHeader {
     305              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     306              :     pub term: Term,
     307              :     // TODO: remove this field from the protocol, it is unused -- LSN of term
     308              :     // switch can be taken from ProposerElected (as well as from term history).
     309              :     pub term_start_lsn: Lsn,
     310              :     /// start position of message in WAL
     311              :     pub begin_lsn: Lsn,
     312              :     /// end position of message in WAL
     313              :     pub end_lsn: Lsn,
     314              :     /// LSN committed by quorum of safekeepers
     315              :     pub commit_lsn: Lsn,
     316              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     317              :     pub truncate_lsn: Lsn,
     318              :     // only for logging/debugging
     319              :     pub proposer_uuid: PgUuid,
     320              : }
     321              : 
     322              : /// Report safekeeper state to proposer
     323              : #[derive(Debug, Serialize, Clone)]
     324              : pub struct AppendResponse {
     325              :     // Current term of the safekeeper; if it is higher than proposer's, the
     326              :     // compute is out of date.
     327              :     pub term: Term,
     328              :     // Flushed end of wal on safekeeper; one should be always mindful from what
     329              :     // term history this value comes, either checking history directly or
     330              :     // observing term being set to one for which WAL truncation is known to have
     331              :     // happened.
     332              :     pub flush_lsn: Lsn,
     333              :     // We report back our awareness about which WAL is committed, as this is
     334              :     // a criterion for walproposer --sync mode exit
     335              :     pub commit_lsn: Lsn,
     336              :     pub hs_feedback: HotStandbyFeedback,
     337              :     pub pageserver_feedback: Option<PageserverFeedback>,
     338              : }
     339              : 
     340              : impl AppendResponse {
     341            0 :     fn term_only(term: Term) -> AppendResponse {
     342            0 :         AppendResponse {
     343            0 :             term,
     344            0 :             flush_lsn: Lsn(0),
     345            0 :             commit_lsn: Lsn(0),
     346            0 :             hs_feedback: HotStandbyFeedback::empty(),
     347            0 :             pageserver_feedback: None,
     348            0 :         }
     349            0 :     }
     350              : }
     351              : 
     352              : /// Proposer -> Acceptor messages
     353              : #[derive(Debug)]
     354              : pub enum ProposerAcceptorMessage {
     355              :     Greeting(ProposerGreeting),
     356              :     VoteRequest(VoteRequest),
     357              :     Elected(ProposerElected),
     358              :     AppendRequest(AppendRequest),
     359              :     NoFlushAppendRequest(AppendRequest),
     360              :     FlushWAL,
     361              : }
     362              : 
     363              : impl ProposerAcceptorMessage {
     364              :     /// Parse proposer message.
     365       210423 :     pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
     366       210423 :         // xxx using Reader is inefficient but easy to work with bincode
     367       210423 :         let mut stream = msg_bytes.reader();
     368              :         // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     369       210423 :         let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     370       210423 :         match tag {
     371              :             'g' => {
     372       157810 :                 let msg = ProposerGreeting::des_from(&mut stream)?;
     373       157810 :                 Ok(ProposerAcceptorMessage::Greeting(msg))
     374              :             }
     375              :             'v' => {
     376        24449 :                 let msg = VoteRequest::des_from(&mut stream)?;
     377        24449 :                 Ok(ProposerAcceptorMessage::VoteRequest(msg))
     378              :             }
     379              :             'e' => {
     380         6492 :                 let mut msg_bytes = stream.into_inner();
     381         6492 :                 if msg_bytes.remaining() < 16 {
     382            0 :                     bail!("ProposerElected message is not complete");
     383         6492 :                 }
     384         6492 :                 let term = msg_bytes.get_u64_le();
     385         6492 :                 let start_streaming_at = msg_bytes.get_u64_le().into();
     386         6492 :                 let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     387         6492 :                 if msg_bytes.remaining() < 8 {
     388            0 :                     bail!("ProposerElected message is not complete");
     389         6492 :                 }
     390         6492 :                 let timeline_start_lsn = msg_bytes.get_u64_le().into();
     391         6492 :                 let msg = ProposerElected {
     392         6492 :                     term,
     393         6492 :                     start_streaming_at,
     394         6492 :                     timeline_start_lsn,
     395         6492 :                     term_history,
     396         6492 :                 };
     397         6492 :                 Ok(ProposerAcceptorMessage::Elected(msg))
     398              :             }
     399              :             'a' => {
     400              :                 // read header followed by wal data
     401        21672 :                 let hdr = AppendRequestHeader::des_from(&mut stream)?;
     402        21672 :                 let rec_size = hdr
     403        21672 :                     .end_lsn
     404        21672 :                     .checked_sub(hdr.begin_lsn)
     405        21672 :                     .context("begin_lsn > end_lsn in AppendRequest")?
     406              :                     .0 as usize;
     407        21672 :                 if rec_size > MAX_SEND_SIZE {
     408            0 :                     bail!(
     409            0 :                         "AppendRequest is longer than MAX_SEND_SIZE ({})",
     410            0 :                         MAX_SEND_SIZE
     411            0 :                     );
     412        21672 :                 }
     413        21672 : 
     414        21672 :                 let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     415        21672 :                 stream.read_exact(&mut wal_data_vec)?;
     416        21672 :                 let wal_data = Bytes::from(wal_data_vec);
     417        21672 :                 let msg = AppendRequest { h: hdr, wal_data };
     418        21672 : 
     419        21672 :                 Ok(ProposerAcceptorMessage::AppendRequest(msg))
     420              :             }
     421            0 :             _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     422              :         }
     423       210423 :     }
     424              : }
     425              : 
     426              : /// Acceptor -> Proposer messages
     427              : #[derive(Debug)]
     428              : pub enum AcceptorProposerMessage {
     429              :     Greeting(AcceptorGreeting),
     430              :     VoteResponse(VoteResponse),
     431              :     AppendResponse(AppendResponse),
     432              : }
     433              : 
     434              : impl AcceptorProposerMessage {
     435              :     /// Serialize acceptor -> proposer message.
     436       200133 :     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
     437       200133 :         match self {
     438       157810 :             AcceptorProposerMessage::Greeting(msg) => {
     439       157810 :                 buf.put_u64_le('g' as u64);
     440       157810 :                 buf.put_u64_le(msg.term);
     441       157810 :                 buf.put_u64_le(msg.node_id.0);
     442       157810 :             }
     443        24449 :             AcceptorProposerMessage::VoteResponse(msg) => {
     444        24449 :                 buf.put_u64_le('v' as u64);
     445        24449 :                 buf.put_u64_le(msg.term);
     446        24449 :                 buf.put_u64_le(msg.vote_given);
     447        24449 :                 buf.put_u64_le(msg.flush_lsn.into());
     448        24449 :                 buf.put_u64_le(msg.truncate_lsn.into());
     449        24449 :                 buf.put_u32_le(msg.term_history.0.len() as u32);
     450       115441 :                 for e in &msg.term_history.0 {
     451        90992 :                     buf.put_u64_le(e.term);
     452        90992 :                     buf.put_u64_le(e.lsn.into());
     453        90992 :                 }
     454        24449 :                 buf.put_u64_le(msg.timeline_start_lsn.into());
     455              :             }
     456        17874 :             AcceptorProposerMessage::AppendResponse(msg) => {
     457        17874 :                 buf.put_u64_le('a' as u64);
     458        17874 :                 buf.put_u64_le(msg.term);
     459        17874 :                 buf.put_u64_le(msg.flush_lsn.into());
     460        17874 :                 buf.put_u64_le(msg.commit_lsn.into());
     461        17874 :                 buf.put_i64_le(msg.hs_feedback.ts);
     462        17874 :                 buf.put_u64_le(msg.hs_feedback.xmin);
     463        17874 :                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     464              : 
     465              :                 // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     466              :                 // if it is not present.
     467        17874 :                 if let Some(ref msg) = msg.pageserver_feedback {
     468            0 :                     msg.serialize(buf);
     469        17874 :                 }
     470              :             }
     471              :         }
     472              : 
     473       200133 :         Ok(())
     474       200133 :     }
     475              : }
     476              : 
     477              : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     478              : /// It controls all WAL disk writes and updates of control file.
     479              : ///
     480              : /// Currently safekeeper processes:
     481              : /// - messages from compute (proposers) and provides replies
     482              : /// - messages from broker peers
     483              : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     484              :     /// LSN from which the proposer this safekeeper is currently talking to appends WAL;
     485              :     /// determines last_log_term switch point.
     486              :     pub term_start_lsn: Lsn,
     487              : 
     488              :     pub state: TimelineState<CTRL>, // persistent state storage
     489              :     pub wal_store: WAL,
     490              : 
     491              :     node_id: NodeId, // safekeeper's node id
     492              : }
     493              : 
     494              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     495              : where
     496              :     CTRL: control_file::Storage,
     497              :     WAL: wal_storage::Storage,
     498              : {
     499              :     /// Accepts a control file storage containing the safekeeper state.
     500              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     501              :     /// and `server` (`wal_seg_size` inside it) fields.
     502        70719 :     pub fn new(
     503        70719 :         state: TimelineState<CTRL>,
     504        70719 :         wal_store: WAL,
     505        70719 :         node_id: NodeId,
     506        70719 :     ) -> Result<SafeKeeper<CTRL, WAL>> {
     507        70719 :         if state.tenant_id == TenantId::from([0u8; 16])
     508        70719 :             || state.timeline_id == TimelineId::from([0u8; 16])
     509              :         {
     510            0 :             bail!(
     511            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     512            0 :                 state.tenant_id,
     513            0 :                 state.timeline_id
     514            0 :             );
     515        70719 :         }
     516        70719 : 
     517        70719 :         Ok(SafeKeeper {
     518        70719 :             term_start_lsn: Lsn(0),
     519        70719 :             state,
     520        70719 :             wal_store,
     521        70719 :             node_id,
     522        70719 :         })
     523        70719 :     }
     524              : 
     525              :     /// Get history of term switches for the available WAL
     526        24453 :     fn get_term_history(&self) -> TermHistory {
     527        24453 :         self.state
     528        24453 :             .acceptor_state
     529        24453 :             .term_history
     530        24453 :             .up_to(self.flush_lsn())
     531        24453 :     }
     532              : 
     533         6498 :     pub fn get_last_log_term(&self) -> Term {
     534         6498 :         self.state
     535         6498 :             .acceptor_state
     536         6498 :             .get_last_log_term(self.flush_lsn())
     537         6498 :     }
     538              : 
     539              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     540        84486 :     pub fn flush_lsn(&self) -> Lsn {
     541        84486 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     542        84486 :     }
     543              : 
     544              :     /// Process message from proposer and possibly form reply. Concurrent
     545              :     /// callers must exclude each other.
     546       228307 :     pub async fn process_msg(
     547       228307 :         &mut self,
     548       228307 :         msg: &ProposerAcceptorMessage,
     549       228307 :     ) -> Result<Option<AcceptorProposerMessage>> {
     550       228307 :         match msg {
     551       157810 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     552        24453 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     553         6494 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     554            4 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     555            4 :                 self.handle_append_request(msg, true).await
     556              :             }
     557        21672 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     558        21672 :                 self.handle_append_request(msg, false).await
     559              :             }
     560        17874 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     561              :         }
     562       228307 :     }
     563              : 
     564              :     /// Handle initial message from proposer: check its sanity and send my
     565              :     /// current term.
     566       157810 :     async fn handle_greeting(
     567       157810 :         &mut self,
     568       157810 :         msg: &ProposerGreeting,
     569       157810 :     ) -> Result<Option<AcceptorProposerMessage>> {
     570       157810 :         // Check protocol compatibility
     571       157810 :         if msg.protocol_version != SK_PROTOCOL_VERSION {
     572            0 :             bail!(
     573            0 :                 "incompatible protocol version {}, expected {}",
     574            0 :                 msg.protocol_version,
     575            0 :                 SK_PROTOCOL_VERSION
     576            0 :             );
     577       157810 :         }
     578       157810 :         /* Postgres major version mismatch is treated as fatal error
     579       157810 :          * because safekeepers parse WAL headers and the format
     580       157810 :          * may change between versions.
     581       157810 :          */
     582       157810 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     583            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     584              :         {
     585            0 :             bail!(
     586            0 :                 "incompatible server version {}, expected {}",
     587            0 :                 msg.pg_version,
     588            0 :                 self.state.server.pg_version
     589            0 :             );
     590       157810 :         }
     591       157810 : 
     592       157810 :         if msg.tenant_id != self.state.tenant_id {
     593            0 :             bail!(
     594            0 :                 "invalid tenant ID, got {}, expected {}",
     595            0 :                 msg.tenant_id,
     596            0 :                 self.state.tenant_id
     597            0 :             );
     598       157810 :         }
     599       157810 :         if msg.timeline_id != self.state.timeline_id {
     600            0 :             bail!(
     601            0 :                 "invalid timeline ID, got {}, expected {}",
     602            0 :                 msg.timeline_id,
     603            0 :                 self.state.timeline_id
     604            0 :             );
     605       157810 :         }
     606       157810 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     607            0 :             bail!(
     608            0 :                 "invalid wal_seg_size, got {}, expected {}",
     609            0 :                 msg.wal_seg_size,
     610            0 :                 self.state.server.wal_seg_size
     611            0 :             );
     612       157810 :         }
     613       157810 : 
     614       157810 :         // system_id will be updated on mismatch
     615       157810 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
     616       157810 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
     617            0 :             if self.state.server.system_id != 0 {
     618            0 :                 warn!(
     619            0 :                     "unexpected system ID arrived, got {}, expected {}",
     620            0 :                     msg.system_id, self.state.server.system_id
     621              :                 );
     622            0 :             }
     623              : 
     624            0 :             let mut state = self.state.start_change();
     625            0 :             state.server.system_id = msg.system_id;
     626            0 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
     627            0 :                 state.server.pg_version = msg.pg_version;
     628            0 :             }
     629            0 :             self.state.finish_change(&state).await?;
     630       157810 :         }
     631              : 
     632       157810 :         info!(
     633            0 :             "processed greeting from walproposer {}, sending term {:?}",
     634         4832 :             msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
     635            0 :             self.state.acceptor_state.term
     636              :         );
     637       157810 :         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
     638       157810 :             term: self.state.acceptor_state.term,
     639       157810 :             node_id: self.node_id,
     640       157810 :         })))
     641       157810 :     }
     642              : 
     643              :     /// Give vote for the given term, if we haven't done that previously.
     644        24453 :     async fn handle_vote_request(
     645        24453 :         &mut self,
     646        24453 :         msg: &VoteRequest,
     647        24453 :     ) -> Result<Option<AcceptorProposerMessage>> {
     648        24453 :         // Once voted, we won't accept data from older proposers; flush
     649        24453 :         // everything we've already received so that new proposer starts
     650        24453 :         // streaming at end of our WAL, without overlap. Currently we truncate
     651        24453 :         // WAL at streaming point, so this avoids truncating already committed
     652        24453 :         // WAL.
     653        24453 :         //
     654        24453 :         // TODO: it would be smoother to not truncate committed piece at
     655        24453 :         // handle_elected instead. Currently not a big deal, as proposer is the
     656        24453 :         // only source of WAL; with peer2peer recovery it would be more
     657        24453 :         // important.
     658        24453 :         self.wal_store.flush_wal().await?;
     659              :         // initialize with refusal
     660        24453 :         let mut resp = VoteResponse {
     661        24453 :             term: self.state.acceptor_state.term,
     662        24453 :             vote_given: false as u64,
     663        24453 :             flush_lsn: self.flush_lsn(),
     664        24453 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
     665        24453 :             term_history: self.get_term_history(),
     666        24453 :             timeline_start_lsn: self.state.timeline_start_lsn,
     667        24453 :         };
     668        24453 :         if self.state.acceptor_state.term < msg.term {
     669        23292 :             let mut state = self.state.start_change();
     670        23292 :             state.acceptor_state.term = msg.term;
     671        23292 :             // persist vote before sending it out
     672        23292 :             self.state.finish_change(&state).await?;
     673              : 
     674        23292 :             resp.term = self.state.acceptor_state.term;
     675        23292 :             resp.vote_given = true as u64;
     676         1161 :         }
     677        24453 :         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
     678        24453 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
     679        24453 :     }
     680              : 
     681              :     /// Form AppendResponse from current state.
     682        17878 :     fn append_response(&self) -> AppendResponse {
     683        17878 :         let ar = AppendResponse {
     684        17878 :             term: self.state.acceptor_state.term,
     685        17878 :             flush_lsn: self.flush_lsn(),
     686        17878 :             commit_lsn: self.state.commit_lsn,
     687        17878 :             // will be filled by the upper code to avoid bothering safekeeper
     688        17878 :             hs_feedback: HotStandbyFeedback::empty(),
     689        17878 :             pageserver_feedback: None,
     690        17878 :         };
     691        17878 :         trace!("formed AppendResponse {:?}", ar);
     692        17878 :         ar
     693        17878 :     }
     694              : 
     695         6494 :     async fn handle_elected(
     696         6494 :         &mut self,
     697         6494 :         msg: &ProposerElected,
     698         6494 :     ) -> Result<Option<AcceptorProposerMessage>> {
     699         6494 :         info!("received ProposerElected {:?}", msg);
     700         6494 :         if self.state.acceptor_state.term < msg.term {
     701            2 :             let mut state = self.state.start_change();
     702            2 :             state.acceptor_state.term = msg.term;
     703            2 :             self.state.finish_change(&state).await?;
     704         6492 :         }
     705              : 
     706              :         // If our term is higher, ignore the message (next feedback will inform the compute)
     707         6494 :         if self.state.acceptor_state.term > msg.term {
     708            0 :             return Ok(None);
     709         6494 :         }
     710         6494 : 
     711         6494 :         // This might happen in a rare race when another (old) connection from
     712         6494 :         // the same walproposer writes + flushes WAL after this connection
     713         6494 :         // already sent flush_lsn in VoteRequest. It is generally safe to
     714         6494 :         // proceed, but to prevent commit_lsn surprisingly going down we should
     715         6494 :         // either refuse the session (simpler) or skip the part we already have
     716         6494 :         // from the stream (can be implemented).
     717         6494 :         if msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at {
     718            0 :             bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
     719            0 :                    msg.term, self.flush_lsn(), msg.start_streaming_at)
     720         6494 :         }
     721         6494 :         // Otherwise we must never attempt to truncate committed data.
     722         6494 :         assert!(
     723         6494 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
     724            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
     725              :             msg.start_streaming_at,
     726              :             self.state.inmem.commit_lsn
     727              :         );
     728              : 
     729              :         // Before first WAL write initialize its segment. It makes first segment
     730              :         // pg_waldump'able because stream from compute doesn't include its
     731              :         // segment and page headers.
     732              :         //
     733              :         // If we fail before first WAL write flush this action would be
     734              :         // repeated, that's ok because it is idempotent.
     735         6494 :         if self.wal_store.flush_lsn() == Lsn::INVALID {
     736         1063 :             self.wal_store
     737         1063 :                 .initialize_first_segment(msg.start_streaming_at)
     738            0 :                 .await?;
     739         5431 :         }
     740              : 
     741              :         // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
     742              :         // intersection of our history and history from msg
     743              : 
     744              :         // truncate wal, update the LSNs
     745         6494 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
     746              : 
     747              :         // and now adopt term history from proposer
     748              :         {
     749         6494 :             let mut state = self.state.start_change();
     750         6494 : 
     751         6494 :             // Here we learn initial LSN for the first time, set fields
     752         6494 :             // interested in that.
     753         6494 : 
     754         6494 :             if state.timeline_start_lsn == Lsn(0) {
     755              :                 // Remember point where WAL begins globally.
     756         1063 :                 state.timeline_start_lsn = msg.timeline_start_lsn;
     757         1063 :                 info!(
     758            0 :                     "setting timeline_start_lsn to {:?}",
     759              :                     state.timeline_start_lsn
     760              :                 );
     761         5431 :             }
     762         6494 :             if state.peer_horizon_lsn == Lsn(0) {
     763         1063 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
     764         1063 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
     765         1063 :                 state.peer_horizon_lsn = msg.timeline_start_lsn;
     766         5431 :             }
     767         6494 :             if state.local_start_lsn == Lsn(0) {
     768         1063 :                 state.local_start_lsn = msg.start_streaming_at;
     769         1063 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
     770         5431 :             }
     771              :             // Initializing commit_lsn before acking first flushed record is
     772              :             // important to let find_end_of_wal skip the hole in the beginning
     773              :             // of the first segment.
     774              :             //
     775              :             // NB: on new clusters, this happens at the same time as
     776              :             // timeline_start_lsn initialization, it is taken outside to provide
     777              :             // upgrade.
     778         6494 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
     779         6494 : 
     780         6494 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
     781         6494 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
     782         6494 :             // similar for remote_consistent_lsn
     783         6494 :             state.remote_consistent_lsn =
     784         6494 :                 max(state.remote_consistent_lsn, state.timeline_start_lsn);
     785         6494 : 
     786         6494 :             state.acceptor_state.term_history = msg.term_history.clone();
     787         6494 :             self.state.finish_change(&state).await?;
     788              :         }
     789              : 
     790         6494 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
     791              : 
     792              :         // Cache LSN where term starts to immediately fsync control file with
     793              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
     794              :         // persisted commit_lsn on majority of safekeepers aligns.
     795         6494 :         self.term_start_lsn = match msg.term_history.0.last() {
     796            0 :             None => bail!("proposer elected with empty term history"),
     797         6494 :             Some(term_lsn_start) => term_lsn_start.lsn,
     798         6494 :         };
     799         6494 : 
     800         6494 :         Ok(None)
     801         6494 :     }
     802              : 
     803              :     /// Advance commit_lsn taking into account what we have locally.
     804              :     ///
     805              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
     806              :     /// already been done outside.
     807        10813 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
     808        10813 :         // Both peers and walproposer communicate this value, we might already
     809        10813 :         // have a fresher (higher) version.
     810        10813 :         candidate = max(candidate, self.state.inmem.commit_lsn);
     811        10813 :         let commit_lsn = min(candidate, self.flush_lsn());
     812        10813 :         assert!(
     813        10813 :             commit_lsn >= self.state.inmem.commit_lsn,
     814            0 :             "commit_lsn monotonicity violated: old={} new={}",
     815              :             self.state.inmem.commit_lsn,
     816              :             commit_lsn
     817              :         );
     818              : 
     819        10813 :         self.state.inmem.commit_lsn = commit_lsn;
     820        10813 : 
     821        10813 :         // If new commit_lsn reached term switch, force sync of control
     822        10813 :         // file: walproposer in sync mode is very interested when this
     823        10813 :         // happens. Note: this is for sync-safekeepers mode only, as
     824        10813 :         // otherwise commit_lsn might jump over term_start_lsn.
     825        10813 :         if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
     826          803 :             self.state.flush().await?;
     827        10010 :         }
     828              : 
     829        10813 :         Ok(())
     830        10813 :     }
     831              : 
     832              :     /// Handle request to append WAL.
     833              :     #[allow(clippy::comparison_chain)]
     834        21676 :     async fn handle_append_request(
     835        21676 :         &mut self,
     836        21676 :         msg: &AppendRequest,
     837        21676 :         require_flush: bool,
     838        21676 :     ) -> Result<Option<AcceptorProposerMessage>> {
     839        21676 :         if self.state.acceptor_state.term < msg.h.term {
     840            0 :             bail!("got AppendRequest before ProposerElected");
     841        21676 :         }
     842        21676 : 
     843        21676 :         // If our term is higher, immediately refuse the message.
     844        21676 :         if self.state.acceptor_state.term > msg.h.term {
     845            0 :             let resp = AppendResponse::term_only(self.state.acceptor_state.term);
     846            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
     847        21676 :         }
     848        21676 : 
     849        21676 :         // Now we know that we are in the same term as the proposer,
     850        21676 :         // processing the message.
     851        21676 : 
     852        21676 :         self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
     853        21676 : 
     854        21676 :         // do the job
     855        21676 :         if !msg.wal_data.is_empty() {
     856         3975 :             self.wal_store
     857         3975 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
     858            0 :                 .await?;
     859        17701 :         }
     860              : 
     861              :         // flush wal to the disk, if required
     862        21676 :         if require_flush {
     863            4 :             self.wal_store.flush_wal().await?;
     864        21672 :         }
     865              : 
     866              :         // Update commit_lsn.
     867        21676 :         if msg.h.commit_lsn != Lsn(0) {
     868        10813 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
     869        10863 :         }
     870              :         // Value calculated by walproposer can always lag:
     871              :         // - safekeepers can forget inmem value and send to proposer lower
     872              :         //   persisted one on restart;
     873              :         // - if we make safekeepers always send persistent value,
     874              :         //   any compute restart would pull it down.
     875              :         // Thus, take max before adopting.
     876        21676 :         self.state.inmem.peer_horizon_lsn =
     877        21676 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
     878        21676 : 
     879        21676 :         // Update truncate and commit LSN in control file.
     880        21676 :         // To avoid negative impact on performance of extra fsync, do it only
     881        21676 :         // when commit_lsn delta exceeds WAL segment size.
     882        21676 :         if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
     883        21676 :             < self.state.inmem.commit_lsn
     884              :         {
     885            0 :             self.state.flush().await?;
     886        21676 :         }
     887              : 
     888        21676 :         trace!(
     889            0 :             "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
     890            0 :             msg.wal_data.len(),
     891              :             msg.h.end_lsn,
     892              :             msg.h.commit_lsn,
     893              :             msg.h.truncate_lsn,
     894              :             require_flush,
     895              :         );
     896              : 
     897              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
     898        21676 :         if !require_flush {
     899        21672 :             return Ok(None);
     900            4 :         }
     901            4 : 
     902            4 :         let resp = self.append_response();
     903            4 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
     904        21676 :     }
     905              : 
     906              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
     907        17874 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
     908        17874 :         self.wal_store.flush_wal().await?;
     909        17874 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
     910        17874 :             self.append_response(),
     911        17874 :         )))
     912        17874 :     }
     913              : 
     914              :     /// Update commit_lsn from peer safekeeper data.
     915            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     916            0 :         if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
     917              :             // Note: the check is too restrictive, generally we can update local
     918              :             // commit_lsn if our history matches (is part of) history of advanced
     919              :             // commit_lsn provider.
     920            0 :             if sk_info.last_log_term == self.get_last_log_term() {
     921            0 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
     922            0 :             }
     923            0 :         }
     924            0 :         Ok(())
     925            0 :     }
     926              : }
     927              : 
     928              : #[cfg(test)]
     929              : mod tests {
     930              :     use futures::future::BoxFuture;
     931              :     use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
     932              : 
     933              :     use super::*;
     934              :     use crate::{
     935              :         state::{EvictionState, PersistedPeers, TimelinePersistentState},
     936              :         wal_storage::Storage,
     937              :     };
     938              :     use std::{ops::Deref, str::FromStr, time::Instant};
     939              : 
     940              :     // fake storage for tests
     941              :     struct InMemoryState {
     942              :         persisted_state: TimelinePersistentState,
     943              :     }
     944              : 
     945              :     #[async_trait::async_trait]
     946              :     impl control_file::Storage for InMemoryState {
     947            6 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     948            6 :             self.persisted_state = s.clone();
     949            6 :             Ok(())
     950            6 :         }
     951              : 
     952            0 :         fn last_persist_at(&self) -> Instant {
     953            0 :             Instant::now()
     954            0 :         }
     955              :     }
     956              : 
     957              :     impl Deref for InMemoryState {
     958              :         type Target = TimelinePersistentState;
     959              : 
     960          120 :         fn deref(&self) -> &Self::Target {
     961          120 :             &self.persisted_state
     962          120 :         }
     963              :     }
     964              : 
     965            4 :     fn test_sk_state() -> TimelinePersistentState {
     966            4 :         let mut state = TimelinePersistentState::empty();
     967            4 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
     968            4 :         state.tenant_id = TenantId::from([1u8; 16]);
     969            4 :         state.timeline_id = TimelineId::from([1u8; 16]);
     970            4 :         state
     971            4 :     }
     972              : 
     973              :     struct DummyWalStore {
     974              :         lsn: Lsn,
     975              :     }
     976              : 
     977              :     #[async_trait::async_trait]
     978              :     impl wal_storage::Storage for DummyWalStore {
     979           20 :         fn flush_lsn(&self) -> Lsn {
     980           20 :             self.lsn
     981           20 :         }
     982              : 
     983            2 :         async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
     984            2 :             Ok(())
     985            2 :         }
     986              : 
     987            4 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     988            4 :             self.lsn = startpos + buf.len() as u64;
     989            4 :             Ok(())
     990            4 :         }
     991              : 
     992            4 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
     993            4 :             self.lsn = end_pos;
     994            4 :             Ok(())
     995            4 :         }
     996              : 
     997            8 :         async fn flush_wal(&mut self) -> Result<()> {
     998            8 :             Ok(())
     999            8 :         }
    1000              : 
    1001            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1002            0 :             Box::pin(async { Ok(()) })
    1003            0 :         }
    1004              : 
    1005            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1006            0 :             crate::metrics::WalStorageMetrics::default()
    1007            0 :         }
    1008              :     }
    1009              : 
    1010              :     #[tokio::test]
    1011            2 :     async fn test_voting() {
    1012            2 :         let storage = InMemoryState {
    1013            2 :             persisted_state: test_sk_state(),
    1014            2 :         };
    1015            2 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1016            2 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1017            2 : 
    1018            2 :         // check voting for 1 is ok
    1019            2 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
    1020            2 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1021            2 :         match vote_resp.unwrap() {
    1022            2 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
    1023            2 :             r => panic!("unexpected response: {:?}", r),
    1024            2 :         }
    1025            2 : 
    1026            2 :         // reboot...
    1027            2 :         let state = sk.state.deref().clone();
    1028            2 :         let storage = InMemoryState {
    1029            2 :             persisted_state: state,
    1030            2 :         };
    1031            2 : 
    1032            2 :         sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
    1033            2 : 
    1034            2 :         // and ensure voting second time for 1 is not ok
    1035            2 :         vote_resp = sk.process_msg(&vote_request).await;
    1036            2 :         match vote_resp.unwrap() {
    1037            2 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
    1038            2 :             r => panic!("unexpected response: {:?}", r),
    1039            2 :         }
    1040            2 :     }
    1041              : 
    1042              :     #[tokio::test]
    1043            2 :     async fn test_last_log_term_switch() {
    1044            2 :         let storage = InMemoryState {
    1045            2 :             persisted_state: test_sk_state(),
    1046            2 :         };
    1047            2 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1048            2 : 
    1049            2 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1050            2 : 
    1051            2 :         let mut ar_hdr = AppendRequestHeader {
    1052            2 :             term: 1,
    1053            2 :             term_start_lsn: Lsn(3),
    1054            2 :             begin_lsn: Lsn(1),
    1055            2 :             end_lsn: Lsn(2),
    1056            2 :             commit_lsn: Lsn(0),
    1057            2 :             truncate_lsn: Lsn(0),
    1058            2 :             proposer_uuid: [0; 16],
    1059            2 :         };
    1060            2 :         let mut append_request = AppendRequest {
    1061            2 :             h: ar_hdr.clone(),
    1062            2 :             wal_data: Bytes::from_static(b"b"),
    1063            2 :         };
    1064            2 : 
    1065            2 :         let pem = ProposerElected {
    1066            2 :             term: 1,
    1067            2 :             start_streaming_at: Lsn(1),
    1068            2 :             term_history: TermHistory(vec![TermLsn {
    1069            2 :                 term: 1,
    1070            2 :                 lsn: Lsn(3),
    1071            2 :             }]),
    1072            2 :             timeline_start_lsn: Lsn(0),
    1073            2 :         };
    1074            2 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1075            2 :             .await
    1076            2 :             .unwrap();
    1077            2 : 
    1078            2 :         // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
    1079            2 :         let resp = sk
    1080            2 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1081            2 :             .await;
    1082            2 :         assert!(resp.is_ok());
    1083            2 :         assert_eq!(sk.get_last_log_term(), 0);
    1084            2 : 
    1085            2 :         // but record at term_start_lsn does the switch
    1086            2 :         ar_hdr.begin_lsn = Lsn(2);
    1087            2 :         ar_hdr.end_lsn = Lsn(3);
    1088            2 :         append_request = AppendRequest {
    1089            2 :             h: ar_hdr,
    1090            2 :             wal_data: Bytes::from_static(b"b"),
    1091            2 :         };
    1092            2 :         let resp = sk
    1093            2 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1094            2 :             .await;
    1095            2 :         assert!(resp.is_ok());
    1096            2 :         sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at LSN 3
    1097            2 :         assert_eq!(sk.get_last_log_term(), 1);
    1098            2 :     }
    1099              : 
    1100              :     #[test]
    1101            2 :     fn test_find_highest_common_point_none() {
    1102            2 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1103            2 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1104            2 :         assert_eq!(
    1105            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
    1106            2 :             None
    1107            2 :         );
    1108            2 :     }
    1109              : 
    1110              :     #[test]
    1111            2 :     fn test_find_highest_common_point_middle() {
    1112            2 :         let prop_th = TermHistory(vec![
    1113            2 :             (1, Lsn(10)).into(),
    1114            2 :             (2, Lsn(20)).into(),
    1115            2 :             (4, Lsn(40)).into(),
    1116            2 :         ]);
    1117            2 :         let sk_th = TermHistory(vec![
    1118            2 :             (1, Lsn(10)).into(),
    1119            2 :             (2, Lsn(20)).into(),
    1120            2 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1121            2 :         ]);
    1122            2 :         assert_eq!(
    1123            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1124            2 :             Some(TermLsn {
    1125            2 :                 term: 2,
    1126            2 :                 lsn: Lsn(30),
    1127            2 :             })
    1128            2 :         );
    1129            2 :     }
    1130              : 
    1131              :     #[test]
    1132            2 :     fn test_find_highest_common_point_sk_end() {
    1133            2 :         let prop_th = TermHistory(vec![
    1134            2 :             (1, Lsn(10)).into(),
    1135            2 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1136            2 :             (4, Lsn(40)).into(),
    1137            2 :         ]);
    1138            2 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1139            2 :         assert_eq!(
    1140            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1141            2 :             Some(TermLsn {
    1142            2 :                 term: 2,
    1143            2 :                 lsn: Lsn(32),
    1144            2 :             })
    1145            2 :         );
    1146            2 :     }
    1147              : 
    1148              :     #[test]
    1149            2 :     fn test_find_highest_common_point_walprop() {
    1150            2 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1151            2 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1152            2 :         assert_eq!(
    1153            2 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1154            2 :             Some(TermLsn {
    1155            2 :                 term: 2,
    1156            2 :                 lsn: Lsn(32),
    1157            2 :             })
    1158            2 :         );
    1159            2 :     }
    1160              : 
    1161              :     #[test]
    1162            2 :     fn test_sk_state_bincode_serde_roundtrip() {
    1163            2 :         use utils::Hex;
    1164            2 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1165            2 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1166            2 :         let state = TimelinePersistentState {
    1167            2 :             tenant_id,
    1168            2 :             timeline_id,
    1169            2 :             acceptor_state: AcceptorState {
    1170            2 :                 term: 42,
    1171            2 :                 term_history: TermHistory(vec![TermLsn {
    1172            2 :                     lsn: Lsn(0x1),
    1173            2 :                     term: 41,
    1174            2 :                 }]),
    1175            2 :             },
    1176            2 :             server: ServerInfo {
    1177            2 :                 pg_version: 14,
    1178            2 :                 system_id: 0x1234567887654321,
    1179            2 :                 wal_seg_size: 0x12345678,
    1180            2 :             },
    1181            2 :             proposer_uuid: {
    1182            2 :                 let mut arr = timeline_id.as_arr();
    1183            2 :                 arr.reverse();
    1184            2 :                 arr
    1185            2 :             },
    1186            2 :             timeline_start_lsn: Lsn(0x12345600),
    1187            2 :             local_start_lsn: Lsn(0x12),
    1188            2 :             commit_lsn: Lsn(1234567800),
    1189            2 :             backup_lsn: Lsn(1234567300),
    1190            2 :             peer_horizon_lsn: Lsn(9999999),
    1191            2 :             remote_consistent_lsn: Lsn(1234560000),
    1192            2 :             peers: PersistedPeers(vec![(
    1193            2 :                 NodeId(1),
    1194            2 :                 PersistedPeerInfo {
    1195            2 :                     backup_lsn: Lsn(1234567000),
    1196            2 :                     term: 42,
    1197            2 :                     flush_lsn: Lsn(1234567800 - 8),
    1198            2 :                     commit_lsn: Lsn(1234567600),
    1199            2 :                 },
    1200            2 :             )]),
    1201            2 :             partial_backup: crate::wal_backup_partial::State::default(),
    1202            2 :             eviction_state: EvictionState::Present,
    1203            2 :         };
    1204            2 : 
    1205            2 :         let ser = state.ser().unwrap();
    1206            2 : 
    1207            2 :         #[rustfmt::skip]
    1208            2 :         let expected = [
    1209            2 :             // tenant_id as length prefixed hex
    1210            2 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1211            2 :             0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
    1212            2 :             // timeline_id as length prefixed hex
    1213            2 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1214            2 :             0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
    1215            2 :             // term
    1216            2 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1217            2 :             // term_history length prefix (one entry)
    1218            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
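    1219            2 :             // term_history entry: term (41) first, then lsn (0x1); swapped relative to the literal above, presumably because fields serialize in struct declaration order (TODO confirm)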
    1220            2 :             0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1221            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1222            2 :             // pg_version
    1223            2 :             0x0e, 0x00, 0x00, 0x00,
    1224            2 :             // systemid
    1225            2 :             0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
    1226            2 :             // wal_seg_size
    1227            2 :             0x78, 0x56, 0x34, 0x12,
    1228            2 :             // proposer_uuid (timeline_id bytes reversed) as length prefixed hex
    1229            2 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1230            2 :             0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
    1231            2 : 
    1232            2 :             // one row each: timeline_start_lsn, local_start_lsn, commit_lsn, backup_lsn, peer_horizon_lsn, remote_consistent_lsn
    1233            2 :             0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
    1234            2 :             0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1235            2 :             0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1236            2 :             0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1237            2 :             0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
    1238            2 :             0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1239            2 :             // length prefix for persistentpeers
    1240            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1241            2 :             // nodeid
    1242            2 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1243            2 :             // peer info, one row each: backup_lsn, term, flush_lsn, commit_lsn
    1244            2 :             0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1245            2 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1246            2 :             0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1247            2 :             0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1248            2 :             // partial_backup
    1249            2 :             0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1250            2 :             // eviction_state
    1251            2 :             0x00, 0x00, 0x00, 0x00,
    1252            2 :         ];
    1253            2 : 
    1254            2 :         assert_eq!(Hex(&ser), Hex(&expected));
    1255              : 
    1256            2 :         let deser = TimelinePersistentState::des(&ser).unwrap();
    1257            2 : 
    1258            2 :         assert_eq!(deser, state);
    1259            2 :     }
    1260              : }
        

Generated by: LCOV version 2.1-beta