LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 85.0 % 853 725
Test Date: 2025-01-30 15:18:43 Functions: 67.6 % 179 121

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use anyhow::{bail, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt};
       5              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       6              : 
       7              : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
       8              : use safekeeper_api::models::HotStandbyFeedback;
       9              : use safekeeper_api::Term;
      10              : use serde::{Deserialize, Serialize};
      11              : use std::cmp::max;
      12              : use std::cmp::min;
      13              : use std::fmt;
      14              : use std::io::Read;
      15              : use storage_broker::proto::SafekeeperTimelineInfo;
      16              : 
      17              : use tracing::*;
      18              : 
      19              : use crate::control_file;
      20              : use crate::metrics::MISC_OPERATION_SECONDS;
      21              : 
      22              : use crate::state::TimelineState;
      23              : use crate::wal_storage;
      24              : use pq_proto::SystemId;
      25              : use utils::pageserver_feedback::PageserverFeedback;
      26              : use utils::{
      27              :     bin_ser::LeSer,
      28              :     id::{NodeId, TenantId, TimelineId},
      29              :     lsn::Lsn,
      30              : };
      31              : 
      32              : pub const SK_PROTOCOL_VERSION: u32 = 2;
      33              : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      34              : 
      35            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      36              : pub struct TermLsn {
      37              :     pub term: Term,
      38              :     pub lsn: Lsn,
      39              : }
      40              : 
      41              : // Creation from tuple provides less typing (e.g. for unit tests).
      42              : impl From<(Term, Lsn)> for TermLsn {
      43         1230 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      44         1230 :         TermLsn {
      45         1230 :             term: pair.0,
      46         1230 :             lsn: pair.1,
      47         1230 :         }
      48         1230 :     }
      49              : }
      50              : 
      51            0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      52              : pub struct TermHistory(pub Vec<TermLsn>);
      53              : 
      54              : impl TermHistory {
      55         1440 :     pub fn empty() -> TermHistory {
      56         1440 :         TermHistory(Vec::new())
      57         1440 :     }
      58              : 
      59              :     // Parse TermHistory as n_entries followed by TermLsn pairs
      60          832 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      61          832 :         if bytes.remaining() < 4 {
      62            0 :             bail!("TermHistory misses len");
      63          832 :         }
      64          832 :         let n_entries = bytes.get_u32_le();
      65          832 :         let mut res = Vec::with_capacity(n_entries as usize);
      66          832 :         for _ in 0..n_entries {
      67         5704 :             if bytes.remaining() < 16 {
      68            0 :                 bail!("TermHistory is incomplete");
      69         5704 :             }
      70         5704 :             res.push(TermLsn {
      71         5704 :                 term: bytes.get_u64_le(),
      72         5704 :                 lsn: bytes.get_u64_le().into(),
      73         5704 :             })
      74              :         }
      75          832 :         Ok(TermHistory(res))
      76          832 :     }
      77              : 
      78              :     /// Return copy of self with switches happening strictly after up_to
      79              :     /// truncated.
      80         5420 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
      81         5420 :         let mut res = Vec::with_capacity(self.0.len());
      82        21727 :         for e in &self.0 {
      83        16315 :             if e.lsn > up_to {
      84            8 :                 break;
      85        16307 :             }
      86        16307 :             res.push(*e);
      87              :         }
      88         5420 :         TermHistory(res)
      89         5420 :     }
      90              : 
      91              :     /// Find point of divergence between leader (walproposer) term history and
      92              :     /// safekeeper. Arguments are not symmetric as proposer history ends at
      93              :     /// +infinity while safekeeper at flush_lsn.
      94              :     /// C version is at walproposer SendProposerElected.
      95          841 :     pub fn find_highest_common_point(
      96          841 :         prop_th: &TermHistory,
      97          841 :         sk_th: &TermHistory,
      98          841 :         sk_wal_end: Lsn,
      99          841 :     ) -> Option<TermLsn> {
     100          841 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     101              : 
     102          841 :         if let Some(sk_th_last) = sk_th.last() {
     103          671 :             assert!(
     104          671 :                 sk_th_last.lsn <= sk_wal_end,
     105            0 :                 "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
     106              :                 sk_th_last,
     107              :                 sk_wal_end
     108              :             );
     109          170 :         }
     110              : 
     111              :         // find last common term, if any...
     112          841 :         let mut last_common_idx = None;
     113         4814 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     114         4814 :             if prop_th[i].term != sk_th[i].term {
     115           10 :                 break;
     116         4804 :             }
     117         4804 :             // If term is the same, LSN must be equal as well.
     118         4804 :             assert!(
     119         4804 :                 prop_th[i].lsn == sk_th[i].lsn,
     120            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     121            0 :                 prop_th[i].term,
     122            0 :                 prop_th[i].lsn,
     123            0 :                 sk_th[i].lsn
     124              :             );
     125         4804 :             last_common_idx = Some(i);
     126              :         }
     127          841 :         let last_common_idx = last_common_idx?;
     128              :         // Now find where it ends at both prop and sk and take min. End of
     129              :         // (common) term is the start of the next one, unless it is the last one;
     130              :         // there it is flush_lsn in case of safekeeper or, in case of proposer
     131              :         // +infinity, so we just take flush_lsn then.
     132          669 :         if last_common_idx == prop_th.len() - 1 {
     133           63 :             Some(TermLsn {
     134           63 :                 term: prop_th[last_common_idx].term,
     135           63 :                 lsn: sk_wal_end,
     136           63 :             })
     137              :         } else {
     138          606 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     139          606 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     140            8 :                 sk_th[last_common_idx + 1].lsn
     141              :             } else {
     142          598 :                 sk_wal_end
     143              :             };
     144          606 :             Some(TermLsn {
     145          606 :                 term: prop_th[last_common_idx].term,
     146          606 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     147          606 :             })
     148              :         }
     149          841 :     }
     150              : }
     151              : 
     152              : /// Display only latest entries for Debug.
     153              : impl fmt::Debug for TermHistory {
     154          371 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     155          371 :         let n_printed = 20;
     156          371 :         write!(
     157          371 :             fmt,
     158          371 :             "{}{:?}",
     159          371 :             if self.0.len() > n_printed { "... " } else { "" },
     160          371 :             self.0
     161          371 :                 .iter()
     162          371 :                 .rev()
     163          371 :                 .take(n_printed)
     164         1321 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     165          371 :                 .collect::<Vec<_>>()
     166          371 :         )
     167          371 :     }
     168              : }
     169              : 
     170              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     171              : pub type PgUuid = [u8; 16];
     172              : 
     173              : /// Persistent consensus state of the acceptor.
     174            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     175              : pub struct AcceptorState {
     176              :     /// acceptor's last term it voted for (advanced in 1 phase)
     177              :     pub term: Term,
     178              :     /// History of term switches for safekeeper's WAL.
     179              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     180              :     /// from the proposer before recovery.
     181              :     pub term_history: TermHistory,
     182              : }
     183              : 
     184              : impl AcceptorState {
     185              :     /// acceptor's last_log_term is the term of the highest entry in the log
     186         1275 :     pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
     187         1275 :         let th = self.term_history.up_to(flush_lsn);
     188         1275 :         match th.0.last() {
     189         1269 :             Some(e) => e.term,
     190            6 :             None => 0,
     191              :         }
     192         1275 :     }
     193              : }
     194              : 
     195              : // protocol messages
     196              : 
     197              : /// Initial Proposer -> Acceptor message
     198            0 : #[derive(Debug, Deserialize)]
     199              : pub struct ProposerGreeting {
     200              :     /// proposer-acceptor protocol version
     201              :     pub protocol_version: u32,
     202              :     /// Postgres server version
     203              :     pub pg_version: u32,
     204              :     pub proposer_id: PgUuid,
     205              :     pub system_id: SystemId,
     206              :     pub timeline_id: TimelineId,
     207              :     pub tenant_id: TenantId,
     208              :     pub tli: TimeLineID,
     209              :     pub wal_seg_size: u32,
     210              : }
     211              : 
     212              : /// Acceptor -> Proposer initial response: the highest term known to me
     213              : /// (acceptor voted for).
     214              : #[derive(Debug, Serialize)]
     215              : pub struct AcceptorGreeting {
     216              :     term: u64,
     217              :     node_id: NodeId,
     218              : }
     219              : 
     220              : /// Vote request sent from proposer to safekeepers
     221            0 : #[derive(Debug, Deserialize)]
     222              : pub struct VoteRequest {
     223              :     pub term: Term,
     224              : }
     225              : 
     226              : /// Vote itself, sent from safekeeper to proposer
     227              : #[derive(Debug, Serialize)]
     228              : pub struct VoteResponse {
     229              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     230              :     vote_given: u64, // fixme u64 due to padding
     231              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     232              :     // proposer to choose the most advanced one.
     233              :     pub flush_lsn: Lsn,
     234              :     truncate_lsn: Lsn,
     235              :     pub term_history: TermHistory,
     236              :     timeline_start_lsn: Lsn,
     237              : }
     238              : 
     239              : /*
     240              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     241              :  * term history to it.
     242              :  */
     243              : #[derive(Debug)]
     244              : pub struct ProposerElected {
     245              :     pub term: Term,
     246              :     pub start_streaming_at: Lsn,
     247              :     pub term_history: TermHistory,
     248              :     pub timeline_start_lsn: Lsn,
     249              : }
     250              : 
     251              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     252              : /// communicates commit_lsn.
     253              : #[derive(Debug)]
     254              : pub struct AppendRequest {
     255              :     pub h: AppendRequestHeader,
     256              :     pub wal_data: Bytes,
     257              : }
     258            0 : #[derive(Debug, Clone, Deserialize)]
     259              : pub struct AppendRequestHeader {
     260              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     261              :     pub term: Term,
     262              :     // TODO: remove this field from the protocol, it is unused -- LSN of term
     263              :     // switch can be taken from ProposerElected (as well as from term history).
     264              :     pub term_start_lsn: Lsn,
     265              :     /// start position of message in WAL
     266              :     pub begin_lsn: Lsn,
     267              :     /// end position of message in WAL
     268              :     pub end_lsn: Lsn,
     269              :     /// LSN committed by quorum of safekeepers
     270              :     pub commit_lsn: Lsn,
     271              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     272              :     pub truncate_lsn: Lsn,
     273              :     // only for logging/debugging
     274              :     pub proposer_uuid: PgUuid,
     275              : }
     276              : 
     277              : /// Report safekeeper state to proposer
     278              : #[derive(Debug, Serialize, Clone)]
     279              : pub struct AppendResponse {
     280              :     // Current term of the safekeeper; if it is higher than proposer's, the
     281              :     // compute is out of date.
     282              :     pub term: Term,
     283              :     // Flushed end of wal on safekeeper; one should be always mindful from what
     284              :     // term history this value comes, either checking history directly or
     285              :     // observing term being set to one for which WAL truncation is known to have
     286              :     // happened.
     287              :     pub flush_lsn: Lsn,
     288              :     // We report back our awareness about which WAL is committed, as this is
     289              :     // a criterion for walproposer --sync mode exit
     290              :     pub commit_lsn: Lsn,
     291              :     pub hs_feedback: HotStandbyFeedback,
     292              :     pub pageserver_feedback: Option<PageserverFeedback>,
     293              : }
     294              : 
     295              : impl AppendResponse {
     296            0 :     fn term_only(term: Term) -> AppendResponse {
     297            0 :         AppendResponse {
     298            0 :             term,
     299            0 :             flush_lsn: Lsn(0),
     300            0 :             commit_lsn: Lsn(0),
     301            0 :             hs_feedback: HotStandbyFeedback::empty(),
     302            0 :             pageserver_feedback: None,
     303            0 :         }
     304            0 :     }
     305              : }
     306              : 
     307              : /// Proposer -> Acceptor messages
     308              : #[derive(Debug)]
     309              : pub enum ProposerAcceptorMessage {
     310              :     Greeting(ProposerGreeting),
     311              :     VoteRequest(VoteRequest),
     312              :     Elected(ProposerElected),
     313              :     AppendRequest(AppendRequest),
     314              :     NoFlushAppendRequest(AppendRequest),
     315              :     FlushWAL,
     316              : }
     317              : 
     318              : impl ProposerAcceptorMessage {
     319              :     /// Parse proposer message.
     320        26580 :     pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
     321        26580 :         if proto_version != SK_PROTOCOL_VERSION {
     322            0 :             bail!(
     323            0 :                 "incompatible protocol version {}, expected {}",
     324            0 :                 proto_version,
     325            0 :                 SK_PROTOCOL_VERSION
     326            0 :             );
     327        26580 :         }
     328        26580 :         // xxx using Reader is inefficient but easy to work with bincode
     329        26580 :         let mut stream = msg_bytes.reader();
     330              :         // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     331        26580 :         let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     332        26580 :         match tag {
     333              :             'g' => {
     334        19090 :                 let msg = ProposerGreeting::des_from(&mut stream)?;
     335        19090 :                 Ok(ProposerAcceptorMessage::Greeting(msg))
     336              :             }
     337              :             'v' => {
     338         3306 :                 let msg = VoteRequest::des_from(&mut stream)?;
     339         3306 :                 Ok(ProposerAcceptorMessage::VoteRequest(msg))
     340              :             }
     341              :             'e' => {
     342          832 :                 let mut msg_bytes = stream.into_inner();
     343          832 :                 if msg_bytes.remaining() < 16 {
     344            0 :                     bail!("ProposerElected message is not complete");
     345          832 :                 }
     346          832 :                 let term = msg_bytes.get_u64_le();
     347          832 :                 let start_streaming_at = msg_bytes.get_u64_le().into();
     348          832 :                 let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     349          832 :                 if msg_bytes.remaining() < 8 {
     350            0 :                     bail!("ProposerElected message is not complete");
     351          832 :                 }
     352          832 :                 let timeline_start_lsn = msg_bytes.get_u64_le().into();
     353          832 :                 let msg = ProposerElected {
     354          832 :                     term,
     355          832 :                     start_streaming_at,
     356          832 :                     timeline_start_lsn,
     357          832 :                     term_history,
     358          832 :                 };
     359          832 :                 Ok(ProposerAcceptorMessage::Elected(msg))
     360              :             }
     361              :             'a' => {
     362              :                 // read header followed by wal data
     363         3352 :                 let hdr = AppendRequestHeader::des_from(&mut stream)?;
     364         3352 :                 let rec_size = hdr
     365         3352 :                     .end_lsn
     366         3352 :                     .checked_sub(hdr.begin_lsn)
     367         3352 :                     .context("begin_lsn > end_lsn in AppendRequest")?
     368              :                     .0 as usize;
     369         3352 :                 if rec_size > MAX_SEND_SIZE {
     370            0 :                     bail!(
     371            0 :                         "AppendRequest is longer than MAX_SEND_SIZE ({})",
     372            0 :                         MAX_SEND_SIZE
     373            0 :                     );
     374         3352 :                 }
     375         3352 : 
     376         3352 :                 let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     377         3352 :                 stream.read_exact(&mut wal_data_vec)?;
     378         3352 :                 let wal_data = Bytes::from(wal_data_vec);
     379         3352 :                 let msg = AppendRequest { h: hdr, wal_data };
     380         3352 : 
     381         3352 :                 Ok(ProposerAcceptorMessage::AppendRequest(msg))
     382              :             }
     383            0 :             _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     384              :         }
     385        26580 :     }
     386              : 
     387              :     /// The memory size of the message, including byte slices.
     388          600 :     pub fn size(&self) -> usize {
     389              :         const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
     390              : 
     391              :         // For most types, the size is just the base enum size including the nested structs. Some
     392              :         // types also contain byte slices; add them.
     393              :         //
     394              :         // We explicitly list all fields, to draw attention here when new fields are added.
     395          600 :         let mut size = BASE_SIZE;
     396          600 :         size += match self {
     397              :             Self::Greeting(ProposerGreeting {
     398              :                 protocol_version: _,
     399              :                 pg_version: _,
     400              :                 proposer_id: _,
     401              :                 system_id: _,
     402              :                 timeline_id: _,
     403              :                 tenant_id: _,
     404              :                 tli: _,
     405              :                 wal_seg_size: _,
     406            0 :             }) => 0,
     407              : 
     408            0 :             Self::VoteRequest(VoteRequest { term: _ }) => 0,
     409              : 
     410              :             Self::Elected(ProposerElected {
     411              :                 term: _,
     412              :                 start_streaming_at: _,
     413              :                 term_history: _,
     414              :                 timeline_start_lsn: _,
     415            0 :             }) => 0,
     416              : 
     417              :             Self::AppendRequest(AppendRequest {
     418              :                 h:
     419              :                     AppendRequestHeader {
     420              :                         term: _,
     421              :                         term_start_lsn: _,
     422              :                         begin_lsn: _,
     423              :                         end_lsn: _,
     424              :                         commit_lsn: _,
     425              :                         truncate_lsn: _,
     426              :                         proposer_uuid: _,
     427              :                     },
     428          600 :                 wal_data,
     429          600 :             }) => wal_data.len(),
     430              : 
     431              :             Self::NoFlushAppendRequest(AppendRequest {
     432              :                 h:
     433              :                     AppendRequestHeader {
     434              :                         term: _,
     435              :                         term_start_lsn: _,
     436              :                         begin_lsn: _,
     437              :                         end_lsn: _,
     438              :                         commit_lsn: _,
     439              :                         truncate_lsn: _,
     440              :                         proposer_uuid: _,
     441              :                     },
     442            0 :                 wal_data,
     443            0 :             }) => wal_data.len(),
     444              : 
     445            0 :             Self::FlushWAL => 0,
     446              :         };
     447              : 
     448          600 :         size
     449          600 :     }
     450              : }
     451              : 
     452              : /// Acceptor -> Proposer messages
     453              : #[derive(Debug)]
     454              : pub enum AcceptorProposerMessage {
     455              :     Greeting(AcceptorGreeting),
     456              :     VoteResponse(VoteResponse),
     457              :     AppendResponse(AppendResponse),
     458              : }
     459              : 
     460              : impl AcceptorProposerMessage {
     461              :     /// Serialize acceptor -> proposer message.
     462        25192 :     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
     463        25192 :         match self {
     464        19090 :             AcceptorProposerMessage::Greeting(msg) => {
     465        19090 :                 buf.put_u64_le('g' as u64);
     466        19090 :                 buf.put_u64_le(msg.term);
     467        19090 :                 buf.put_u64_le(msg.node_id.0);
     468        19090 :             }
     469         3306 :             AcceptorProposerMessage::VoteResponse(msg) => {
     470         3306 :                 buf.put_u64_le('v' as u64);
     471         3306 :                 buf.put_u64_le(msg.term);
     472         3306 :                 buf.put_u64_le(msg.vote_given);
     473         3306 :                 buf.put_u64_le(msg.flush_lsn.into());
     474         3306 :                 buf.put_u64_le(msg.truncate_lsn.into());
     475         3306 :                 buf.put_u32_le(msg.term_history.0.len() as u32);
     476        13341 :                 for e in &msg.term_history.0 {
     477        10035 :                     buf.put_u64_le(e.term);
     478        10035 :                     buf.put_u64_le(e.lsn.into());
     479        10035 :                 }
     480         3306 :                 buf.put_u64_le(msg.timeline_start_lsn.into());
     481              :             }
     482         2796 :             AcceptorProposerMessage::AppendResponse(msg) => {
     483         2796 :                 buf.put_u64_le('a' as u64);
     484         2796 :                 buf.put_u64_le(msg.term);
     485         2796 :                 buf.put_u64_le(msg.flush_lsn.into());
     486         2796 :                 buf.put_u64_le(msg.commit_lsn.into());
     487         2796 :                 buf.put_i64_le(msg.hs_feedback.ts);
     488         2796 :                 buf.put_u64_le(msg.hs_feedback.xmin);
     489         2796 :                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     490              : 
     491              :                 // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     492              :                 // if it is not present.
     493         2796 :                 if let Some(ref msg) = msg.pageserver_feedback {
     494            0 :                     msg.serialize(buf);
     495         2796 :                 }
     496              :             }
     497              :         }
     498              : 
     499        25192 :         Ok(())
     500        25192 :     }
     501              : }
     502              : 
     503              : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     504              : /// It controls all WAL disk writes and updates of control file.
     505              : ///
     506              : /// Currently safekeeper processes:
     507              : /// - messages from compute (proposers) and provides replies
     508              : /// - messages from broker peers
     509              : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     510              :     /// LSN since which the proposer this safekeeper is currently talking to
     511              :     /// appends WAL; determines last_log_term switch point.
     512              :     pub term_start_lsn: Lsn,
     513              : 
     514              :     pub state: TimelineState<CTRL>, // persistent state storage
     515              :     pub wal_store: WAL,
     516              : 
     517              :     node_id: NodeId, // safekeeper's node id
     518              : }
     519              : 
     520              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     521              : where
     522              :     CTRL: control_file::Storage,
     523              :     WAL: wal_storage::Storage,
     524              : {
     525              :     /// Accepts a control file storage containing the safekeeper state.
     526              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     527              :     /// and `server` (`wal_seg_size` inside it) fields.
     528         8439 :     pub fn new(
     529         8439 :         state: TimelineState<CTRL>,
     530         8439 :         wal_store: WAL,
     531         8439 :         node_id: NodeId,
     532         8439 :     ) -> Result<SafeKeeper<CTRL, WAL>> {
     533         8439 :         if state.tenant_id == TenantId::from([0u8; 16])
     534         8439 :             || state.timeline_id == TimelineId::from([0u8; 16])
     535              :         {
     536            0 :             bail!(
     537            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     538            0 :                 state.tenant_id,
     539            0 :                 state.timeline_id
     540            0 :             );
     541         8439 :         }
     542         8439 : 
     543         8439 :         Ok(SafeKeeper {
     544         8439 :             term_start_lsn: Lsn(0),
     545         8439 :             state,
     546         8439 :             wal_store,
     547         8439 :             node_id,
     548         8439 :         })
     549         8439 :     }
     550              : 
     551              :     /// Get history of term switches for the available WAL
     552         4145 :     fn get_term_history(&self) -> TermHistory {
     553         4145 :         self.state
     554         4145 :             .acceptor_state
     555         4145 :             .term_history
     556         4145 :             .up_to(self.flush_lsn())
     557         4145 :     }
     558              : 
     559           43 :     pub fn get_last_log_term(&self) -> Term {
     560           43 :         self.state
     561           43 :             .acceptor_state
     562           43 :             .get_last_log_term(self.flush_lsn())
     563           43 :     }
     564              : 
     565              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     566        18334 :     pub fn flush_lsn(&self) -> Lsn {
     567        18334 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     568        18334 :     }
     569              : 
     570              :     /// Process message from proposer and possibly form reply. Concurrent
     571              :     /// callers must exclude each other.
     572        30587 :     pub async fn process_msg(
     573        30587 :         &mut self,
     574        30587 :         msg: &ProposerAcceptorMessage,
     575        30587 :     ) -> Result<Option<AcceptorProposerMessage>> {
     576        30587 :         match msg {
     577        19090 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     578         3308 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     579          837 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     580            4 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     581            4 :                 self.handle_append_request(msg, true).await
     582              :             }
     583         3952 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     584         3952 :                 self.handle_append_request(msg, false).await
     585              :             }
     586         3396 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     587              :         }
     588        30587 :     }
     589              : 
     590              :     /// Handle initial message from proposer: check its sanity and send my
     591              :     /// current term.
     592        19090 :     async fn handle_greeting(
     593        19090 :         &mut self,
     594        19090 :         msg: &ProposerGreeting,
     595        19090 :     ) -> Result<Option<AcceptorProposerMessage>> {
     596        19090 :         // Check protocol compatibility
     597        19090 :         if msg.protocol_version != SK_PROTOCOL_VERSION {
     598            0 :             bail!(
     599            0 :                 "incompatible protocol version {}, expected {}",
     600            0 :                 msg.protocol_version,
     601            0 :                 SK_PROTOCOL_VERSION
     602            0 :             );
     603        19090 :         }
     604        19090 :         /* Postgres major version mismatch is treated as fatal error
     605        19090 :          * because safekeepers parse WAL headers and the format
     606        19090 :          * may change between versions.
     607        19090 :          */
     608        19090 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     609            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     610              :         {
     611            0 :             bail!(
     612            0 :                 "incompatible server version {}, expected {}",
     613            0 :                 msg.pg_version,
     614            0 :                 self.state.server.pg_version
     615            0 :             );
     616        19090 :         }
     617        19090 : 
     618        19090 :         if msg.tenant_id != self.state.tenant_id {
     619            0 :             bail!(
     620            0 :                 "invalid tenant ID, got {}, expected {}",
     621            0 :                 msg.tenant_id,
     622            0 :                 self.state.tenant_id
     623            0 :             );
     624        19090 :         }
     625        19090 :         if msg.timeline_id != self.state.timeline_id {
     626            0 :             bail!(
     627            0 :                 "invalid timeline ID, got {}, expected {}",
     628            0 :                 msg.timeline_id,
     629            0 :                 self.state.timeline_id
     630            0 :             );
     631        19090 :         }
     632        19090 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     633            0 :             bail!(
     634            0 :                 "invalid wal_seg_size, got {}, expected {}",
     635            0 :                 msg.wal_seg_size,
     636            0 :                 self.state.server.wal_seg_size
     637            0 :             );
     638        19090 :         }
     639        19090 : 
     640        19090 :         // system_id will be updated on mismatch
     641        19090 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
     642        19090 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
     643            0 :             if self.state.server.system_id != 0 {
     644            0 :                 warn!(
     645            0 :                     "unexpected system ID arrived, got {}, expected {}",
     646            0 :                     msg.system_id, self.state.server.system_id
     647              :                 );
     648            0 :             }
     649              : 
     650            0 :             let mut state = self.state.start_change();
     651            0 :             state.server.system_id = msg.system_id;
     652            0 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
     653            0 :                 state.server.pg_version = msg.pg_version;
     654            0 :             }
     655            0 :             self.state.finish_change(&state).await?;
     656        19090 :         }
     657              : 
     658        19090 :         info!(
     659            0 :             "processed greeting from walproposer {}, sending term {:?}",
     660         2416 :             msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
     661            0 :             self.state.acceptor_state.term
     662              :         );
     663        19090 :         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
     664        19090 :             term: self.state.acceptor_state.term,
     665        19090 :             node_id: self.node_id,
     666        19090 :         })))
     667        19090 :     }
     668              : 
     669              :     /// Give vote for the given term, if we haven't done that previously.
     670         3308 :     async fn handle_vote_request(
     671         3308 :         &mut self,
     672         3308 :         msg: &VoteRequest,
     673         3308 :     ) -> Result<Option<AcceptorProposerMessage>> {
     674         3308 :         // Once voted, we won't accept data from older proposers; flush
     675         3308 :         // everything we've already received so that new proposer starts
     676         3308 :         // streaming at end of our WAL, without overlap. Currently we truncate
     677         3308 :         // WAL at streaming point, so this avoids truncating already committed
     678         3308 :         // WAL.
     679         3308 :         //
     680         3308 :         // TODO: it would be smoother to not truncate committed piece at
     681         3308 :         // handle_elected instead. Currently not a big deal, as proposer is the
     682         3308 :         // only source of WAL; with peer2peer recovery it would be more
     683         3308 :         // important.
     684         3308 :         self.wal_store.flush_wal().await?;
     685              :         // initialize with refusal
     686         3308 :         let mut resp = VoteResponse {
     687         3308 :             term: self.state.acceptor_state.term,
     688         3308 :             vote_given: false as u64,
     689         3308 :             flush_lsn: self.flush_lsn(),
     690         3308 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
     691         3308 :             term_history: self.get_term_history(),
     692         3308 :             timeline_start_lsn: self.state.timeline_start_lsn,
     693         3308 :         };
     694         3308 :         if self.state.acceptor_state.term < msg.term {
     695         3132 :             let mut state = self.state.start_change();
     696         3132 :             state.acceptor_state.term = msg.term;
     697         3132 :             // persist vote before sending it out
     698         3132 :             self.state.finish_change(&state).await?;
     699              : 
     700         3132 :             resp.term = self.state.acceptor_state.term;
     701         3132 :             resp.vote_given = true as u64;
     702          176 :         }
     703         3308 :         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
     704         3308 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
     705         3308 :     }
     706              : 
     707              :     /// Form AppendResponse from current state.
     708         3399 :     fn append_response(&self) -> AppendResponse {
     709         3399 :         let ar = AppendResponse {
     710         3399 :             term: self.state.acceptor_state.term,
     711         3399 :             flush_lsn: self.flush_lsn(),
     712         3399 :             commit_lsn: self.state.commit_lsn,
     713         3399 :             // will be filled by the upper code to avoid bothering safekeeper
     714         3399 :             hs_feedback: HotStandbyFeedback::empty(),
     715         3399 :             pageserver_feedback: None,
     716         3399 :         };
     717         3399 :         trace!("formed AppendResponse {:?}", ar);
     718         3399 :         ar
     719         3399 :     }
     720              : 
     721          837 :     async fn handle_elected(
     722          837 :         &mut self,
     723          837 :         msg: &ProposerElected,
     724          837 :     ) -> Result<Option<AcceptorProposerMessage>> {
     725          837 :         let _timer = MISC_OPERATION_SECONDS
     726          837 :             .with_label_values(&["handle_elected"])
     727          837 :             .start_timer();
     728          837 : 
     729          837 :         info!(
     730            0 :             "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
     731            0 :             msg,
     732            0 :             self.state.acceptor_state.term,
     733            0 :             self.get_last_log_term(),
     734            0 :             self.flush_lsn()
     735              :         );
     736          837 :         if self.state.acceptor_state.term < msg.term {
     737            5 :             let mut state = self.state.start_change();
     738            5 :             state.acceptor_state.term = msg.term;
     739            5 :             self.state.finish_change(&state).await?;
     740          832 :         }
     741              : 
     742              :         // If our term is higher, ignore the message (next feedback will inform the compute)
     743          837 :         if self.state.acceptor_state.term > msg.term {
     744            0 :             return Ok(None);
     745          837 :         }
     746          837 : 
     747          837 :         // Before truncating WAL check-cross the check divergence point received
     748          837 :         // from the walproposer.
     749          837 :         let sk_th = self.get_term_history();
     750          837 :         let last_common_point = match TermHistory::find_highest_common_point(
     751          837 :             &msg.term_history,
     752          837 :             &sk_th,
     753          837 :             self.flush_lsn(),
     754          837 :         ) {
     755              :             // No common point. Expect streaming from the beginning of the
     756              :             // history like walproposer while we don't have proper init.
     757          171 :             None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
     758          171 :                 "empty walproposer term history {:?}",
     759          171 :                 msg.term_history
     760          171 :             ))?,
     761          666 :             Some(lcp) => lcp,
     762              :         };
     763              :         // This is expected to happen in a rare race when another connection
     764              :         // from the same walproposer writes + flushes WAL after this connection
     765              :         // sent flush_lsn in VoteRequest; for instance, very late
     766              :         // ProposerElected message delivery after another connection was
     767              :         // established and wrote WAL. In such cases error is transient;
     768              :         // reconnection makes safekeeper send newest term history and flush_lsn
     769              :         // and walproposer recalculates the streaming point. OTOH repeating
     770              :         // error indicates a serious bug.
     771          837 :         if last_common_point.lsn != msg.start_streaming_at {
     772            0 :             bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
     773            0 :                     last_common_point, msg.start_streaming_at,
     774            0 :                     self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
     775            0 :             );
     776          837 :         }
     777          837 : 
     778          837 :         // We are also expected to never attempt to truncate committed data.
     779          837 :         assert!(
     780          837 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
     781            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
     782            0 :             msg.start_streaming_at, self.state.inmem.commit_lsn,
     783            0 :             self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
     784              :         );
     785              : 
     786              :         // Before first WAL write initialize its segment. It makes first segment
     787              :         // pg_waldump'able because stream from compute doesn't include its
     788              :         // segment and page headers.
     789              :         //
     790              :         // If we fail before first WAL write flush this action would be
     791              :         // repeated, that's ok because it is idempotent.
     792          837 :         if self.wal_store.flush_lsn() == Lsn::INVALID {
     793          170 :             self.wal_store
     794          170 :                 .initialize_first_segment(msg.start_streaming_at)
     795          170 :                 .await?;
     796          667 :         }
     797              : 
     798              :         // truncate wal, update the LSNs
     799          837 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
     800              : 
     801              :         // and now adopt term history from proposer
     802              :         {
     803          837 :             let mut state = self.state.start_change();
     804          837 : 
     805          837 :             // Here we learn initial LSN for the first time, set fields
     806          837 :             // interested in that.
     807          837 : 
     808          837 :             if state.timeline_start_lsn == Lsn(0) {
     809              :                 // Remember point where WAL begins globally.
     810          170 :                 state.timeline_start_lsn = msg.timeline_start_lsn;
     811          170 :                 info!(
     812            0 :                     "setting timeline_start_lsn to {:?}",
     813              :                     state.timeline_start_lsn
     814              :                 );
     815          667 :             }
     816          837 :             if state.peer_horizon_lsn == Lsn(0) {
     817          170 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
     818          170 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
     819          170 :                 state.peer_horizon_lsn = msg.timeline_start_lsn;
     820          667 :             }
     821          837 :             if state.local_start_lsn == Lsn(0) {
     822          170 :                 state.local_start_lsn = msg.start_streaming_at;
     823          170 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
     824          667 :             }
     825              :             // Initializing commit_lsn before acking first flushed record is
     826              :             // important to let find_end_of_wal skip the hole in the beginning
     827              :             // of the first segment.
     828              :             //
     829              :             // NB: on new clusters, this happens at the same time as
     830              :             // timeline_start_lsn initialization, it is taken outside to provide
     831              :             // upgrade.
     832          837 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
     833          837 : 
     834          837 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
     835          837 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
     836          837 :             // similar for remote_consistent_lsn
     837          837 :             state.remote_consistent_lsn =
     838          837 :                 max(state.remote_consistent_lsn, state.timeline_start_lsn);
     839          837 : 
     840          837 :             state.acceptor_state.term_history = msg.term_history.clone();
     841          837 :             self.state.finish_change(&state).await?;
     842              :         }
     843              : 
     844          837 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
     845              : 
     846              :         // Cache LSN where term starts to immediately fsync control file with
     847              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
     848              :         // persisted commit_lsn on majority of safekeepers aligns.
     849          837 :         self.term_start_lsn = match msg.term_history.0.last() {
     850            0 :             None => bail!("proposer elected with empty term history"),
     851          837 :             Some(term_lsn_start) => term_lsn_start.lsn,
     852          837 :         };
     853          837 : 
     854          837 :         Ok(None)
     855          837 :     }
     856              : 
     857              :     /// Advance commit_lsn taking into account what we have locally.
     858              :     ///
     859              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
     860              :     /// already been done outside.
     861         2609 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
     862         2609 :         // Both peers and walproposer communicate this value, we might already
     863         2609 :         // have a fresher (higher) version.
     864         2609 :         candidate = max(candidate, self.state.inmem.commit_lsn);
     865         2609 :         let commit_lsn = min(candidate, self.flush_lsn());
     866         2609 :         assert!(
     867         2609 :             commit_lsn >= self.state.inmem.commit_lsn,
     868            0 :             "commit_lsn monotonicity violated: old={} new={}",
     869              :             self.state.inmem.commit_lsn,
     870              :             commit_lsn
     871              :         );
     872              : 
     873         2609 :         self.state.inmem.commit_lsn = commit_lsn;
     874         2609 : 
     875         2609 :         // If new commit_lsn reached term switch, force sync of control
     876         2609 :         // file: walproposer in sync mode is very interested when this
     877         2609 :         // happens. Note: this is for sync-safekeepers mode only, as
     878         2609 :         // otherwise commit_lsn might jump over term_start_lsn.
     879         2609 :         if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
     880          124 :             self.state.flush().await?;
     881         2485 :         }
     882              : 
     883         2609 :         Ok(())
     884         2609 :     }
     885              : 
     886              :     /// Handle request to append WAL.
     887              :     #[allow(clippy::comparison_chain)]
     888         3956 :     async fn handle_append_request(
     889         3956 :         &mut self,
     890         3956 :         msg: &AppendRequest,
     891         3956 :         require_flush: bool,
     892         3956 :     ) -> Result<Option<AcceptorProposerMessage>> {
     893         3956 :         if self.state.acceptor_state.term < msg.h.term {
     894            0 :             bail!("got AppendRequest before ProposerElected");
     895         3956 :         }
     896         3956 : 
     897         3956 :         // If our term is higher, immediately refuse the message.
     898         3956 :         if self.state.acceptor_state.term > msg.h.term {
     899            0 :             let resp = AppendResponse::term_only(self.state.acceptor_state.term);
     900            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
     901         3956 :         }
     902         3956 : 
     903         3956 :         // Disallow any non-sequential writes, which can result in gaps or
     904         3956 :         // overwrites. If we need to move the pointer, ProposerElected message
     905         3956 :         // should have truncated WAL first accordingly. Note that the first
     906         3956 :         // condition (WAL rewrite) is quite expected in real world; it happens
     907         3956 :         // when walproposer reconnects to safekeeper and writes some more data
     908         3956 :         // while first connection still gets some packets later. It might be
     909         3956 :         // better to not log this as error! above.
     910         3956 :         let write_lsn = self.wal_store.write_lsn();
     911         3956 :         let flush_lsn = self.wal_store.flush_lsn();
     912         3956 :         if write_lsn > msg.h.begin_lsn {
     913            1 :             bail!(
     914            1 :                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
     915            1 :                 write_lsn,
     916            1 :                 msg.h.begin_lsn
     917            1 :             );
     918         3955 :         }
     919         3955 :         if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
     920            0 :             bail!(
     921            0 :                 "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
     922            0 :                 write_lsn,
     923            0 :                 msg.h.begin_lsn,
     924            0 :             );
     925         3955 :         }
     926         3955 : 
     927         3955 :         // Now we know that we are in the same term as the proposer,
     928         3955 :         // processing the message.
     929         3955 : 
     930         3955 :         self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
     931         3955 : 
     932         3955 :         // do the job
     933         3955 :         if !msg.wal_data.is_empty() {
     934         1428 :             self.wal_store
     935         1428 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
     936         1428 :                 .await?;
     937         2527 :         }
     938              : 
     939              :         // flush wal to the disk, if required
     940         3955 :         if require_flush {
     941            3 :             self.wal_store.flush_wal().await?;
     942         3952 :         }
     943              : 
     944              :         // Update commit_lsn. It will be flushed to the control file regularly by the timeline
     945              :         // manager, off of the WAL ingest hot path.
     946         3955 :         if msg.h.commit_lsn != Lsn(0) {
     947         2609 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
     948         1346 :         }
     949              :         // Value calculated by walproposer can always lag:
     950              :         // - safekeepers can forget inmem value and send to proposer lower
     951              :         //   persisted one on restart;
     952              :         // - if we make safekeepers always send persistent value,
     953              :         //   any compute restart would pull it down.
     954              :         // Thus, take max before adopting.
     955         3955 :         self.state.inmem.peer_horizon_lsn =
     956         3955 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
     957         3955 : 
     958         3955 :         trace!(
     959            0 :             "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
     960            0 :             msg.wal_data.len(),
     961              :             msg.h.begin_lsn,
     962              :             msg.h.end_lsn,
     963              :             msg.h.commit_lsn,
     964              :             msg.h.truncate_lsn,
     965              :             require_flush,
     966              :         );
     967              : 
     968              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
     969              :         // This is the common case for !require_flush, but a flush can still
     970              :         // happen on segment bounds.
     971         3955 :         if !require_flush && flush_lsn == self.flush_lsn() {
     972         3952 :             return Ok(None);
     973            3 :         }
     974            3 : 
     975            3 :         let resp = self.append_response();
     976            3 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
     977         3956 :     }
     978              : 
     979              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
     980         3396 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
     981         3396 :         self.wal_store.flush_wal().await?;
     982         3396 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
     983         3396 :             self.append_response(),
     984         3396 :         )))
     985         3396 :     }
     986              : 
     987              :     /// Update commit_lsn from peer safekeeper data.
     988            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     989            0 :         if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
     990              :             // Note: the check is too restrictive, generally we can update local
     991              :             // commit_lsn if our history matches (is part of) history of advanced
     992              :             // commit_lsn provider.
     993            0 :             if sk_info.last_log_term == self.get_last_log_term() {
     994            0 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
     995            0 :             }
     996            0 :         }
     997            0 :         Ok(())
     998            0 :     }
     999              : }
    1000              : 
    1001              : #[cfg(test)]
    1002              : mod tests {
    1003              :     use futures::future::BoxFuture;
    1004              : 
    1005              :     use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
    1006              :     use safekeeper_api::{
    1007              :         membership::{Configuration, MemberSet, SafekeeperId},
    1008              :         ServerInfo,
    1009              :     };
    1010              : 
    1011              :     use super::*;
    1012              :     use crate::state::{EvictionState, TimelinePersistentState};
    1013              :     use std::{
    1014              :         ops::Deref,
    1015              :         str::FromStr,
    1016              :         time::{Instant, UNIX_EPOCH},
    1017              :     };
    1018              : 
    1019              :     // fake storage for tests
    1020              :     struct InMemoryState {
    1021              :         persisted_state: TimelinePersistentState,
    1022              :     }
    1023              : 
    1024              :     impl control_file::Storage for InMemoryState {
    1025            5 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
    1026            5 :             self.persisted_state = s.clone();
    1027            5 :             Ok(())
    1028            5 :         }
    1029              : 
    1030            0 :         fn last_persist_at(&self) -> Instant {
    1031            0 :             Instant::now()
    1032            0 :         }
    1033              :     }
    1034              : 
    1035              :     impl Deref for InMemoryState {
    1036              :         type Target = TimelinePersistentState;
    1037              : 
    1038           83 :         fn deref(&self) -> &Self::Target {
    1039           83 :             &self.persisted_state
    1040           83 :         }
    1041              :     }
    1042              : 
    1043            3 :     fn test_sk_state() -> TimelinePersistentState {
    1044            3 :         let mut state = TimelinePersistentState::empty();
    1045            3 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1046            3 :         state.tenant_id = TenantId::from([1u8; 16]);
    1047            3 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1048            3 :         state
    1049            3 :     }
    1050              : 
    1051              :     struct DummyWalStore {
    1052              :         lsn: Lsn,
    1053              :     }
    1054              : 
    1055              :     impl wal_storage::Storage for DummyWalStore {
    1056            4 :         fn write_lsn(&self) -> Lsn {
    1057            4 :             self.lsn
    1058            4 :         }
    1059              : 
    1060           19 :         fn flush_lsn(&self) -> Lsn {
    1061           19 :             self.lsn
    1062           19 :         }
    1063              : 
    1064            2 :         async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
    1065            2 :             Ok(())
    1066            2 :         }
    1067              : 
    1068            3 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1069            3 :             self.lsn = startpos + buf.len() as u64;
    1070            3 :             Ok(())
    1071            3 :         }
    1072              : 
    1073            2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1074            2 :             self.lsn = end_pos;
    1075            2 :             Ok(())
    1076            2 :         }
    1077              : 
    1078            5 :         async fn flush_wal(&mut self) -> Result<()> {
    1079            5 :             Ok(())
    1080            5 :         }
    1081              : 
    1082            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1083            0 :             Box::pin(async { Ok(()) })
    1084            0 :         }
    1085              : 
    1086            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1087            0 :             crate::metrics::WalStorageMetrics::default()
    1088            0 :         }
    1089              :     }
    1090              : 
    1091              :     #[tokio::test]
    1092            1 :     async fn test_voting() {
    1093            1 :         let storage = InMemoryState {
    1094            1 :             persisted_state: test_sk_state(),
    1095            1 :         };
    1096            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1097            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1098            1 : 
    1099            1 :         // check voting for 1 is ok
    1100            1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
    1101            1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1102            1 :         match vote_resp.unwrap() {
    1103            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
    1104            1 :             r => panic!("unexpected response: {:?}", r),
    1105            1 :         }
    1106            1 : 
    1107            1 :         // reboot...
    1108            1 :         let state = sk.state.deref().clone();
    1109            1 :         let storage = InMemoryState {
    1110            1 :             persisted_state: state,
    1111            1 :         };
    1112            1 : 
    1113            1 :         sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
    1114            1 : 
    1115            1 :         // and ensure voting second time for 1 is not ok
    1116            1 :         vote_resp = sk.process_msg(&vote_request).await;
    1117            1 :         match vote_resp.unwrap() {
    1118            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
    1119            1 :             r => panic!("unexpected response: {:?}", r),
    1120            1 :         }
    1121            1 :     }
    1122              : 
    1123              :     #[tokio::test]
    1124            1 :     async fn test_last_log_term_switch() {
    1125            1 :         let storage = InMemoryState {
    1126            1 :             persisted_state: test_sk_state(),
    1127            1 :         };
    1128            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1129            1 : 
    1130            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1131            1 : 
    1132            1 :         let mut ar_hdr = AppendRequestHeader {
    1133            1 :             term: 2,
    1134            1 :             term_start_lsn: Lsn(3),
    1135            1 :             begin_lsn: Lsn(1),
    1136            1 :             end_lsn: Lsn(2),
    1137            1 :             commit_lsn: Lsn(0),
    1138            1 :             truncate_lsn: Lsn(0),
    1139            1 :             proposer_uuid: [0; 16],
    1140            1 :         };
    1141            1 :         let mut append_request = AppendRequest {
    1142            1 :             h: ar_hdr.clone(),
    1143            1 :             wal_data: Bytes::from_static(b"b"),
    1144            1 :         };
    1145            1 : 
    1146            1 :         let pem = ProposerElected {
    1147            1 :             term: 2,
    1148            1 :             start_streaming_at: Lsn(1),
    1149            1 :             term_history: TermHistory(vec![
    1150            1 :                 TermLsn {
    1151            1 :                     term: 1,
    1152            1 :                     lsn: Lsn(1),
    1153            1 :                 },
    1154            1 :                 TermLsn {
    1155            1 :                     term: 2,
    1156            1 :                     lsn: Lsn(3),
    1157            1 :                 },
    1158            1 :             ]),
    1159            1 :             timeline_start_lsn: Lsn(1),
    1160            1 :         };
    1161            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1162            1 :             .await
    1163            1 :             .unwrap();
    1164            1 : 
    1165            1 :         // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
    1166            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1167            1 :             .await
    1168            1 :             .unwrap();
    1169            1 :         assert_eq!(sk.get_last_log_term(), 1);
    1170            1 : 
    1171            1 :         // but record at term_start_lsn does the switch
    1172            1 :         ar_hdr.begin_lsn = Lsn(2);
    1173            1 :         ar_hdr.end_lsn = Lsn(3);
    1174            1 :         append_request = AppendRequest {
    1175            1 :             h: ar_hdr,
    1176            1 :             wal_data: Bytes::from_static(b"b"),
    1177            1 :         };
    1178            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1179            1 :             .await
    1180            1 :             .unwrap();
    1181            1 :         assert_eq!(sk.get_last_log_term(), 2);
    1182            1 :     }
    1183              : 
    1184              :     #[tokio::test]
    1185            1 :     async fn test_non_consecutive_write() {
    1186            1 :         let storage = InMemoryState {
    1187            1 :             persisted_state: test_sk_state(),
    1188            1 :         };
    1189            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1190            1 : 
    1191            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1192            1 : 
    1193            1 :         let pem = ProposerElected {
    1194            1 :             term: 1,
    1195            1 :             start_streaming_at: Lsn(1),
    1196            1 :             term_history: TermHistory(vec![TermLsn {
    1197            1 :                 term: 1,
    1198            1 :                 lsn: Lsn(1),
    1199            1 :             }]),
    1200            1 :             timeline_start_lsn: Lsn(1),
    1201            1 :         };
    1202            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1203            1 :             .await
    1204            1 :             .unwrap();
    1205            1 : 
    1206            1 :         let ar_hdr = AppendRequestHeader {
    1207            1 :             term: 1,
    1208            1 :             term_start_lsn: Lsn(3),
    1209            1 :             begin_lsn: Lsn(1),
    1210            1 :             end_lsn: Lsn(2),
    1211            1 :             commit_lsn: Lsn(0),
    1212            1 :             truncate_lsn: Lsn(0),
    1213            1 :             proposer_uuid: [0; 16],
    1214            1 :         };
    1215            1 :         let append_request = AppendRequest {
    1216            1 :             h: ar_hdr.clone(),
    1217            1 :             wal_data: Bytes::from_static(b"b"),
    1218            1 :         };
    1219            1 : 
    1220            1 :         // do write ending at 2, it should be ok
    1221            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1222            1 :             .await
    1223            1 :             .unwrap();
    1224            1 :         let mut ar_hrd2 = ar_hdr.clone();
    1225            1 :         ar_hrd2.begin_lsn = Lsn(4);
    1226            1 :         ar_hrd2.end_lsn = Lsn(5);
    1227            1 :         let append_request = AppendRequest {
    1228            1 :             h: ar_hdr,
    1229            1 :             wal_data: Bytes::from_static(b"b"),
    1230            1 :         };
    1231            1 :         // and now starting at 4, it must fail
    1232            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1233            1 :             .await
    1234            1 :             .unwrap_err();
    1235            1 :     }
    1236              : 
    /// The proposer history holds only term 0 and the safekeeper history terms 1 and 2:
    /// no common point is expected.
    #[test]
    fn test_find_highest_common_point_none() {
        let proposer_history = TermHistory(vec![(0, Lsn(1)).into()]);
        let safekeeper_history = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
        let sk_end_lsn = Lsn(3);

        let common = TermHistory::find_highest_common_point(
            &proposer_history,
            &safekeeper_history,
            sk_end_lsn,
        );

        assert_eq!(common, None);
    }
    1246              : 
    /// Both histories share terms 1 and 2 and then diverge (4 vs 3). The common point is
    /// term 2 at LSN 30, where the safekeeper's next term begins.
    #[test]
    fn test_find_highest_common_point_middle() {
        let proposer_history = TermHistory(vec![
            (1, Lsn(10)).into(),
            (2, Lsn(20)).into(),
            (4, Lsn(40)).into(),
        ]);
        // sk ends last common term 2 at 30
        let safekeeper_history = TermHistory(vec![
            (1, Lsn(10)).into(),
            (2, Lsn(20)).into(),
            (3, Lsn(30)).into(),
        ]);
        let sk_end_lsn = Lsn(40);

        let common = TermHistory::find_highest_common_point(
            &proposer_history,
            &safekeeper_history,
            sk_end_lsn,
        );

        let expected = TermLsn {
            term: 2,
            lsn: Lsn(30),
        };
        assert_eq!(common, Some(expected));
    }
    1267              : 
    /// The safekeeper history stops at the last common term 2 while the proposer goes on to
    /// term 4. The common point is term 2 at the safekeeper end LSN passed in (32).
    #[test]
    fn test_find_highest_common_point_sk_end() {
        // last common term is 2; sk will end it at 32 (sk_end_lsn)
        let proposer_history = TermHistory(vec![
            (1, Lsn(10)).into(),
            (2, Lsn(20)).into(),
            (4, Lsn(40)).into(),
        ]);
        let safekeeper_history = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
        let sk_end_lsn = Lsn(32);

        let common = TermHistory::find_highest_common_point(
            &proposer_history,
            &safekeeper_history,
            sk_end_lsn,
        );

        let expected = TermLsn {
            term: 2,
            lsn: Lsn(32),
        };
        assert_eq!(common, Some(expected));
    }
    1284              : 
    /// Proposer and safekeeper histories are identical (terms 1 and 2). The common point is
    /// term 2 at the safekeeper end LSN passed in (32).
    #[test]
    fn test_find_highest_common_point_walprop() {
        let proposer_history = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
        let safekeeper_history = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
        let sk_end_lsn = Lsn(32);

        let common = TermHistory::find_highest_common_point(
            &proposer_history,
            &safekeeper_history,
            sk_end_lsn,
        );

        let expected = TermLsn {
            term: 2,
            lsn: Lsn(32),
        };
        assert_eq!(common, Some(expected));
    }
    1297              : 
    /// Serializes a fully populated `TimelinePersistentState` with `ser()`, reads it back
    /// with `des()` and expects a value equal to the original.
    #[test]
    fn test_sk_state_bincode_serde_roundtrip() {
        let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
        let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();

        // The proposer uuid is the timeline id bytes in reverse order.
        let mut proposer_uuid = timeline_id.as_arr();
        proposer_uuid.reverse();

        let members = MemberSet::new(vec![SafekeeperId {
            id: NodeId(1),
            host: "hehe.org".to_owned(),
            pg_port: 5432,
        }])
        .expect("duplicate member");

        let acceptor_state = AcceptorState {
            term: 42,
            term_history: TermHistory(vec![TermLsn {
                lsn: Lsn(0x1),
                term: 41,
            }]),
        };

        let server = ServerInfo {
            pg_version: 14,
            system_id: 0x1234567887654321,
            wal_seg_size: 0x12345678,
        };

        let state = TimelinePersistentState {
            tenant_id,
            timeline_id,
            mconf: Configuration {
                generation: 42,
                members,
                new_members: None,
            },
            acceptor_state,
            server,
            proposer_uuid,
            timeline_start_lsn: Lsn(0x12345600),
            local_start_lsn: Lsn(0x12),
            commit_lsn: Lsn(1234567800),
            backup_lsn: Lsn(1234567300),
            peer_horizon_lsn: Lsn(9999999),
            remote_consistent_lsn: Lsn(1234560000),
            partial_backup: crate::wal_backup_partial::State::default(),
            eviction_state: EvictionState::Present,
            creation_ts: UNIX_EPOCH,
        };

        let encoded = state.ser().unwrap();
        let decoded = TimelinePersistentState::des(&encoded).unwrap();

        assert_eq!(decoded, state);
    }
    1349              : }
        

Generated by: LCOV version 2.1-beta