LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 17080b14f46954d6812ea0a7dad4b2247e0840a8.info Lines: 71.6 % 1046 749
Test Date: 2025-07-08 18:30:10 Functions: 71.2 % 177 126

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use std::cmp::{max, min};
       4              : use std::fmt;
       5              : use std::io::Read;
       6              : use std::str::FromStr;
       7              : 
       8              : use anyhow::{Context, Result, bail};
       9              : use byteorder::{LittleEndian, ReadBytesExt};
      10              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      11              : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
      12              : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
      13              : use pq_proto::SystemId;
      14              : use safekeeper_api::membership::{
      15              :     INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
      16              : };
      17              : use safekeeper_api::models::HotStandbyFeedback;
      18              : use safekeeper_api::{Term, membership};
      19              : use serde::{Deserialize, Serialize};
      20              : use storage_broker::proto::SafekeeperTimelineInfo;
      21              : use tracing::*;
      22              : use utils::bin_ser::LeSer;
      23              : use utils::id::{NodeId, TenantId, TimelineId};
      24              : use utils::lsn::Lsn;
      25              : use utils::pageserver_feedback::PageserverFeedback;
      26              : 
      27              : use crate::metrics::MISC_OPERATION_SECONDS;
      28              : use crate::state::TimelineState;
      29              : use crate::{control_file, wal_storage};
      30              : 
      31              : pub const SK_PROTO_VERSION_2: u32 = 2;
      32              : pub const SK_PROTO_VERSION_3: u32 = 3;
      33              : pub const UNKNOWN_SERVER_VERSION: PgVersionId = PgVersionId::UNKNOWN;
      34              : 
      35            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      36              : pub struct TermLsn {
      37              :     pub term: Term,
      38              :     pub lsn: Lsn,
      39              : }
      40              : 
      41              : // Creation from tuple provides less typing (e.g. for unit tests).
      42              : impl From<(Term, Lsn)> for TermLsn {
      43         1279 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      44         1279 :         TermLsn {
      45         1279 :             term: pair.0,
      46         1279 :             lsn: pair.1,
      47         1279 :         }
      48         1279 :     }
      49              : }
      50              : 
      51            0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      52              : pub struct TermHistory(pub Vec<TermLsn>);
      53              : 
      54              : impl TermHistory {
      55         1435 :     pub fn empty() -> TermHistory {
      56         1435 :         TermHistory(Vec::new())
      57         1435 :     }
      58              : 
      59              :     // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
      60          780 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      61          780 :         let n_entries = bytes
      62          780 :             .get_u32_f()
      63          780 :             .with_context(|| "TermHistory misses len")?;
      64          780 :         let mut res = Vec::with_capacity(n_entries as usize);
      65         7709 :         for i in 0..n_entries {
      66         7709 :             let term = bytes
      67         7709 :                 .get_u64_f()
      68         7709 :                 .with_context(|| format!("TermHistory pos {i} misses term"))?;
      69         7709 :             let lsn = bytes
      70         7709 :                 .get_u64_f()
      71         7709 :                 .with_context(|| format!("TermHistory pos {i} misses lsn"))?
      72         7709 :                 .into();
      73         7709 :             res.push(TermLsn { term, lsn })
      74              :         }
      75          780 :         Ok(TermHistory(res))
      76          780 :     }
      77              : 
      78              :     // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
      79              :     // TODO remove once v2 protocol is fully dropped.
      80            0 :     pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
      81            0 :         if bytes.remaining() < 4 {
      82            0 :             bail!("TermHistory misses len");
      83            0 :         }
      84            0 :         let n_entries = bytes.get_u32_le();
      85            0 :         let mut res = Vec::with_capacity(n_entries as usize);
      86            0 :         for _ in 0..n_entries {
      87            0 :             if bytes.remaining() < 16 {
      88            0 :                 bail!("TermHistory is incomplete");
      89            0 :             }
      90            0 :             res.push(TermLsn {
      91            0 :                 term: bytes.get_u64_le(),
      92            0 :                 lsn: bytes.get_u64_le().into(),
      93            0 :             })
      94              :         }
      95            0 :         Ok(TermHistory(res))
      96            0 :     }
      97              : 
      98              :     /// Return copy of self with switches happening strictly after up_to
      99              :     /// truncated.
     100         5033 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
     101         5033 :         let mut res = Vec::with_capacity(self.0.len());
     102        25722 :         for e in &self.0 {
     103        20693 :             if e.lsn > up_to {
     104            4 :                 break;
     105        20689 :             }
     106        20689 :             res.push(*e);
     107              :         }
     108         5033 :         TermHistory(res)
     109         5033 :     }
     110              : 
     111              :     /// Find point of divergence between leader (walproposer) term history and
     112              :     /// safekeeper. Arguments are not symmetric as proposer history ends at
     113              :     /// +infinity while safekeeper at flush_lsn.
     114              :     /// C version is at walproposer SendProposerElected.
     115          791 :     pub fn find_highest_common_point(
     116          791 :         prop_th: &TermHistory,
     117          791 :         sk_th: &TermHistory,
     118          791 :         sk_wal_end: Lsn,
     119          791 :     ) -> Option<TermLsn> {
     120          791 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     121              : 
     122          791 :         if let Some(sk_th_last) = sk_th.last() {
     123          652 :             assert!(
     124          652 :                 sk_th_last.lsn <= sk_wal_end,
     125            0 :                 "safekeeper term history end {sk_th_last:?} LSN is higher than WAL end {sk_wal_end:?}"
     126              :             );
     127          139 :         }
     128              : 
     129              :         // find last common term, if any...
     130          791 :         let mut last_common_idx = None;
     131         6874 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     132         6874 :             if prop_th[i].term != sk_th[i].term {
     133            5 :                 break;
     134         6869 :             }
     135              :             // If term is the same, LSN must be equal as well.
     136         6869 :             assert!(
     137         6869 :                 prop_th[i].lsn == sk_th[i].lsn,
     138            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     139            0 :                 prop_th[i].term,
     140            0 :                 prop_th[i].lsn,
     141            0 :                 sk_th[i].lsn
     142              :             );
     143         6869 :             last_common_idx = Some(i);
     144              :         }
     145          791 :         let last_common_idx = last_common_idx?;
     146              :         // Now find where it ends at both prop and sk and take min. End of
     147              :         // (common) term is the start of the next except it is the last one;
     148              :         // there it is flush_lsn in case of safekeeper or, in case of proposer
     149              :         // +infinity, so we just take flush_lsn then.
     150          651 :         if last_common_idx == prop_th.len() - 1 {
     151           51 :             Some(TermLsn {
     152           51 :                 term: prop_th[last_common_idx].term,
     153           51 :                 lsn: sk_wal_end,
     154           51 :             })
     155              :         } else {
     156          600 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     157          600 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     158            4 :                 sk_th[last_common_idx + 1].lsn
     159              :             } else {
     160          596 :                 sk_wal_end
     161              :             };
     162          600 :             Some(TermLsn {
     163          600 :                 term: prop_th[last_common_idx].term,
     164          600 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     165          600 :             })
     166              :         }
     167          791 :     }
     168              : }
     169              : 
     170              : /// Display only latest entries for Debug.
     171              : impl fmt::Debug for TermHistory {
     172          340 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     173          340 :         let n_printed = 20;
     174          340 :         write!(
     175          340 :             fmt,
     176          340 :             "{}{:?}",
     177          340 :             if self.0.len() > n_printed { "... " } else { "" },
     178          340 :             self.0
     179          340 :                 .iter()
     180          340 :                 .rev()
     181          340 :                 .take(n_printed)
     182         1265 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     183          340 :                 .collect::<Vec<_>>()
     184              :         )
     185          340 :     }
     186              : }
     187              : 
     188              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     189              : pub type PgUuid = [u8; 16];
     190              : 
     191              : /// Persistent consensus state of the acceptor.
     192            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     193              : pub struct AcceptorState {
     194              :     /// acceptor's last term it voted for (advanced in 1 phase)
     195              :     pub term: Term,
     196              :     /// History of term switches for safekeeper's WAL.
     197              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     198              :     /// from the proposer before recovery.
     199              :     pub term_history: TermHistory,
     200              : }
     201              : 
     202              : impl AcceptorState {
     203              :     /// acceptor's last_log_term is the term of the highest entry in the log
     204         1344 :     pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
     205         1344 :         let th = self.term_history.up_to(flush_lsn);
     206         1344 :         match th.0.last() {
     207         1338 :             Some(e) => e.term,
     208            6 :             None => 0,
     209              :         }
     210         1344 :     }
     211              : }
     212              : 
     213              : // protocol messages
     214              : 
     215              : /// Initial Proposer -> Acceptor message
     216            0 : #[derive(Debug, Deserialize)]
     217              : pub struct ProposerGreeting {
     218              :     pub tenant_id: TenantId,
     219              :     pub timeline_id: TimelineId,
     220              :     pub mconf: membership::Configuration,
     221              :     /// Postgres server version
     222              :     pub pg_version: PgVersionId,
     223              :     pub system_id: SystemId,
     224              :     pub wal_seg_size: u32,
     225              : }
     226              : 
     227              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     228            0 : #[derive(Debug, Deserialize)]
     229              : pub struct ProposerGreetingV2 {
     230              :     /// proposer-acceptor protocol version
     231              :     pub protocol_version: u32,
     232              :     /// Postgres server version
     233              :     pub pg_version: PgVersionId,
     234              :     pub proposer_id: PgUuid,
     235              :     pub system_id: SystemId,
     236              :     pub timeline_id: TimelineId,
     237              :     pub tenant_id: TenantId,
     238              :     pub tli: TimeLineID,
     239              :     pub wal_seg_size: u32,
     240              : }
     241              : 
     242              : /// Acceptor -> Proposer initial response: the highest term known to me
     243              : /// (acceptor voted for).
     244              : #[derive(Debug, Serialize)]
     245              : pub struct AcceptorGreeting {
     246              :     node_id: NodeId,
     247              :     mconf: membership::Configuration,
     248              :     term: u64,
     249              : }
     250              : 
     251              : /// Vote request sent from proposer to safekeepers
     252              : #[derive(Debug)]
     253              : pub struct VoteRequest {
     254              :     pub generation: Generation,
     255              :     pub term: Term,
     256              : }
     257              : 
     258              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     259            0 : #[derive(Debug, Deserialize)]
     260              : pub struct VoteRequestV2 {
     261              :     pub term: Term,
     262              : }
     263              : 
     264              : /// Vote itself, sent from safekeeper to proposer
     265              : #[derive(Debug, Serialize)]
     266              : pub struct VoteResponse {
     267              :     generation: Generation, // membership conf generation
     268              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     269              :     vote_given: bool,
     270              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     271              :     // proposer to choose the most advanced one.
     272              :     pub flush_lsn: Lsn,
     273              :     truncate_lsn: Lsn,
     274              :     pub term_history: TermHistory,
     275              : }
     276              : 
     277              : /*
     278              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     279              :  * term history to it.
     280              :  */
     281              : #[derive(Debug, Clone)]
     282              : pub struct ProposerElected {
     283              :     pub generation: Generation, // membership conf generation
     284              :     pub term: Term,
     285              :     pub start_streaming_at: Lsn,
     286              :     pub term_history: TermHistory,
     287              : }
     288              : 
     289              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     290              : /// communicates commit_lsn.
     291              : #[derive(Debug)]
     292              : pub struct AppendRequest {
     293              :     pub h: AppendRequestHeader,
     294              :     pub wal_data: Bytes,
     295              : }
     296            0 : #[derive(Debug, Clone, Deserialize)]
     297              : pub struct AppendRequestHeader {
     298              :     pub generation: Generation, // membership conf generation
     299              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     300              :     pub term: Term,
     301              :     /// start position of message in WAL
     302              :     pub begin_lsn: Lsn,
     303              :     /// end position of message in WAL
     304              :     pub end_lsn: Lsn,
     305              :     /// LSN committed by quorum of safekeepers
     306              :     pub commit_lsn: Lsn,
     307              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     308              :     pub truncate_lsn: Lsn,
     309              : }
     310              : 
     311              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     312            0 : #[derive(Debug, Clone, Deserialize)]
     313              : pub struct AppendRequestHeaderV2 {
     314              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     315              :     pub term: Term,
     316              :     // TODO: remove this field from the protocol, it in unused -- LSN of term
     317              :     // switch can be taken from ProposerElected (as well as from term history).
     318              :     pub term_start_lsn: Lsn,
     319              :     /// start position of message in WAL
     320              :     pub begin_lsn: Lsn,
     321              :     /// end position of message in WAL
     322              :     pub end_lsn: Lsn,
     323              :     /// LSN committed by quorum of safekeepers
     324              :     pub commit_lsn: Lsn,
     325              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     326              :     pub truncate_lsn: Lsn,
     327              :     // only for logging/debugging
     328              :     pub proposer_uuid: PgUuid,
     329              : }
     330              : 
     331              : /// Report safekeeper state to proposer
     332              : #[derive(Debug, Serialize, Clone)]
     333              : pub struct AppendResponse {
     334              :     // Membership conf generation. Not strictly required because on mismatch
     335              :     // connection is reset, but let's sanity check it.
     336              :     generation: Generation,
     337              :     // Current term of the safekeeper; if it is higher than proposer's, the
     338              :     // compute is out of date.
     339              :     pub term: Term,
     340              :     // Flushed end of wal on safekeeper; one should be always mindful from what
     341              :     // term history this value comes, either checking history directly or
     342              :     // observing term being set to one for which WAL truncation is known to have
     343              :     // happened.
     344              :     pub flush_lsn: Lsn,
     345              :     // We report back our awareness about which WAL is committed, as this is
     346              :     // a criterion for walproposer --sync mode exit
     347              :     pub commit_lsn: Lsn,
     348              :     pub hs_feedback: HotStandbyFeedback,
     349              :     pub pageserver_feedback: Option<PageserverFeedback>,
     350              : }
     351              : 
     352              : impl AppendResponse {
     353            0 :     fn term_only(generation: Generation, term: Term) -> AppendResponse {
     354            0 :         AppendResponse {
     355            0 :             generation,
     356            0 :             term,
     357            0 :             flush_lsn: Lsn(0),
     358            0 :             commit_lsn: Lsn(0),
     359            0 :             hs_feedback: HotStandbyFeedback::empty(),
     360            0 :             pageserver_feedback: None,
     361            0 :         }
     362            0 :     }
     363              : }
     364              : 
     365              : /// Proposer -> Acceptor messages
     366              : #[derive(Debug)]
     367              : pub enum ProposerAcceptorMessage {
     368              :     Greeting(ProposerGreeting),
     369              :     VoteRequest(VoteRequest),
     370              :     Elected(ProposerElected),
     371              :     AppendRequest(AppendRequest),
     372              :     NoFlushAppendRequest(AppendRequest),
     373              :     FlushWAL,
     374              : }
     375              : 
     376              : /// Augment Bytes with fallible get_uN where N is number of bytes methods.
     377              : /// All reads are in network (big endian) order.
     378              : trait BytesF {
     379              :     fn get_u8_f(&mut self) -> Result<u8>;
     380              :     fn get_u16_f(&mut self) -> Result<u16>;
     381              :     fn get_u32_f(&mut self) -> Result<u32>;
     382              :     fn get_u64_f(&mut self) -> Result<u64>;
     383              : }
     384              : 
     385              : impl BytesF for Bytes {
     386        25867 :     fn get_u8_f(&mut self) -> Result<u8> {
     387        25867 :         if self.is_empty() {
     388            0 :             bail!("no bytes left, expected 1");
     389        25867 :         }
     390        25867 :         Ok(self.get_u8())
     391        25867 :     }
     392            0 :     fn get_u16_f(&mut self) -> Result<u16> {
     393            0 :         if self.remaining() < 2 {
     394            0 :             bail!("no bytes left, expected 2");
     395            0 :         }
     396            0 :         Ok(self.get_u16())
     397            0 :     }
     398       102555 :     fn get_u32_f(&mut self) -> Result<u32> {
     399       102555 :         if self.remaining() < 4 {
     400            0 :             bail!("only {} bytes left, expected 4", self.remaining());
     401       102555 :         }
     402       102555 :         Ok(self.get_u32())
     403       102555 :     }
     404        54905 :     fn get_u64_f(&mut self) -> Result<u64> {
     405        54905 :         if self.remaining() < 8 {
     406            0 :             bail!("only {} bytes left, expected 8", self.remaining());
     407        54905 :         }
     408        54905 :         Ok(self.get_u64())
     409        54905 :     }
     410              : }
     411              : 
     412              : impl ProposerAcceptorMessage {
     413              :     /// Read cstring from Bytes.
     414        37954 :     fn get_cstr(buf: &mut Bytes) -> Result<String> {
     415        37954 :         let pos = buf
     416        37954 :             .iter()
     417      1252482 :             .position(|x| *x == 0)
     418        37954 :             .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
     419        37954 :         let result = buf.split_to(pos);
     420        37954 :         buf.advance(1); // drop the null terminator
     421        37954 :         match std::str::from_utf8(&result) {
     422        37954 :             Ok(s) => Ok(s.to_string()),
     423            0 :             Err(e) => bail!("invalid utf8 in cstring: {}", e),
     424              :         }
     425        37954 :     }
     426              : 
     427              :     /// Read membership::Configuration from Bytes.
     428        18977 :     fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
     429        18977 :         let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
     430        18977 :         let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
     431              :         // Main member set must have at least someone in valid configuration.
     432              :         // Empty conf is allowed until we fully migrate.
     433        18977 :         if generation != INVALID_GENERATION && members_len == 0 {
     434            0 :             bail!("empty members_len");
     435        18977 :         }
     436        18977 :         let mut members = MemberSet::empty();
     437        18977 :         for i in 0..members_len {
     438            0 :             let id = buf
     439            0 :                 .get_u64_f()
     440            0 :                 .with_context(|| format!("reading member {i} node_id"))?;
     441            0 :             let host = Self::get_cstr(buf).with_context(|| format!("reading member {i} host"))?;
     442            0 :             let pg_port = buf
     443            0 :                 .get_u16_f()
     444            0 :                 .with_context(|| format!("reading member {i} port"))?;
     445            0 :             let sk = SafekeeperId {
     446            0 :                 id: NodeId(id),
     447            0 :                 host,
     448            0 :                 pg_port,
     449            0 :             };
     450            0 :             members.add(sk)?;
     451              :         }
     452        18977 :         let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
     453              :         // Non joint conf.
     454        18977 :         if new_members_len == 0 {
     455        18977 :             Ok(membership::Configuration {
     456        18977 :                 generation,
     457        18977 :                 members,
     458        18977 :                 new_members: None,
     459        18977 :             })
     460              :         } else {
     461            0 :             let mut new_members = MemberSet::empty();
     462            0 :             for i in 0..new_members_len {
     463            0 :                 let id = buf
     464            0 :                     .get_u64_f()
     465            0 :                     .with_context(|| format!("reading new member {i} node_id"))?;
     466            0 :                 let host =
     467            0 :                     Self::get_cstr(buf).with_context(|| format!("reading new member {i} host"))?;
     468            0 :                 let pg_port = buf
     469            0 :                     .get_u16_f()
     470            0 :                     .with_context(|| format!("reading new member {i} port"))?;
     471            0 :                 let sk = SafekeeperId {
     472            0 :                     id: NodeId(id),
     473            0 :                     host,
     474            0 :                     pg_port,
     475            0 :                 };
     476            0 :                 new_members.add(sk)?;
     477              :             }
     478            0 :             Ok(membership::Configuration {
     479            0 :                 generation,
     480            0 :                 members,
     481            0 :                 new_members: Some(new_members),
     482            0 :             })
     483              :         }
     484        18977 :     }
     485              : 
     486              :     /// Parse proposer message.
     487        25867 :     pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
     488        25867 :         if proto_version == SK_PROTO_VERSION_3 {
     489        25867 :             if msg_bytes.is_empty() {
     490            0 :                 bail!("ProposerAcceptorMessage is not complete: missing tag");
     491        25867 :             }
     492        25867 :             let tag = msg_bytes.get_u8_f().with_context(|| {
     493            0 :                 "ProposerAcceptorMessage is not complete: missing tag".to_string()
     494            0 :             })? as char;
     495        25867 :             match tag {
     496              :                 'g' => {
     497        18977 :                     let tenant_id_str =
     498        18977 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
     499        18977 :                     let tenant_id = TenantId::from_str(&tenant_id_str)?;
     500        18977 :                     let timeline_id_str =
     501        18977 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
     502        18977 :                     let timeline_id = TimelineId::from_str(&timeline_id_str)?;
     503        18977 :                     let mconf = Self::get_mconf(&mut msg_bytes)?;
     504        18977 :                     let pg_version = msg_bytes
     505        18977 :                         .get_u32_f()
     506        18977 :                         .with_context(|| "reading pg_version")?;
     507        18977 :                     let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
     508        18977 :                     let wal_seg_size = msg_bytes
     509        18977 :                         .get_u32_f()
     510        18977 :                         .with_context(|| "reading wal_seg_size")?;
     511        18977 :                     let g = ProposerGreeting {
     512        18977 :                         tenant_id,
     513        18977 :                         timeline_id,
     514        18977 :                         mconf,
     515        18977 :                         pg_version: PgVersionId::from_full_pg_version(pg_version),
     516        18977 :                         system_id,
     517        18977 :                         wal_seg_size,
     518        18977 :                     };
     519        18977 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     520              :                 }
     521              :                 'v' => {
     522         2900 :                     let generation = Generation::new(
     523         2900 :                         msg_bytes
     524         2900 :                             .get_u32_f()
     525         2900 :                             .with_context(|| "reading generation")?,
     526              :                     );
     527         2900 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     528         2900 :                     let v = VoteRequest { generation, term };
     529         2900 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     530              :                 }
     531              :                 'e' => {
     532          780 :                     let generation = Generation::new(
     533          780 :                         msg_bytes
     534          780 :                             .get_u32_f()
     535          780 :                             .with_context(|| "reading generation")?,
     536              :                     );
     537          780 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     538          780 :                     let start_streaming_at: Lsn = msg_bytes
     539          780 :                         .get_u64_f()
     540          780 :                         .with_context(|| "reading start_streaming_at")?
     541          780 :                         .into();
     542          780 :                     let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     543          780 :                     let msg = ProposerElected {
     544          780 :                         generation,
     545          780 :                         term,
     546          780 :                         start_streaming_at,
     547          780 :                         term_history,
     548          780 :                     };
     549          780 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     550              :                 }
     551              :                 'a' => {
     552         3210 :                     let generation = Generation::new(
     553         3210 :                         msg_bytes
     554         3210 :                             .get_u32_f()
     555         3210 :                             .with_context(|| "reading generation")?,
     556              :                     );
     557         3210 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     558         3210 :                     let begin_lsn: Lsn = msg_bytes
     559         3210 :                         .get_u64_f()
     560         3210 :                         .with_context(|| "reading begin_lsn")?
     561         3210 :                         .into();
     562         3210 :                     let end_lsn: Lsn = msg_bytes
     563         3210 :                         .get_u64_f()
     564         3210 :                         .with_context(|| "reading end_lsn")?
     565         3210 :                         .into();
     566         3210 :                     let commit_lsn: Lsn = msg_bytes
     567         3210 :                         .get_u64_f()
     568         3210 :                         .with_context(|| "reading commit_lsn")?
     569         3210 :                         .into();
     570         3210 :                     let truncate_lsn: Lsn = msg_bytes
     571         3210 :                         .get_u64_f()
     572         3210 :                         .with_context(|| "reading truncate_lsn")?
     573         3210 :                         .into();
     574         3210 :                     let hdr = AppendRequestHeader {
     575         3210 :                         generation,
     576         3210 :                         term,
     577         3210 :                         begin_lsn,
     578         3210 :                         end_lsn,
     579         3210 :                         commit_lsn,
     580         3210 :                         truncate_lsn,
     581         3210 :                     };
     582         3210 :                     let rec_size = hdr
     583         3210 :                         .end_lsn
     584         3210 :                         .checked_sub(hdr.begin_lsn)
     585         3210 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     586              :                         .0 as usize;
     587         3210 :                     if rec_size > MAX_SEND_SIZE {
     588            0 :                         bail!(
     589            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     590              :                             MAX_SEND_SIZE
     591              :                         );
     592         3210 :                     }
     593         3210 :                     if msg_bytes.remaining() < rec_size {
     594            0 :                         bail!(
     595            0 :                             "reading WAL: only {} bytes left, wanted {}",
     596            0 :                             msg_bytes.remaining(),
     597              :                             rec_size
     598              :                         );
     599         3210 :                     }
     600         3210 :                     let wal_data = msg_bytes.copy_to_bytes(rec_size);
     601         3210 :                     let msg = AppendRequest { h: hdr, wal_data };
     602              : 
     603         3210 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     604              :                 }
     605            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     606              :             }
     607            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     608              :             // xxx using Reader is inefficient but easy to work with bincode
     609            0 :             let mut stream = msg_bytes.reader();
     610              :             // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     611            0 :             let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     612            0 :             match tag {
     613              :                 'g' => {
     614            0 :                     let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
     615            0 :                     let g = ProposerGreeting {
     616            0 :                         tenant_id: msgv2.tenant_id,
     617            0 :                         timeline_id: msgv2.timeline_id,
     618            0 :                         mconf: membership::Configuration {
     619            0 :                             generation: INVALID_GENERATION,
     620            0 :                             members: MemberSet::empty(),
     621            0 :                             new_members: None,
     622            0 :                         },
     623            0 :                         pg_version: msgv2.pg_version,
     624            0 :                         system_id: msgv2.system_id,
     625            0 :                         wal_seg_size: msgv2.wal_seg_size,
     626            0 :                     };
     627            0 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     628              :                 }
     629              :                 'v' => {
     630            0 :                     let msg = VoteRequestV2::des_from(&mut stream)?;
     631            0 :                     let v = VoteRequest {
     632            0 :                         generation: INVALID_GENERATION,
     633            0 :                         term: msg.term,
     634            0 :                     };
     635            0 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     636              :                 }
     637              :                 'e' => {
     638            0 :                     let mut msg_bytes = stream.into_inner();
     639            0 :                     if msg_bytes.remaining() < 16 {
     640            0 :                         bail!("ProposerElected message is not complete");
     641            0 :                     }
     642            0 :                     let term = msg_bytes.get_u64_le();
     643            0 :                     let start_streaming_at = msg_bytes.get_u64_le().into();
     644            0 :                     let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
     645            0 :                     if msg_bytes.remaining() < 8 {
     646            0 :                         bail!("ProposerElected message is not complete");
     647            0 :                     }
     648            0 :                     let _timeline_start_lsn = msg_bytes.get_u64_le();
     649            0 :                     let msg = ProposerElected {
     650            0 :                         generation: INVALID_GENERATION,
     651            0 :                         term,
     652            0 :                         start_streaming_at,
     653            0 :                         term_history,
     654            0 :                     };
     655            0 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     656              :                 }
     657              :                 'a' => {
     658              :                     // read header followed by wal data
     659            0 :                     let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
     660            0 :                     let hdr = AppendRequestHeader {
     661            0 :                         generation: INVALID_GENERATION,
     662            0 :                         term: hdrv2.term,
     663            0 :                         begin_lsn: hdrv2.begin_lsn,
     664            0 :                         end_lsn: hdrv2.end_lsn,
     665            0 :                         commit_lsn: hdrv2.commit_lsn,
     666            0 :                         truncate_lsn: hdrv2.truncate_lsn,
     667            0 :                     };
     668            0 :                     let rec_size = hdr
     669            0 :                         .end_lsn
     670            0 :                         .checked_sub(hdr.begin_lsn)
     671            0 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     672              :                         .0 as usize;
     673            0 :                     if rec_size > MAX_SEND_SIZE {
     674            0 :                         bail!(
     675            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     676              :                             MAX_SEND_SIZE
     677              :                         );
     678            0 :                     }
     679              : 
     680            0 :                     let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     681            0 :                     stream.read_exact(&mut wal_data_vec)?;
     682            0 :                     let wal_data = Bytes::from(wal_data_vec);
     683              : 
     684            0 :                     let msg = AppendRequest { h: hdr, wal_data };
     685              : 
     686            0 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     687              :                 }
     688            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     689              :             }
     690              :         } else {
     691            0 :             bail!("unsupported protocol version {}", proto_version);
     692              :         }
     693        25867 :     }
     694              : 
     695              :     /// The memory size of the message, including byte slices.
                            ///
                            /// Computed as `size_of::<ProposerAcceptorMessage>()` plus `wal_data.len()` for the
                            /// `AppendRequest` and `NoFlushAppendRequest` variants; every other variant adds 0.
     696          620 :     pub fn size(&self) -> usize {
     697              :         const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
     698              : 
     699              :         // For most types, the size is just the base enum size including the nested structs. Some
     700              :         // types also contain byte slices; add them.
     701              :         //
     702              :         // We explicitly list all fields, to draw attention here when new fields are added.
     703          620 :         let mut size = BASE_SIZE;
     704          620 :         size += match self {
     705            0 :             Self::Greeting(_) => 0,
     706              : 
     707            0 :             Self::VoteRequest(_) => 0,
     708              : 
     709            0 :             Self::Elected(_) => 0,
     710              : 
                                    // Header fields are destructured one by one (and ignored) so that adding a
                                    // field to AppendRequestHeader makes this pattern fail to compile.
     711              :             Self::AppendRequest(AppendRequest {
     712              :                 h:
     713              :                     AppendRequestHeader {
     714              :                         generation: _,
     715              :                         term: _,
     716              :                         begin_lsn: _,
     717              :                         end_lsn: _,
     718              :                         commit_lsn: _,
     719              :                         truncate_lsn: _,
     720              :                     },
     721          620 :                 wal_data,
     722          620 :             }) => wal_data.len(),
     723              : 
     724              :             Self::NoFlushAppendRequest(AppendRequest {
     725              :                 h:
     726              :                     AppendRequestHeader {
     727              :                         generation: _,
     728              :                         term: _,
     729              :                         begin_lsn: _,
     730              :                         end_lsn: _,
     731              :                         commit_lsn: _,
     732              :                         truncate_lsn: _,
     733              :                     },
     734            0 :                 wal_data,
     735            0 :             }) => wal_data.len(),
     736              : 
     737            0 :             Self::FlushWAL => 0,
     738              :         };
     739              : 
     740          620 :         size
     741          620 :     }
     742              : }
     743              : 
     744              : /// Acceptor -> Proposer messages
                        ///
                        /// Each variant is written to the wire by `AcceptorProposerMessage::serialize`, which
                        /// prefixes it with a one-character tag (`g`, `v` or `a`).
     745              : #[derive(Debug)]
     746              : pub enum AcceptorProposerMessage {
                            /// Reply to a proposer greeting (tag `g`).
     747              :     Greeting(AcceptorGreeting),
                            /// Reply to a vote request (tag `v`).
     748              :     VoteResponse(VoteResponse),
                            /// Carries term, flush/commit LSNs and standby/pageserver feedback (tag `a`).
     749              :     AppendResponse(AppendResponse),
     750              : }
     751              : 
     752              : impl AcceptorProposerMessage {
                            /// Append `s` to `buf` as raw bytes followed by a single 0 byte.
                            ///
                            /// The bytes of `s` are copied as is; an interior 0 byte in `s` is not escaped.
     753            0 :     fn put_cstr(buf: &mut BytesMut, s: &str) {
     754            0 :         buf.put_slice(s.as_bytes());
     755            0 :         buf.put_u8(0); // null terminator
     756            0 :     }
     757              : 
     758              :     /// Serialize membership::Configuration into buf.
                            ///
                            /// Layout, using the big-endian `put_*` writers:
                            /// - `u32` generation
                            /// - `u32` number of members, then per member: `u64` id, NUL-terminated host,
                            ///   `u16` pg_port
                            /// - `u32` number of new members followed by the same per-member encoding, or a
                            ///   single `u32` 0 when `new_members` is `None`
     759        18977 :     fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
     760        18977 :         buf.put_u32(mconf.generation.into_inner());
     761        18977 :         buf.put_u32(mconf.members.m.len() as u32);
     762        18977 :         for sk in &mconf.members.m {
     763            0 :             buf.put_u64(sk.id.0);
     764            0 :             Self::put_cstr(buf, &sk.host);
     765            0 :             buf.put_u16(sk.pg_port);
     766            0 :         }
     767        18977 :         if let Some(ref new_members) = mconf.new_members {
     768            0 :             buf.put_u32(new_members.m.len() as u32);
     769            0 :             for sk in &new_members.m {
     770            0 :                 buf.put_u64(sk.id.0);
     771            0 :                 Self::put_cstr(buf, &sk.host);
     772            0 :                 buf.put_u16(sk.pg_port);
     773            0 :             }
     774        18977 :         } else {
                                // Absent new_members is encoded the same way as an empty list: count 0.
     775        18977 :             buf.put_u32(0);
     776        18977 :         }
     777        18977 :     }
     778              : 
     779              :     /// Serialize acceptor -> proposer message.
                            ///
                            /// The encoding depends on `proto_version`:
                            /// - `SK_PROTO_VERSION_3`: one-byte tag, fields written with the big-endian `put_*`
                            ///   writers, includes generation / membership configuration.
                            /// - `SK_PROTO_VERSION_2`: tag widened to `u64`, fields written with the
                            ///   little-endian `put_*_le` writers, no generation / membership configuration.
                            ///
                            /// # Errors
                            ///
                            /// Fails with "unsupported protocol version" for any other `proto_version`; nothing
                            /// is written to `buf` in that case.
     780        24557 :     pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
     781        24557 :         if proto_version == SK_PROTO_VERSION_3 {
     782        24557 :             match self {
     783        18977 :                 AcceptorProposerMessage::Greeting(msg) => {
     784        18977 :                     buf.put_u8(b'g');
     785        18977 :                     buf.put_u64(msg.node_id.0);
     786        18977 :                     Self::serialize_mconf(buf, &msg.mconf);
     787        18977 :                     buf.put_u64(msg.term)
     788              :                 }
     789         2900 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     790         2900 :                     buf.put_u8(b'v');
     791         2900 :                     buf.put_u32(msg.generation.into_inner());
     792         2900 :                     buf.put_u64(msg.term);
     793         2900 :                     buf.put_u8(msg.vote_given as u8);
     794         2900 :                     buf.put_u64(msg.flush_lsn.into());
     795         2900 :                     buf.put_u64(msg.truncate_lsn.into());
                                            // Term history: u32 entry count, then (term, lsn) as two u64 per entry.
     796         2900 :                     buf.put_u32(msg.term_history.0.len() as u32);
     797        15190 :                     for e in &msg.term_history.0 {
     798        12290 :                         buf.put_u64(e.term);
     799        12290 :                         buf.put_u64(e.lsn.into());
     800        12290 :                     }
     801              :                 }
     802         2680 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     803         2680 :                     buf.put_u8(b'a');
     804         2680 :                     buf.put_u32(msg.generation.into_inner());
     805         2680 :                     buf.put_u64(msg.term);
     806         2680 :                     buf.put_u64(msg.flush_lsn.into());
     807         2680 :                     buf.put_u64(msg.commit_lsn.into());
     808         2680 :                     buf.put_i64(msg.hs_feedback.ts);
     809         2680 :                     buf.put_u64(msg.hs_feedback.xmin);
     810         2680 :                     buf.put_u64(msg.hs_feedback.catalog_xmin);
     811              : 
     812              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     813              :                     // if it is not present.
     814         2680 :                     if let Some(ref msg) = msg.pageserver_feedback {
     815            0 :                         msg.serialize(buf);
     816         2680 :                     }
     817              :                 }
     818              :             }
     819        24557 :             Ok(())
     820              :         // TODO remove 3 after converting all msgs
     821            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     822            0 :             match self {
     823            0 :                 AcceptorProposerMessage::Greeting(msg) => {
     824            0 :                     buf.put_u64_le('g' as u64);
     825            0 :                     // v2 didn't have mconf and fields were reordered
     826            0 :                     buf.put_u64_le(msg.term);
     827            0 :                     buf.put_u64_le(msg.node_id.0);
     828            0 :                 }
     829            0 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     830              :                     // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
     831            0 :                     buf.put_u64_le('v' as u64);
     832            0 :                     buf.put_u64_le(msg.term);
     833            0 :                     buf.put_u64_le(msg.vote_given as u64);
     834            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     835            0 :                     buf.put_u64_le(msg.truncate_lsn.into());
     836            0 :                     buf.put_u32_le(msg.term_history.0.len() as u32);
     837            0 :                     for e in &msg.term_history.0 {
     838            0 :                         buf.put_u64_le(e.term);
     839            0 :                         buf.put_u64_le(e.lsn.into());
     840            0 :                     }
     841              :                     // removed timeline_start_lsn
                                            // A zero placeholder is still written in its place.
     842            0 :                     buf.put_u64_le(0);
     843              :                 }
     844            0 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     845              :                     // v2 didn't have generation
     846            0 :                     buf.put_u64_le('a' as u64);
     847            0 :                     buf.put_u64_le(msg.term);
     848            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     849            0 :                     buf.put_u64_le(msg.commit_lsn.into());
     850            0 :                     buf.put_i64_le(msg.hs_feedback.ts);
     851            0 :                     buf.put_u64_le(msg.hs_feedback.xmin);
     852            0 :                     buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     853              : 
     854              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     855              :                     // if it is not present.
     856            0 :                     if let Some(ref msg) = msg.pageserver_feedback {
     857            0 :                         msg.serialize(buf);
     858            0 :                     }
     859              :                 }
     860              :             }
     861            0 :             Ok(())
     862              :         } else {
     863            0 :             bail!("unsupported protocol version {}", proto_version);
     864              :         }
     865        24557 :     }
     866              : }
     867              : 
     868              : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     869              : /// It controls all WAL disk writes and updates of control file.
     870              : ///
     871              : /// Currently safekeeper processes:
     872              : /// - messages from compute (proposers) and provides replies
     873              : /// - messages from broker peers
                        ///
                        /// `CTRL` is the control file storage behind `state`; `WAL` is the WAL storage used as
                        /// `wal_store`.
     874              : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     875              :     /// LSN since the proposer safekeeper currently talking to appends WAL;
     876              :     /// determines last_log_term switch point.
                            /// Initialized to `Lsn(0)` by `SafeKeeper::new`.
     877              :     pub term_start_lsn: Lsn,
     878              : 
                            /// Timeline state; changes are made through `start_change` / `finish_change`.
     879              :     pub state: TimelineState<CTRL>, // persistent state storage
                            /// WAL storage; queried for `flush_lsn()` and flushed via `flush_wal()`.
     880              :     pub wal_store: WAL,
     881              : 
                            /// Reported to proposers as `node_id` in the greeting reply.
     882              :     node_id: NodeId, // safekeeper's node id
     883              : }
     884              : 
     885              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     886              : where
     887              :     CTRL: control_file::Storage,
     888              :     WAL: wal_storage::Storage,
     889              : {
     890              :     /// Accepts a control file storage containing the safekeeper state.
     891              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     892              :     /// and `server` (`wal_seg_size` inside it) fields.
                            ///
                            /// # Errors
                            ///
                            /// Fails if `state.tenant_id` or `state.timeline_id` is all zeroes. No other field
                            /// of `state` is checked here.
     893         8816 :     pub fn new(
     894         8816 :         state: TimelineState<CTRL>,
     895         8816 :         wal_store: WAL,
     896         8816 :         node_id: NodeId,
     897         8816 :     ) -> Result<SafeKeeper<CTRL, WAL>> {
                                // An all-zero id is treated as "not initialized".
     898         8816 :         if state.tenant_id == TenantId::from([0u8; 16])
     899         8816 :             || state.timeline_id == TimelineId::from([0u8; 16])
     900              :         {
     901            0 :             bail!(
     902            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     903            0 :                 state.tenant_id,
     904            0 :                 state.timeline_id
     905              :             );
     906            9 :         }
     907              : 
     908         8816 :         Ok(SafeKeeper {
     909         8816 :             term_start_lsn: Lsn(0),
     910         8816 :             state,
     911         8816 :             wal_store,
     912         8816 :             node_id,
     913         8816 :         })
     914            9 :     }
     915              : 
     916              :     /// Get history of term switches for the available WAL
                            ///
                            /// This is `acceptor_state.term_history` cut with `up_to(self.flush_lsn())`.
     917         3689 :     fn get_term_history(&self) -> TermHistory {
     918         3689 :         self.state
     919         3689 :             .acceptor_state
     920         3689 :             .term_history
     921         3689 :             .up_to(self.flush_lsn())
     922            9 :     }
     923              : 
                            /// Returns `acceptor_state.get_last_log_term(..)` evaluated at the current
                            /// `self.flush_lsn()`.
     924           43 :     pub fn get_last_log_term(&self) -> Term {
     925           43 :         self.state
     926           43 :             .acceptor_state
     927           43 :             .get_last_log_term(self.flush_lsn())
     928            2 :     }
     929              : 
     930              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
                            ///
                            /// Returns the larger of `wal_store.flush_lsn()` and `state.timeline_start_lsn`, so
                            /// the result is never below the timeline start.
     931        17115 :     pub fn flush_lsn(&self) -> Lsn {
     932        17115 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     933         1883 :     }
     934              : 
     935              :     /// Process message from proposer and possibly form reply. Concurrent
     936              :     /// callers must exclude each other.
                            ///
                            /// Dispatches on the message variant to the matching `handle_*` method and returns
                            /// its result unchanged: `Ok(Some(reply))`, `Ok(None)` when there is nothing to send,
                            /// or the handler's error.
     937        29803 :     pub async fn process_msg(
     938        29803 :         &mut self,
     939        29803 :         msg: &ProposerAcceptorMessage,
     940        29803 :     ) -> Result<Option<AcceptorProposerMessage>> {
     941        29803 :         match msg {
     942        18977 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     943         2903 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     944          788 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
                                    // Both append variants share one handler and differ only in the bool argument.
                                    // NOTE(review): presumably the flag selects whether WAL is flushed before
                                    // replying; confirm against handle_append_request.
     945            5 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     946            5 :                 self.handle_append_request(msg, true).await
     947              :             }
     948         3830 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     949         3830 :                 self.handle_append_request(msg, false).await
     950              :             }
     951         3300 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     952              :         }
     953         1256 :     }
     954              : 
     955              :     /// Handle initial message from proposer: check its sanity and send my
     956              :     /// current term.
                            ///
                            /// On success returns `Ok(Some(AcceptorProposerMessage::Greeting(..)))` carrying this
                            /// node's id, the membership configuration held after `membership_switch`, and the
                            /// current acceptor term.
                            ///
                            /// # Errors
                            ///
                            /// Fails when the greeting disagrees with the stored state on Postgres major
                            /// version (unless the stored version is `UNKNOWN_SERVER_VERSION`), tenant id,
                            /// timeline id or `wal_seg_size`. Errors from the version conversions, from
                            /// persisting the state and from `membership_switch` are propagated.
     957        18977 :     async fn handle_greeting(
     958        18977 :         &mut self,
     959        18977 :         msg: &ProposerGreeting,
     960        18977 :     ) -> Result<Option<AcceptorProposerMessage>> {
     961              :         /* Postgres major version mismatch is treated as fatal error
     962              :          * because safekeepers parse WAL headers and the format
     963              :          * may change between versions.
     964              :          */
                                // NOTE(review): both `try_from` conversions (and their `?`) are evaluated before
                                // the UNKNOWN_SERVER_VERSION comparison on the right of `&&`. If `try_from`
                                // rejects the unknown sentinel, this returns that error instead of skipping the
                                // check; confirm against PgMajorVersion::try_from.
     965        18977 :         if PgMajorVersion::try_from(msg.pg_version)?
     966        18977 :             != PgMajorVersion::try_from(self.state.server.pg_version)?
     967            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     968              :         {
     969            0 :             bail!(
     970            0 :                 "incompatible server version {}, expected {}",
     971              :                 msg.pg_version,
     972            0 :                 self.state.server.pg_version
     973              :             );
     974            0 :         }
     975              : 
     976        18977 :         if msg.tenant_id != self.state.tenant_id {
     977            0 :             bail!(
     978            0 :                 "invalid tenant ID, got {}, expected {}",
     979              :                 msg.tenant_id,
     980            0 :                 self.state.tenant_id
     981              :             );
     982            0 :         }
     983        18977 :         if msg.timeline_id != self.state.timeline_id {
     984            0 :             bail!(
     985            0 :                 "invalid timeline ID, got {}, expected {}",
     986              :                 msg.timeline_id,
     987            0 :                 self.state.timeline_id
     988              :             );
     989            0 :         }
     990        18977 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     991            0 :             bail!(
     992            0 :                 "invalid wal_seg_size, got {}, expected {}",
     993              :                 msg.wal_seg_size,
     994            0 :                 self.state.server.wal_seg_size
     995              :             );
     996            0 :         }
     997              : 
     998              :         // system_id will be updated on mismatch
     999              :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
    1000        18977 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
                                    // Only warn when a previously recorded (non-zero) system id is being replaced.
    1001            0 :             if self.state.server.system_id != 0 {
    1002            0 :                 warn!(
    1003            0 :                     "unexpected system ID arrived, got {}, expected {}",
    1004            0 :                     msg.system_id, self.state.server.system_id
    1005              :                 );
    1006            0 :             }
    1007              : 
                                    // Store the new system id, and the proposer's pg_version unless it is the
                                    // unknown sentinel, through start_change / finish_change.
    1008            0 :             let mut state = self.state.start_change();
    1009            0 :             state.server.system_id = msg.system_id;
    1010            0 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
    1011            0 :                 state.server.pg_version = msg.pg_version;
    1012            0 :             }
    1013            0 :             self.state.finish_change(&state).await?;
    1014            0 :         }
    1015              : 
    1016              :         // Switch into conf given by proposer conf if it is higher.
    1017        18977 :         self.state.membership_switch(msg.mconf.clone()).await?;
    1018              : 
                                // The reply reports the configuration held after the switch attempt above.
    1019        18977 :         let apg = AcceptorGreeting {
    1020        18977 :             node_id: self.node_id,
    1021        18977 :             mconf: self.state.mconf.clone(),
    1022        18977 :             term: self.state.acceptor_state.term,
    1023        18977 :         };
    1024        18977 :         info!(
    1025            0 :             "processed greeting {:?} from walproposer, sending {:?}",
    1026              :             msg, apg
    1027              :         );
    1028        18977 :         Ok(Some(AcceptorProposerMessage::Greeting(apg)))
    1029            0 :     }
    1030              : 
    1031              :     /// Give vote for the given term, if we haven't done that previously.
    1032         2903 :     async fn handle_vote_request(
    1033         2903 :         &mut self,
    1034         2903 :         msg: &VoteRequest,
    1035         2903 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1036         2903 :         if self.state.mconf.generation != msg.generation {
    1037            1 :             bail!(
    1038            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1039              :                 msg,
    1040            1 :                 self.state.mconf.generation
    1041              :             );
    1042            2 :         }
    1043              :         // Once voted, we won't accept data from older proposers; flush
    1044              :         // everything we've already received so that new proposer starts
    1045              :         // streaming at end of our WAL, without overlap. WAL is truncated at
    1046              :         // streaming point and commit_lsn may be advanced from peers, so this
    1047              :         // also avoids possible spurious attempt to truncate committed WAL.
    1048         2902 :         self.wal_store.flush_wal().await?;
    1049              :         // initialize with refusal
    1050         2902 :         let mut resp = VoteResponse {
    1051         2902 :             generation: self.state.mconf.generation,
    1052         2902 :             term: self.state.acceptor_state.term,
    1053         2902 :             vote_given: false,
    1054         2902 :             flush_lsn: self.flush_lsn(),
    1055         2902 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
    1056         2902 :             term_history: self.get_term_history(),
    1057         2902 :         };
    1058         2902 :         if self.state.acceptor_state.term < msg.term {
    1059         2773 :             let mut state = self.state.start_change();
    1060         2773 :             state.acceptor_state.term = msg.term;
    1061              :             // persist vote before sending it out
    1062         2773 :             self.state.finish_change(&state).await?;
    1063              : 
    1064         2773 :             resp.term = self.state.acceptor_state.term;
    1065         2773 :             resp.vote_given = true;
    1066            1 :         }
    1067         2902 :         info!("processed {:?}: sending {:?}", msg, &resp);
    1068         2902 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
    1069            3 :     }
    1070              : 
    1071              :     /// Form AppendResponse from current state.
    1072         3303 :     fn append_response(&self) -> AppendResponse {
    1073         3303 :         let ar = AppendResponse {
    1074         3303 :             generation: self.state.mconf.generation,
    1075         3303 :             term: self.state.acceptor_state.term,
    1076         3303 :             flush_lsn: self.flush_lsn(),
    1077         3303 :             commit_lsn: self.state.commit_lsn,
    1078         3303 :             // will be filled by the upper code to avoid bothering safekeeper
    1079         3303 :             hs_feedback: HotStandbyFeedback::empty(),
    1080         3303 :             pageserver_feedback: None,
    1081         3303 :         };
    1082         3303 :         trace!("formed AppendResponse {:?}", ar);
    1083         3303 :         ar
    1084          623 :     }
    1085              : 
    1086          788 :     async fn handle_elected(
    1087          788 :         &mut self,
    1088          788 :         msg: &ProposerElected,
    1089          788 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1090          788 :         let _timer = MISC_OPERATION_SECONDS
    1091          788 :             .with_label_values(&["handle_elected"])
    1092          788 :             .start_timer();
    1093              : 
    1094          788 :         info!(
    1095            0 :             "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
    1096              :             msg,
    1097            0 :             self.state.acceptor_state.term,
    1098            0 :             self.get_last_log_term(),
    1099            0 :             self.flush_lsn()
    1100              :         );
    1101          788 :         if self.state.mconf.generation != msg.generation {
    1102            1 :             bail!(
    1103            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1104              :                 msg,
    1105            1 :                 self.state.mconf.generation
    1106              :             );
    1107            7 :         }
    1108          787 :         if self.state.acceptor_state.term < msg.term {
    1109            7 :             let mut state = self.state.start_change();
    1110            7 :             state.acceptor_state.term = msg.term;
    1111            7 :             self.state.finish_change(&state).await?;
    1112            0 :         }
    1113              : 
    1114              :         // If our term is higher, ignore the message (next feedback will inform the compute)
    1115          787 :         if self.state.acceptor_state.term > msg.term {
    1116            0 :             return Ok(None);
    1117            7 :         }
    1118              : 
    1119              :         // Before truncating WAL check-cross the check divergence point received
    1120              :         // from the walproposer.
    1121          787 :         let sk_th = self.get_term_history();
    1122          787 :         let last_common_point = match TermHistory::find_highest_common_point(
    1123          787 :             &msg.term_history,
    1124          787 :             &sk_th,
    1125          787 :             self.flush_lsn(),
    1126          787 :         ) {
    1127              :             // No common point. Expect streaming from the beginning of the
    1128              :             // history like walproposer while we don't have proper init.
    1129          139 :             None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
    1130          139 :                 "empty walproposer term history {:?}",
    1131              :                 msg.term_history
    1132            0 :             ))?,
    1133          648 :             Some(lcp) => lcp,
    1134              :         };
    1135              :         // This is expected to happen in a rare race when another connection
    1136              :         // from the same walproposer writes + flushes WAL after this connection
    1137              :         // sent flush_lsn in VoteRequest; for instance, very late
    1138              :         // ProposerElected message delivery after another connection was
    1139              :         // established and wrote WAL. In such cases error is transient;
    1140              :         // reconnection makes safekeeper send newest term history and flush_lsn
    1141              :         // and walproposer recalculates the streaming point. OTOH repeating
    1142              :         // error indicates a serious bug.
    1143          787 :         if last_common_point.lsn != msg.start_streaming_at {
    1144            0 :             bail!(
    1145            0 :                 "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1146              :                 last_common_point,
    1147              :                 msg.start_streaming_at,
    1148            0 :                 self.state.acceptor_state.term,
    1149              :                 sk_th,
    1150            0 :                 self.flush_lsn(),
    1151              :                 msg.term_history,
    1152              :             );
    1153            7 :         }
    1154              : 
    1155              :         // We are also expected to never attempt to truncate committed data.
    1156          787 :         assert!(
    1157          787 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
    1158            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1159              :             msg.start_streaming_at,
    1160              :             self.state.inmem.commit_lsn,
    1161            0 :             self.state.acceptor_state.term,
    1162              :             sk_th,
    1163            0 :             self.flush_lsn(),
    1164              :             msg.term_history,
    1165              :         );
    1166              : 
    1167              :         // Before first WAL write initialize its segment. It makes first segment
    1168              :         // pg_waldump'able because stream from compute doesn't include its
    1169              :         // segment and page headers.
    1170              :         //
    1171              :         // If we fail before first WAL write flush this action would be
    1172              :         // repeated, that's ok because it is idempotent.
    1173          787 :         if self.wal_store.flush_lsn() == Lsn::INVALID {
    1174          139 :             self.wal_store
    1175          139 :                 .initialize_first_segment(msg.start_streaming_at)
    1176          139 :                 .await?;
    1177            0 :         }
    1178              : 
    1179              :         // truncate wal, update the LSNs
    1180          787 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
    1181              : 
    1182              :         // and now adopt term history from proposer
    1183              :         {
    1184          787 :             let mut state = self.state.start_change();
    1185              : 
    1186              :             // Here we learn initial LSN for the first time, set fields
    1187              :             // interested in that.
    1188              : 
    1189          787 :             if let Some(start_lsn) = msg.term_history.0.first() {
    1190          787 :                 if state.timeline_start_lsn == Lsn(0) {
    1191              :                     // Remember point where WAL begins globally. In the future it
    1192              :                     // will be intialized immediately on timeline creation.
    1193          139 :                     state.timeline_start_lsn = start_lsn.lsn;
    1194          139 :                     info!(
    1195            0 :                         "setting timeline_start_lsn to {:?}",
    1196              :                         state.timeline_start_lsn
    1197              :                     );
    1198            0 :                 }
    1199            0 :             }
    1200              : 
    1201          787 :             if state.peer_horizon_lsn == Lsn(0) {
    1202          139 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
    1203          139 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
    1204          139 :                 state.peer_horizon_lsn = state.timeline_start_lsn;
    1205          139 :             }
    1206          787 :             if state.local_start_lsn == Lsn(0) {
    1207          139 :                 state.local_start_lsn = msg.start_streaming_at;
    1208          139 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
    1209            0 :             }
    1210              :             // Initializing commit_lsn before acking first flushed record is
    1211              :             // important to let find_end_of_wal skip the hole in the beginning
    1212              :             // of the first segment.
    1213              :             //
    1214              :             // NB: on new clusters, this happens at the same time as
    1215              :             // timeline_start_lsn initialization, it is taken outside to provide
    1216              :             // upgrade.
    1217          787 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
    1218              : 
    1219              :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
    1220          787 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
    1221              :             // similar for remote_consistent_lsn
    1222          787 :             state.remote_consistent_lsn =
    1223          787 :                 max(state.remote_consistent_lsn, state.timeline_start_lsn);
    1224              : 
    1225          787 :             state.acceptor_state.term_history = msg.term_history.clone();
    1226          787 :             self.state.finish_change(&state).await?;
    1227              :         }
    1228              : 
    1229          787 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
    1230              : 
    1231              :         // Cache LSN where term starts to immediately fsync control file with
    1232              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
    1233              :         // persisted commit_lsn on majority of safekeepers aligns.
    1234          787 :         self.term_start_lsn = match msg.term_history.0.last() {
    1235            0 :             None => bail!("proposer elected with empty term history"),
    1236          787 :             Some(term_lsn_start) => term_lsn_start.lsn,
    1237              :         };
    1238              : 
    1239          787 :         Ok(None)
    1240            8 :     }
    1241              : 
    1242              :     /// Advance commit_lsn taking into account what we have locally.
    1243              :     ///
    1244              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
    1245              :     /// already been done outside.
    1246         2520 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
    1247              :         // Both peers and walproposer communicate this value, we might already
    1248              :         // have a fresher (higher) version.
    1249         2520 :         candidate = max(candidate, self.state.inmem.commit_lsn);
    1250         2520 :         let commit_lsn = min(candidate, self.flush_lsn());
    1251         2520 :         assert!(
    1252         2520 :             commit_lsn >= self.state.inmem.commit_lsn,
    1253            0 :             "commit_lsn monotonicity violated: old={} new={}",
    1254              :             self.state.inmem.commit_lsn,
    1255              :             commit_lsn
    1256              :         );
    1257              : 
    1258         2520 :         self.state.inmem.commit_lsn = commit_lsn;
    1259              : 
    1260              :         // If new commit_lsn reached term switch, force sync of control
    1261              :         // file: walproposer in sync mode is very interested when this
    1262              :         // happens. Note: this is for sync-safekeepers mode only, as
    1263              :         // otherwise commit_lsn might jump over term_start_lsn.
    1264         2520 :         if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
    1265           98 :             self.state.flush().await?;
    1266          620 :         }
    1267              : 
    1268         2520 :         Ok(())
    1269          620 :     }
    1270              : 
    1271              :     /// Handle request to append WAL.
    1272              :     #[allow(clippy::comparison_chain)]
    1273         3835 :     async fn handle_append_request(
    1274         3835 :         &mut self,
    1275         3835 :         msg: &AppendRequest,
    1276         3835 :         require_flush: bool,
    1277         3835 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1278              :         // Refuse message on generation mismatch. On reconnect wp will get full
    1279              :         // configuration from greeting.
    1280         3835 :         if self.state.mconf.generation != msg.h.generation {
    1281            1 :             bail!(
    1282            1 :                 "refusing append request due to generation mismatch: request {}, sk {}",
    1283              :                 msg.h.generation,
    1284            1 :                 self.state.mconf.generation
    1285              :             );
    1286          624 :         }
    1287              : 
    1288         3834 :         if self.state.acceptor_state.term < msg.h.term {
    1289            0 :             bail!("got AppendRequest before ProposerElected");
    1290          624 :         }
    1291              : 
    1292              :         // If our term is higher, immediately refuse the message. Send term only
    1293              :         // response; elected walproposer can never advance the term, so it will
    1294              :         // figure out the refusal from it -- which is important as term change
    1295              :         // should cause not just reconnection but whole walproposer re-election.
    1296         3834 :         if self.state.acceptor_state.term > msg.h.term {
    1297            0 :             let resp = AppendResponse::term_only(
    1298            0 :                 self.state.mconf.generation,
    1299            0 :                 self.state.acceptor_state.term,
    1300              :             );
    1301            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
    1302          624 :         }
    1303              : 
    1304              :         // Disallow any non-sequential writes, which can result in gaps or
    1305              :         // overwrites. If we need to move the pointer, ProposerElected message
    1306              :         // should have truncated WAL first accordingly. Note that the first
    1307              :         // condition (WAL rewrite) is quite expected in real world; it happens
    1308              :         // when walproposer reconnects to safekeeper and writes some more data
    1309              :         // while first connection still gets some packets later. It might be
    1310              :         // better to not log this as error! above.
    1311         3834 :         let write_lsn = self.wal_store.write_lsn();
    1312         3834 :         let flush_lsn = self.wal_store.flush_lsn();
    1313         3834 :         if write_lsn > msg.h.begin_lsn {
    1314            1 :             bail!(
    1315            1 :                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
    1316              :                 write_lsn,
    1317              :                 msg.h.begin_lsn
    1318              :             );
    1319          623 :         }
    1320         3833 :         if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
    1321            0 :             bail!(
    1322            0 :                 "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
    1323              :                 write_lsn,
    1324              :                 msg.h.begin_lsn,
    1325              :             );
    1326          623 :         }
    1327              : 
    1328              :         // Now we know that we are in the same term as the proposer, process the
    1329              :         // message.
    1330              : 
    1331              :         // do the job
    1332         3833 :         if !msg.wal_data.is_empty() {
    1333         1409 :             self.wal_store
    1334         1409 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
    1335         1409 :                 .await?;
    1336            0 :         }
    1337              : 
    1338              :         // flush wal to the disk, if required
    1339         3833 :         if require_flush {
    1340            3 :             self.wal_store.flush_wal().await?;
    1341          620 :         }
    1342              : 
    1343              :         // Update commit_lsn. It will be flushed to the control file regularly by the timeline
    1344              :         // manager, off of the WAL ingest hot path.
    1345         3833 :         if msg.h.commit_lsn != Lsn(0) {
    1346         2520 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
    1347            3 :         }
    1348              :         // Value calculated by walproposer can always lag:
    1349              :         // - safekeepers can forget inmem value and send to proposer lower
    1350              :         //   persisted one on restart;
    1351              :         // - if we make safekeepers always send persistent value,
    1352              :         //   any compute restart would pull it down.
    1353              :         // Thus, take max before adopting.
    1354         3833 :         self.state.inmem.peer_horizon_lsn =
    1355         3833 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
    1356              : 
    1357         3833 :         trace!(
    1358            0 :             "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
    1359            0 :             msg.wal_data.len(),
    1360              :             msg.h.begin_lsn,
    1361              :             msg.h.end_lsn,
    1362              :             msg.h.commit_lsn,
    1363              :             msg.h.truncate_lsn,
    1364              :             require_flush,
    1365              :         );
    1366              : 
    1367              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
    1368              :         // This is the common case for !require_flush, but a flush can still
    1369              :         // happen on segment bounds.
    1370         3833 :         if !require_flush && flush_lsn == self.flush_lsn() {
    1371         3830 :             return Ok(None);
    1372            3 :         }
    1373              : 
    1374            3 :         let resp = self.append_response();
    1375            3 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
    1376          625 :     }
    1377              : 
    1378              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
    1379         3300 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
    1380         3300 :         self.wal_store.flush_wal().await?;
    1381         3300 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
    1382         3300 :             self.append_response(),
    1383         3300 :         )))
    1384          620 :     }
    1385              : 
    1386              :     /// Update commit_lsn from peer safekeeper data.
    1387            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
    1388            0 :         if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
    1389              :             // Note: the check is too restrictive, generally we can update local
    1390              :             // commit_lsn if our history matches (is part of) history of advanced
    1391              :             // commit_lsn provider.
    1392            0 :             if sk_info.last_log_term == self.get_last_log_term() {
    1393            0 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
    1394            0 :             }
    1395            0 :         }
    1396            0 :         Ok(())
    1397            0 :     }
    1398              : }
    1399              : 
    1400              : #[cfg(test)]
    1401              : mod tests {
    1402              :     use std::ops::Deref;
    1403              :     use std::str::FromStr;
    1404              :     use std::time::{Instant, UNIX_EPOCH};
    1405              : 
    1406              :     use futures::future::BoxFuture;
    1407              :     use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
    1408              :     use safekeeper_api::ServerInfo;
    1409              :     use safekeeper_api::membership::{
    1410              :         Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
    1411              :     };
    1412              : 
    1413              :     use super::*;
    1414              :     use crate::state::{EvictionState, TimelinePersistentState};
    1415              : 
    1416              :     // fake storage for tests
    1417              :     struct InMemoryState {
    1418              :         persisted_state: TimelinePersistentState,
    1419              :     }
    1420              : 
    1421              :     impl control_file::Storage for InMemoryState {
    1422            5 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
    1423            5 :             self.persisted_state = s.clone();
    1424            5 :             Ok(())
    1425            5 :         }
    1426              : 
    1427            0 :         fn last_persist_at(&self) -> Instant {
    1428            0 :             Instant::now()
    1429            0 :         }
    1430              :     }
    1431              : 
    1432              :     impl Deref for InMemoryState {
    1433              :         type Target = TimelinePersistentState;
    1434              : 
    1435          100 :         fn deref(&self) -> &Self::Target {
    1436          100 :             &self.persisted_state
    1437          100 :         }
    1438              :     }
    1439              : 
    1440            3 :     fn test_sk_state() -> TimelinePersistentState {
    1441            3 :         let mut state = TimelinePersistentState::empty();
    1442            3 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1443            3 :         state.tenant_id = TenantId::from([1u8; 16]);
    1444            3 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1445            3 :         state
    1446            3 :     }
    1447              : 
    1448              :     struct DummyWalStore {
    1449              :         lsn: Lsn,
    1450              :     }
    1451              : 
    1452              :     impl wal_storage::Storage for DummyWalStore {
    1453            4 :         fn write_lsn(&self) -> Lsn {
    1454            4 :             self.lsn
    1455            4 :         }
    1456              : 
    1457           19 :         fn flush_lsn(&self) -> Lsn {
    1458           19 :             self.lsn
    1459           19 :         }
    1460              : 
    1461            2 :         async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
    1462            2 :             Ok(())
    1463            2 :         }
    1464              : 
    1465            3 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1466            3 :             self.lsn = startpos + buf.len() as u64;
    1467            3 :             Ok(())
    1468            3 :         }
    1469              : 
    1470            2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1471            2 :             self.lsn = end_pos;
    1472            2 :             Ok(())
    1473            2 :         }
    1474              : 
    1475            5 :         async fn flush_wal(&mut self) -> Result<()> {
    1476            5 :             Ok(())
    1477            5 :         }
    1478              : 
    1479            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1480            0 :             Box::pin(async { Ok(()) })
    1481            0 :         }
    1482              : 
    1483            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1484            0 :             crate::metrics::WalStorageMetrics::default()
    1485            0 :         }
    1486              :     }
    1487              : 
    1488              :     #[tokio::test]
    1489            1 :     async fn test_voting() {
    1490            1 :         let storage = InMemoryState {
    1491            1 :             persisted_state: test_sk_state(),
    1492            1 :         };
    1493            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1494            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1495              : 
    1496              :         // Vote with generation mismatch should be rejected.
    1497            1 :         let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1498            1 :             generation: SafekeeperGeneration::new(42),
    1499            1 :             term: 1,
    1500            1 :         });
    1501            1 :         assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
    1502              : 
    1503              :         // check voting for 1 is ok
    1504            1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1505            1 :             generation: Generation::new(0),
    1506            1 :             term: 1,
    1507            1 :         });
    1508            1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1509            1 :         match vote_resp.unwrap() {
    1510            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
    1511            0 :             r => panic!("unexpected response: {r:?}"),
    1512              :         }
    1513              : 
    1514              :         // reboot...
    1515            1 :         let state = sk.state.deref().clone();
    1516            1 :         let storage = InMemoryState {
    1517            1 :             persisted_state: state,
    1518            1 :         };
    1519              : 
    1520            1 :         sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
    1521              : 
    1522              :         // and ensure voting second time for 1 is not ok
    1523            1 :         vote_resp = sk.process_msg(&vote_request).await;
    1524            1 :         match vote_resp.unwrap() {
    1525            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
    1526            1 :             r => panic!("unexpected response: {r:?}"),
    1527            1 :         }
    1528            1 :     }
    1529              : 
    1530              :     #[tokio::test]
                            // Verifies when `get_last_log_term()` switches from term 1 to term 2 while
                            // WAL is appended, using a term history that records term 2 at Lsn(3).
                            // Also checks that an `Elected` message carrying a different generation
                            // makes `process_msg` return an error.
    1531            1 :     async fn test_last_log_term_switch() {
    1532            1 :         let storage = InMemoryState {
    1533            1 :             persisted_state: test_sk_state(),
    1534            1 :         };
    1535            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1536              : 
    1537            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1538              : 
                                // Header of the first append: term 2, begin_lsn 1, end_lsn 2.
    1539            1 :         let mut ar_hdr = AppendRequestHeader {
    1540            1 :             generation: Generation::new(0),
    1541            1 :             term: 2,
    1542            1 :             begin_lsn: Lsn(1),
    1543            1 :             end_lsn: Lsn(2),
    1544            1 :             commit_lsn: Lsn(0),
    1545            1 :             truncate_lsn: Lsn(0),
    1546            1 :         };
                                // The request carries a single byte of WAL data.
    1547            1 :         let mut append_request = AppendRequest {
    1548            1 :             h: ar_hdr.clone(),
    1549            1 :             wal_data: Bytes::from_static(b"b"),
    1550            1 :         };
    1551              : 
                                // Election message for term 2, streaming from Lsn(1). Its term history
                                // lists term 1 at Lsn(1) and term 2 at Lsn(3).
    1552            1 :         let pem = ProposerElected {
    1553            1 :             generation: Generation::new(0),
    1554            1 :             term: 2,
    1555            1 :             start_streaming_at: Lsn(1),
    1556            1 :             term_history: TermHistory(vec![
    1557            1 :                 TermLsn {
    1558            1 :                     term: 1,
    1559            1 :                     lsn: Lsn(1),
    1560            1 :                 },
    1561            1 :                 TermLsn {
    1562            1 :                     term: 2,
    1563            1 :                     lsn: Lsn(3),
    1564            1 :                 },
    1565            1 :             ]),
    1566            1 :         };
    1567              : 
    1568              :         // check that elected msg with generation mismatch is rejected
                                // NOTE(review): presumably `test_sk_state()` configures generation 0, which
                                // is what makes 42 a mismatch here; confirm against that helper.
    1569            1 :         let mut pem_gen_mismatch = pem.clone();
    1570            1 :         pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1571            1 :         assert!(
    1572            1 :             sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
    1573            1 :                 .await
    1574            1 :                 .is_err()
    1575              :         );
    1576              : 
                                // The unmodified election message (generation 0) must be accepted.
    1577            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1578            1 :             .await
    1579            1 :             .unwrap();
    1580              : 
    1581              :         // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
                                // This append ends at Lsn(2), below the Lsn(3) recorded for term 2.
    1582            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1583            1 :             .await
    1584            1 :             .unwrap();
    1585            1 :         assert_eq!(sk.get_last_log_term(), 1);
    1586              : 
    1587              :         // but record at term_start_lsn does the switch
                                // The header is reused with begin_lsn 2 and end_lsn 3, so this append
                                // reaches the Lsn(3) recorded for term 2.
    1588            1 :         ar_hdr.begin_lsn = Lsn(2);
    1589            1 :         ar_hdr.end_lsn = Lsn(3);
    1590            1 :         append_request = AppendRequest {
    1591            1 :             h: ar_hdr,
    1592            1 :             wal_data: Bytes::from_static(b"b"),
    1593            1 :         };
    1594            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1595            1 :             .await
    1596            1 :             .unwrap();
    1597            1 :         assert_eq!(sk.get_last_log_term(), 2);
    1598            1 :     }
    1599              : 
    1600              :     #[tokio::test]
                            // Verifies that `process_msg` returns an error for an append request whose
                            // begin_lsn does not continue from the end of the previous write, and for an
                            // append request carrying a different generation.
    1601            1 :     async fn test_non_consecutive_write() {
    1602            1 :         let storage = InMemoryState {
    1603            1 :             persisted_state: test_sk_state(),
    1604            1 :         };
    1605            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1606              : 
    1607            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1608              : 
                                // Election message for term 1, streaming from Lsn(1); it must be accepted
                                // before any append is sent.
    1609            1 :         let pem = ProposerElected {
    1610            1 :             generation: Generation::new(0),
    1611            1 :             term: 1,
    1612            1 :             start_streaming_at: Lsn(1),
    1613            1 :             term_history: TermHistory(vec![TermLsn {
    1614            1 :                 term: 1,
    1615            1 :                 lsn: Lsn(1),
    1616            1 :             }]),
    1617            1 :         };
    1618            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1619            1 :             .await
    1620            1 :             .unwrap();
    1621              : 
                                // Header of the valid append: term 1, begin_lsn 1, end_lsn 2.
    1622            1 :         let ar_hdr = AppendRequestHeader {
    1623            1 :             generation: Generation::new(0),
    1624            1 :             term: 1,
    1625            1 :             begin_lsn: Lsn(1),
    1626            1 :             end_lsn: Lsn(2),
    1627            1 :             commit_lsn: Lsn(0),
    1628            1 :             truncate_lsn: Lsn(0),
    1629            1 :         };
    1630            1 :         let append_request = AppendRequest {
    1631            1 :             h: ar_hdr.clone(),
    1632            1 :             wal_data: Bytes::from_static(b"b"),
    1633            1 :         };
    1634              : 
    1635              :         // check that append request with generation mismatch is rejected
                                // NOTE(review): presumably `test_sk_state()` configures generation 0, which
                                // is what makes 42 a mismatch here; confirm against that helper.
    1636            1 :         let mut ar_hdr_gen_mismatch = ar_hdr.clone();
    1637            1 :         ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1638            1 :         let append_request_gen_mismatch = AppendRequest {
    1639            1 :             h: ar_hdr_gen_mismatch,
    1640            1 :             wal_data: Bytes::from_static(b"b"),
    1641            1 :         };
    1642            1 :         assert!(
    1643            1 :             sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
    1644            1 :                 append_request_gen_mismatch
    1645            1 :             ))
    1646            1 :             .await
    1647            1 :             .is_err()
    1648              :         );
    1649              : 
    1650              :         // do write ending at 2, it should be ok
    1651            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1652            1 :             .await
    1653            1 :             .unwrap();
                                // Second header: same fields, but begin_lsn 4 and end_lsn 5, which does not
                                // continue from the write above that ended at Lsn(2).
    1654            1 :         let mut ar_hrd2 = ar_hdr.clone();
    1655            1 :         ar_hrd2.begin_lsn = Lsn(4);
    1656            1 :         ar_hrd2.end_lsn = Lsn(5);
                                // The request must carry `ar_hrd2`; sending `ar_hdr` again would only
                                // repeat the Lsn(1)..Lsn(2) write and never exercise the gap case.
    1657            1 :         let append_request = AppendRequest {
    1658            1 :             h: ar_hrd2,
    1659            1 :             wal_data: Bytes::from_static(b"b"),
    1660            1 :         };
    1661              :         // and now starting at 4, it must fail
    1662            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1663            1 :             .await
    1664            1 :             .unwrap_err();
    1665            1 :     }
    1666              : 
    1667              :     #[test]
                            // The proposer history contains only term 0, while the safekeeper history
                            // contains terms 1 and 2, so no term appears in both and the expected
                            // result is `None`.
                            // NOTE(review): the third argument looks like the safekeeper's end LSN;
                            // confirm against `TermHistory::find_highest_common_point`.
    1668            1 :     fn test_find_highest_common_point_none() {
    1669            1 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1670            1 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1671            1 :         assert_eq!(
    1672            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
    1673              :             None
    1674              :         );
    1675            1 :     }
    1676              : 
    1677              :     #[test]
                            // Both histories contain the same entries for terms 1 and 2 and then differ:
                            // the proposer continues with term 4 at Lsn(40), the safekeeper with term 3
                            // at Lsn(30). The expected common point is term 2 at Lsn(30), the LSN of the
                            // safekeeper's entry that follows term 2.
    1678            1 :     fn test_find_highest_common_point_middle() {
    1679            1 :         let prop_th = TermHistory(vec![
    1680            1 :             (1, Lsn(10)).into(),
    1681            1 :             (2, Lsn(20)).into(),
    1682            1 :             (4, Lsn(40)).into(),
    1683            1 :         ]);
    1684            1 :         let sk_th = TermHistory(vec![
    1685            1 :             (1, Lsn(10)).into(),
    1686            1 :             (2, Lsn(20)).into(),
    1687            1 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1688            1 :         ]);
    1689            1 :         assert_eq!(
    1690            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1691              :             Some(TermLsn {
    1692              :                 term: 2,
    1693              :                 lsn: Lsn(30),
    1694              :             })
    1695              :         );
    1696            1 :     }
    1697              : 
    1698              :     #[test]
                            // The safekeeper history stops at term 2, which the proposer history also
                            // contains before continuing with term 4. The expected common point is
                            // term 2 at Lsn(32), the LSN passed as the third argument.
    1699            1 :     fn test_find_highest_common_point_sk_end() {
    1700            1 :         let prop_th = TermHistory(vec![
    1701            1 :             (1, Lsn(10)).into(),
    1702            1 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1703            1 :             (4, Lsn(40)).into(),
    1704            1 :         ]);
    1705            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1706            1 :         assert_eq!(
    1707            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1708              :             Some(TermLsn {
    1709              :                 term: 2,
    1710              :                 lsn: Lsn(32),
    1711              :             })
    1712              :         );
    1713            1 :     }
    1714              : 
    1715              :     #[test]
                            // The proposer and safekeeper histories are identical (terms 1 and 2). The
                            // expected common point is the last term, 2, at Lsn(32), the LSN passed as
                            // the third argument.
    1716            1 :     fn test_find_highest_common_point_walprop() {
    1717            1 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1718            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1719            1 :         assert_eq!(
    1720            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1721              :             Some(TermLsn {
    1722              :                 term: 2,
    1723              :                 lsn: Lsn(32),
    1724              :             })
    1725              :         );
    1726            1 :     }
    1727              : 
    1728              :     #[test]
                            // Round-trip check: builds a `TimelinePersistentState` with every field set
                            // to a fixed value, serializes it with `ser()`, deserializes the bytes with
                            // `des()`, and asserts that the result equals the original value.
    1729            1 :     fn test_sk_state_bincode_serde_roundtrip() {
    1730            1 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1731            1 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1732            1 :         let state = TimelinePersistentState {
    1733            1 :             tenant_id,
    1734            1 :             timeline_id,
                                    // Membership configuration with a single member and no `new_members`.
    1735            1 :             mconf: Configuration {
    1736            1 :                 generation: SafekeeperGeneration::new(42),
    1737            1 :                 members: MemberSet::new(vec![SafekeeperId {
    1738            1 :                     id: NodeId(1),
    1739            1 :                     host: "hehe.org".to_owned(),
    1740            1 :                     pg_port: 5432,
    1741            1 :                 }])
    1742            1 :                 .expect("duplicate member"),
    1743            1 :                 new_members: None,
    1744            1 :             },
    1745            1 :             acceptor_state: AcceptorState {
    1746            1 :                 term: 42,
    1747            1 :                 term_history: TermHistory(vec![TermLsn {
    1748            1 :                     lsn: Lsn(0x1),
    1749            1 :                     term: 41,
    1750            1 :                 }]),
    1751            1 :             },
    1752            1 :             server: ServerInfo {
    1753            1 :                 pg_version: PgVersionId::from_full_pg_version(140000),
    1754            1 :                 system_id: 0x1234567887654321,
    1755            1 :                 wal_seg_size: 0x12345678,
    1756            1 :             },
                                    // The proposer UUID is the timeline id's byte array in reverse order.
    1757            1 :             proposer_uuid: {
    1758            1 :                 let mut arr = timeline_id.as_arr();
    1759            1 :                 arr.reverse();
    1760            1 :                 arr
    1761            1 :             },
    1762            1 :             timeline_start_lsn: Lsn(0x12345600),
    1763            1 :             local_start_lsn: Lsn(0x12),
    1764            1 :             commit_lsn: Lsn(1234567800),
    1765            1 :             backup_lsn: Lsn(1234567300),
    1766            1 :             peer_horizon_lsn: Lsn(9999999),
    1767            1 :             remote_consistent_lsn: Lsn(1234560000),
    1768            1 :             partial_backup: crate::wal_backup_partial::State::default(),
    1769            1 :             eviction_state: EvictionState::Present,
    1770            1 :             creation_ts: UNIX_EPOCH,
    1771            1 :         };
    1772              : 
    1773            1 :         let ser = state.ser().unwrap();
    1774              : 
    1775            1 :         let deser = TimelinePersistentState::des(&ser).unwrap();
    1776              : 
    1777            1 :         assert_eq!(deser, state);
    1778            1 :     }
    1779              : }
        

Generated by: LCOV version 2.1-beta