LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 71.6 % 1058 757
Test Date: 2025-07-26 17:20:05 Functions: 70.1 % 177 124

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use std::cmp::{max, min};
       4              : use std::fmt;
       5              : use std::io::Read;
       6              : use std::str::FromStr;
       7              : 
       8              : use anyhow::{Context, Result, bail};
       9              : use byteorder::{LittleEndian, ReadBytesExt};
      10              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      11              : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
      12              : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
      13              : use pq_proto::SystemId;
      14              : use safekeeper_api::membership::{
      15              :     INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
      16              : };
      17              : use safekeeper_api::models::HotStandbyFeedback;
      18              : use safekeeper_api::{Term, membership};
      19              : use serde::{Deserialize, Serialize};
      20              : use storage_broker::proto::SafekeeperTimelineInfo;
      21              : use tracing::*;
      22              : use utils::bin_ser::LeSer;
      23              : use utils::id::{NodeId, TenantId, TimelineId};
      24              : use utils::lsn::Lsn;
      25              : use utils::pageserver_feedback::PageserverFeedback;
      26              : 
      27              : use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL};
      28              : use crate::state::TimelineState;
      29              : use crate::{control_file, wal_storage};
      30              : 
      31              : pub const SK_PROTO_VERSION_2: u32 = 2;
      32              : pub const SK_PROTO_VERSION_3: u32 = 3;
      33              : pub const UNKNOWN_SERVER_VERSION: PgVersionId = PgVersionId::UNKNOWN;
      34              : 
      35            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      36              : pub struct TermLsn {
      37              :     pub term: Term,
      38              :     pub lsn: Lsn,
      39              : }
      40              : 
      41              : // Creation from tuple provides less typing (e.g. for unit tests).
      42              : impl From<(Term, Lsn)> for TermLsn {
      43         1277 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      44         1277 :         TermLsn {
      45         1277 :             term: pair.0,
      46         1277 :             lsn: pair.1,
      47         1277 :         }
      48         1277 :     }
      49              : }
      50              : 
      51            0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      52              : pub struct TermHistory(pub Vec<TermLsn>);
      53              : 
      54              : impl TermHistory {
      55         1464 :     pub fn empty() -> TermHistory {
      56         1464 :         TermHistory(Vec::new())
      57         1464 :     }
      58              : 
      59              :     // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
      60         1021 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      61         1021 :         let n_entries = bytes
      62         1021 :             .get_u32_f()
      63         1021 :             .with_context(|| "TermHistory misses len")?;
      64         1021 :         let mut res = Vec::with_capacity(n_entries as usize);
      65        10761 :         for i in 0..n_entries {
      66        10761 :             let term = bytes
      67        10761 :                 .get_u64_f()
      68        10761 :                 .with_context(|| format!("TermHistory pos {i} misses term"))?;
      69        10761 :             let lsn = bytes
      70        10761 :                 .get_u64_f()
      71        10761 :                 .with_context(|| format!("TermHistory pos {i} misses lsn"))?
      72        10761 :                 .into();
      73        10761 :             res.push(TermLsn { term, lsn })
      74              :         }
      75         1021 :         Ok(TermHistory(res))
      76         1021 :     }
      77              : 
      78              :     // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
      79              :     // TODO remove once v2 protocol is fully dropped.
      80            0 :     pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
      81            0 :         if bytes.remaining() < 4 {
      82            0 :             bail!("TermHistory misses len");
      83            0 :         }
      84            0 :         let n_entries = bytes.get_u32_le();
      85            0 :         let mut res = Vec::with_capacity(n_entries as usize);
      86            0 :         for _ in 0..n_entries {
      87            0 :             if bytes.remaining() < 16 {
      88            0 :                 bail!("TermHistory is incomplete");
      89            0 :             }
      90            0 :             res.push(TermLsn {
      91            0 :                 term: bytes.get_u64_le(),
      92            0 :                 lsn: bytes.get_u64_le().into(),
      93            0 :             })
      94              :         }
      95            0 :         Ok(TermHistory(res))
      96            0 :     }
      97              : 
      98              :     /// Return copy of self with switches happening strictly after up_to
      99              :     /// truncated.
     100         5407 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
     101         5407 :         let mut res = Vec::with_capacity(self.0.len());
     102        32366 :         for e in &self.0 {
     103        26960 :             if e.lsn > up_to {
     104            1 :                 break;
     105        26959 :             }
     106        26959 :             res.push(*e);
     107              :         }
     108         5407 :         TermHistory(res)
     109         5407 :     }
     110              : 
     111              :     /// Find point of divergence between leader (walproposer) term history and
     112              :     /// safekeeper. Arguments are not symmetric as proposer history ends at
     113              :     /// +infinity while safekeeper at flush_lsn.
     114              :     /// C version is at walproposer SendProposerElected.
     115         1032 :     pub fn find_highest_common_point(
     116         1032 :         prop_th: &TermHistory,
     117         1032 :         sk_th: &TermHistory,
     118         1032 :         sk_wal_end: Lsn,
     119         1032 :     ) -> Option<TermLsn> {
     120         1032 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     121              : 
     122         1032 :         if let Some(sk_th_last) = sk_th.last() {
     123          895 :             assert!(
     124          895 :                 sk_th_last.lsn <= sk_wal_end,
     125            0 :                 "safekeeper term history end {sk_th_last:?} LSN is higher than WAL end {sk_wal_end:?}"
     126              :             );
     127          137 :         }
     128              : 
     129              :         // find last common term, if any...
     130         1032 :         let mut last_common_idx = None;
     131         9710 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     132         9710 :             if prop_th[i].term != sk_th[i].term {
     133            2 :                 break;
     134         9708 :             }
     135              :             // If term is the same, LSN must be equal as well.
     136         9708 :             assert!(
     137         9708 :                 prop_th[i].lsn == sk_th[i].lsn,
     138            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     139            0 :                 prop_th[i].term,
     140            0 :                 prop_th[i].lsn,
     141            0 :                 sk_th[i].lsn
     142              :             );
     143         9708 :             last_common_idx = Some(i);
     144              :         }
     145         1032 :         let last_common_idx = last_common_idx?;
     146              :         // Now find where it ends at both prop and sk and take min. End of
     147              :         // (common) term is the start of the next one, unless it is the last
     148              :         // one; in that case it is flush_lsn for the safekeeper and +infinity
     149              :         // for the proposer, so we just take flush_lsn then.
     150          894 :         if last_common_idx == prop_th.len() - 1 {
     151           79 :             Some(TermLsn {
     152           79 :                 term: prop_th[last_common_idx].term,
     153           79 :                 lsn: sk_wal_end,
     154           79 :             })
     155              :         } else {
     156          815 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     157          815 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     158            1 :                 sk_th[last_common_idx + 1].lsn
     159              :             } else {
     160          814 :                 sk_wal_end
     161              :             };
     162          815 :             Some(TermLsn {
     163          815 :                 term: prop_th[last_common_idx].term,
     164          815 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     165          815 :             })
     166              :         }
     167         1032 :     }
     168              : }
     169              : 
     170              : /// Display only latest entries for Debug.
     171              : impl fmt::Debug for TermHistory {
     172          163 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     173          163 :         let n_printed = 20;
     174          163 :         write!(
     175          163 :             fmt,
     176          163 :             "{}{:?}",
     177          163 :             if self.0.len() > n_printed { "... " } else { "" },
     178          163 :             self.0
     179          163 :                 .iter()
     180          163 :                 .rev()
     181          163 :                 .take(n_printed)
     182          163 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     183          163 :                 .collect::<Vec<_>>()
     184              :         )
     185          163 :     }
     186              : }
     187              : 
     188              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     189              : pub type PgUuid = [u8; 16];
     190              : 
     191              : /// Persistent consensus state of the acceptor.
     192            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     193              : pub struct AcceptorState {
     194              :     /// acceptor's last term it voted for (advanced in 1 phase)
     195              :     pub term: Term,
     196              :     /// History of term switches for safekeeper's WAL.
     197              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     198              :     /// from the proposer before recovery.
     199              :     pub term_history: TermHistory,
     200              : }
     201              : 
     202              : impl AcceptorState {
     203              :     /// acceptor's last_log_term is the term of the highest entry in the log
     204         1292 :     pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
     205         1292 :         let th = self.term_history.up_to(flush_lsn);
     206         1292 :         match th.0.last() {
     207         1292 :             Some(e) => e.term,
     208            0 :             None => 0,
     209              :         }
     210         1292 :     }
     211              : }
     212              : 
     213              : // protocol messages
     214              : 
     215              : /// Initial Proposer -> Acceptor message
     216            0 : #[derive(Debug, Deserialize)]
     217              : pub struct ProposerGreeting {
     218              :     pub tenant_id: TenantId,
     219              :     pub timeline_id: TimelineId,
     220              :     pub mconf: membership::Configuration,
     221              :     /// Postgres server version
     222              :     pub pg_version: PgVersionId,
     223              :     pub system_id: SystemId,
     224              :     pub wal_seg_size: u32,
     225              : }
     226              : 
     227              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     228            0 : #[derive(Debug, Deserialize)]
     229              : pub struct ProposerGreetingV2 {
     230              :     /// proposer-acceptor protocol version
     231              :     pub protocol_version: u32,
     232              :     /// Postgres server version
     233              :     pub pg_version: PgVersionId,
     234              :     pub proposer_id: PgUuid,
     235              :     pub system_id: SystemId,
     236              :     pub timeline_id: TimelineId,
     237              :     pub tenant_id: TenantId,
     238              :     pub tli: TimeLineID,
     239              :     pub wal_seg_size: u32,
     240              : }
     241              : 
     242              : /// Acceptor -> Proposer initial response: the highest term known to me
     243              : /// (acceptor voted for).
     244              : #[derive(Debug, Serialize)]
     245              : pub struct AcceptorGreeting {
     246              :     node_id: NodeId,
     247              :     mconf: membership::Configuration,
     248              :     term: u64,
     249              : }
     250              : 
     251              : /// Vote request sent from proposer to safekeepers
     252              : #[derive(Debug)]
     253              : pub struct VoteRequest {
     254              :     pub generation: Generation,
     255              :     pub term: Term,
     256              : }
     257              : 
     258              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     259            0 : #[derive(Debug, Deserialize)]
     260              : pub struct VoteRequestV2 {
     261              :     pub term: Term,
     262              : }
     263              : 
     264              : /// Vote itself, sent from safekeeper to proposer
     265              : #[derive(Debug, Serialize)]
     266              : pub struct VoteResponse {
     267              :     generation: Generation, // membership conf generation
     268              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     269              :     vote_given: bool,
     270              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     271              :     // proposer to choose the most advanced one.
     272              :     pub flush_lsn: Lsn,
     273              :     truncate_lsn: Lsn,
     274              :     pub term_history: TermHistory,
     275              : }
     276              : 
     277              : /*
     278              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     279              :  * term history to it.
     280              :  */
     281              : #[derive(Debug, Clone)]
     282              : pub struct ProposerElected {
     283              :     pub generation: Generation, // membership conf generation
     284              :     pub term: Term,
     285              :     pub start_streaming_at: Lsn,
     286              :     pub term_history: TermHistory,
     287              : }
     288              : 
     289              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     290              : /// communicates commit_lsn.
     291              : #[derive(Debug)]
     292              : pub struct AppendRequest {
     293              :     pub h: AppendRequestHeader,
     294              :     pub wal_data: Bytes,
     295              : }
     296            0 : #[derive(Debug, Clone, Deserialize)]
     297              : pub struct AppendRequestHeader {
     298              :     pub generation: Generation, // membership conf generation
     299              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     300              :     pub term: Term,
     301              :     /// start position of message in WAL
     302              :     pub begin_lsn: Lsn,
     303              :     /// end position of message in WAL
     304              :     pub end_lsn: Lsn,
     305              :     /// LSN committed by quorum of safekeepers
     306              :     pub commit_lsn: Lsn,
     307              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     308              :     pub truncate_lsn: Lsn,
     309              : }
     310              : 
     311              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     312            0 : #[derive(Debug, Clone, Deserialize)]
     313              : pub struct AppendRequestHeaderV2 {
     314              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     315              :     pub term: Term,
     316              :     // TODO: remove this field from the protocol, it is unused -- LSN of term
     317              :     // switch can be taken from ProposerElected (as well as from term history).
     318              :     pub term_start_lsn: Lsn,
     319              :     /// start position of message in WAL
     320              :     pub begin_lsn: Lsn,
     321              :     /// end position of message in WAL
     322              :     pub end_lsn: Lsn,
     323              :     /// LSN committed by quorum of safekeepers
     324              :     pub commit_lsn: Lsn,
     325              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     326              :     pub truncate_lsn: Lsn,
     327              :     // only for logging/debugging
     328              :     pub proposer_uuid: PgUuid,
     329              : }
     330              : 
     331              : /// Report safekeeper state to proposer
     332              : #[derive(Debug, Serialize, Clone)]
     333              : pub struct AppendResponse {
     334              :     // Membership conf generation. Not strictly required because on mismatch
     335              :     // connection is reset, but let's sanity check it.
     336              :     generation: Generation,
     337              :     // Current term of the safekeeper; if it is higher than proposer's, the
     338              :     // compute is out of date.
     339              :     pub term: Term,
     340              :     // Flushed end of wal on safekeeper; one should be always mindful from what
     341              :     // term history this value comes, either checking history directly or
     342              :     // observing term being set to one for which WAL truncation is known to have
     343              :     // happened.
     344              :     pub flush_lsn: Lsn,
     345              :     // We report back our awareness about which WAL is committed, as this is
     346              :     // a criterion for walproposer --sync mode exit
     347              :     pub commit_lsn: Lsn,
     348              :     pub hs_feedback: HotStandbyFeedback,
     349              :     pub pageserver_feedback: Option<PageserverFeedback>,
     350              : }
     351              : 
     352              : impl AppendResponse {
     353            0 :     fn term_only(generation: Generation, term: Term) -> AppendResponse {
     354            0 :         AppendResponse {
     355            0 :             generation,
     356            0 :             term,
     357            0 :             flush_lsn: Lsn(0),
     358            0 :             commit_lsn: Lsn(0),
     359            0 :             hs_feedback: HotStandbyFeedback::empty(),
     360            0 :             pageserver_feedback: None,
     361            0 :         }
     362            0 :     }
     363              : }
     364              : 
     365              : /// Proposer -> Acceptor messages
     366              : #[derive(Debug)]
     367              : pub enum ProposerAcceptorMessage {
     368              :     Greeting(ProposerGreeting),
     369              :     VoteRequest(VoteRequest),
     370              :     Elected(ProposerElected),
     371              :     AppendRequest(AppendRequest),
     372              :     NoFlushAppendRequest(AppendRequest),
     373              :     FlushWAL,
     374              : }
     375              : 
     376              : /// Augment Bytes with fallible get_uN methods, where N is the number of bits.
     377              : /// All reads are in network (big endian) order.
     378              : trait BytesF {
     379              :     fn get_u8_f(&mut self) -> Result<u8>;
     380              :     fn get_u16_f(&mut self) -> Result<u16>;
     381              :     fn get_u32_f(&mut self) -> Result<u32>;
     382              :     fn get_u64_f(&mut self) -> Result<u64>;
     383              : }
     384              : 
     385              : impl BytesF for Bytes {
     386        28750 :     fn get_u8_f(&mut self) -> Result<u8> {
     387        28750 :         if self.is_empty() {
     388            0 :             bail!("no bytes left, expected 1");
     389        28750 :         }
     390        28750 :         Ok(self.get_u8())
     391        28750 :     }
     392            0 :     fn get_u16_f(&mut self) -> Result<u16> {
     393            0 :         if self.remaining() < 2 {
     394            0 :             bail!("no bytes left, expected 2");
     395            0 :         }
     396            0 :         Ok(self.get_u16())
     397            0 :     }
     398       110419 :     fn get_u32_f(&mut self) -> Result<u32> {
     399       110419 :         if self.remaining() < 4 {
     400            0 :             bail!("only {} bytes left, expected 4", self.remaining());
     401       110419 :         }
     402       110419 :         Ok(self.get_u32())
     403       110419 :     }
     404        69221 :     fn get_u64_f(&mut self) -> Result<u64> {
     405        69221 :         if self.remaining() < 8 {
     406            0 :             bail!("only {} bytes left, expected 8", self.remaining());
     407        69221 :         }
     408        69221 :         Ok(self.get_u64())
     409        69221 :     }
     410              : }
     411              : 
     412              : impl ProposerAcceptorMessage {
     413              :     /// Read cstring from Bytes.
     414        40324 :     fn get_cstr(buf: &mut Bytes) -> Result<String> {
     415        40324 :         let pos = buf
     416        40324 :             .iter()
     417      1330692 :             .position(|x| *x == 0)
     418        40324 :             .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
     419        40324 :         let result = buf.split_to(pos);
     420        40324 :         buf.advance(1); // drop the null terminator
     421        40324 :         match std::str::from_utf8(&result) {
     422        40324 :             Ok(s) => Ok(s.to_string()),
     423            0 :             Err(e) => bail!("invalid utf8 in cstring: {}", e),
     424              :         }
     425        40324 :     }
     426              : 
     427              :     /// Read membership::Configuration from Bytes.
     428        20162 :     fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
     429        20162 :         let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
     430        20162 :         let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
     431              :         // Main member set must have at least someone in valid configuration.
     432              :         // Empty conf is allowed until we fully migrate.
     433        20162 :         if generation != INVALID_GENERATION && members_len == 0 {
     434            0 :             bail!("empty members_len");
     435        20162 :         }
     436        20162 :         let mut members = MemberSet::empty();
     437        20162 :         for i in 0..members_len {
     438            0 :             let id = buf
     439            0 :                 .get_u64_f()
     440            0 :                 .with_context(|| format!("reading member {i} node_id"))?;
     441            0 :             let host = Self::get_cstr(buf).with_context(|| format!("reading member {i} host"))?;
     442            0 :             let pg_port = buf
     443            0 :                 .get_u16_f()
     444            0 :                 .with_context(|| format!("reading member {i} port"))?;
     445            0 :             let sk = SafekeeperId {
     446            0 :                 id: NodeId(id),
     447            0 :                 host,
     448            0 :                 pg_port,
     449            0 :             };
     450            0 :             members.add(sk)?;
     451              :         }
     452        20162 :         let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
     453              :         // Non joint conf.
     454        20162 :         if new_members_len == 0 {
     455        20162 :             Ok(membership::Configuration {
     456        20162 :                 generation,
     457        20162 :                 members,
     458        20162 :                 new_members: None,
     459        20162 :             })
     460              :         } else {
     461            0 :             let mut new_members = MemberSet::empty();
     462            0 :             for i in 0..new_members_len {
     463            0 :                 let id = buf
     464            0 :                     .get_u64_f()
     465            0 :                     .with_context(|| format!("reading new member {i} node_id"))?;
     466            0 :                 let host =
     467            0 :                     Self::get_cstr(buf).with_context(|| format!("reading new member {i} host"))?;
     468            0 :                 let pg_port = buf
     469            0 :                     .get_u16_f()
     470            0 :                     .with_context(|| format!("reading new member {i} port"))?;
     471            0 :                 let sk = SafekeeperId {
     472            0 :                     id: NodeId(id),
     473            0 :                     host,
     474            0 :                     pg_port,
     475            0 :                 };
     476            0 :                 new_members.add(sk)?;
     477              :             }
     478            0 :             Ok(membership::Configuration {
     479            0 :                 generation,
     480            0 :                 members,
     481            0 :                 new_members: Some(new_members),
     482            0 :             })
     483              :         }
     484        20162 :     }
     485              : 
     486              :     /// Parse proposer message.
     487        28750 :     pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
     488        28750 :         if proto_version == SK_PROTO_VERSION_3 {
     489        28750 :             if msg_bytes.is_empty() {
     490            0 :                 bail!("ProposerAcceptorMessage is not complete: missing tag");
     491        28750 :             }
     492        28750 :             let tag = msg_bytes.get_u8_f().with_context(|| {
     493            0 :                 "ProposerAcceptorMessage is not complete: missing tag".to_string()
     494            0 :             })? as char;
     495        28750 :             match tag {
     496              :                 'g' => {
     497        20162 :                     let tenant_id_str =
     498        20162 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
     499        20162 :                     let tenant_id = TenantId::from_str(&tenant_id_str)?;
     500        20162 :                     let timeline_id_str =
     501        20162 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
     502        20162 :                     let timeline_id = TimelineId::from_str(&timeline_id_str)?;
     503        20162 :                     let mconf = Self::get_mconf(&mut msg_bytes)?;
     504        20162 :                     let pg_version = msg_bytes
     505        20162 :                         .get_u32_f()
     506        20162 :                         .with_context(|| "reading pg_version")?;
     507        20162 :                     let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
     508        20162 :                     let wal_seg_size = msg_bytes
     509        20162 :                         .get_u32_f()
     510        20162 :                         .with_context(|| "reading wal_seg_size")?;
     511        20162 :                     let g = ProposerGreeting {
     512        20162 :                         tenant_id,
     513        20162 :                         timeline_id,
     514        20162 :                         mconf,
     515        20162 :                         pg_version: PgVersionId::from_full_pg_version(pg_version),
     516        20162 :                         system_id,
     517        20162 :                         wal_seg_size,
     518        20162 :                     };
     519        20162 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     520              :                 }
     521              :                 'v' => {
     522         3085 :                     let generation = Generation::new(
     523         3085 :                         msg_bytes
     524         3085 :                             .get_u32_f()
     525         3085 :                             .with_context(|| "reading generation")?,
     526              :                     );
     527         3085 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     528         3085 :                     let v = VoteRequest { generation, term };
     529         3085 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     530              :                 }
     531              :                 'e' => {
     532         1021 :                     let generation = Generation::new(
     533         1021 :                         msg_bytes
     534         1021 :                             .get_u32_f()
     535         1021 :                             .with_context(|| "reading generation")?,
     536              :                     );
     537         1021 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     538         1021 :                     let start_streaming_at: Lsn = msg_bytes
     539         1021 :                         .get_u64_f()
     540         1021 :                         .with_context(|| "reading start_streaming_at")?
     541         1021 :                         .into();
     542         1021 :                     let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     543         1021 :                     let msg = ProposerElected {
     544         1021 :                         generation,
     545         1021 :                         term,
     546         1021 :                         start_streaming_at,
     547         1021 :                         term_history,
     548         1021 :                     };
     549         1021 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     550              :                 }
     551              :                 'a' => {
     552         4482 :                     let generation = Generation::new(
     553         4482 :                         msg_bytes
     554         4482 :                             .get_u32_f()
     555         4482 :                             .with_context(|| "reading generation")?,
     556              :                     );
     557         4482 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     558         4482 :                     let begin_lsn: Lsn = msg_bytes
     559         4482 :                         .get_u64_f()
     560         4482 :                         .with_context(|| "reading begin_lsn")?
     561         4482 :                         .into();
     562         4482 :                     let end_lsn: Lsn = msg_bytes
     563         4482 :                         .get_u64_f()
     564         4482 :                         .with_context(|| "reading end_lsn")?
     565         4482 :                         .into();
     566         4482 :                     let commit_lsn: Lsn = msg_bytes
     567         4482 :                         .get_u64_f()
     568         4482 :                         .with_context(|| "reading commit_lsn")?
     569         4482 :                         .into();
     570         4482 :                     let truncate_lsn: Lsn = msg_bytes
     571         4482 :                         .get_u64_f()
     572         4482 :                         .with_context(|| "reading truncate_lsn")?
     573         4482 :                         .into();
     574         4482 :                     let hdr = AppendRequestHeader {
     575         4482 :                         generation,
     576         4482 :                         term,
     577         4482 :                         begin_lsn,
     578         4482 :                         end_lsn,
     579         4482 :                         commit_lsn,
     580         4482 :                         truncate_lsn,
     581         4482 :                     };
     582         4482 :                     let rec_size = hdr
     583         4482 :                         .end_lsn
     584         4482 :                         .checked_sub(hdr.begin_lsn)
     585         4482 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     586              :                         .0 as usize;
     587         4482 :                     if rec_size > MAX_SEND_SIZE {
     588            0 :                         bail!(
     589            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     590              :                             MAX_SEND_SIZE
     591              :                         );
     592         4482 :                     }
     593         4482 :                     if msg_bytes.remaining() < rec_size {
     594            0 :                         bail!(
     595            0 :                             "reading WAL: only {} bytes left, wanted {}",
     596            0 :                             msg_bytes.remaining(),
     597              :                             rec_size
     598              :                         );
     599         4482 :                     }
     600         4482 :                     let wal_data = msg_bytes.copy_to_bytes(rec_size);
     601         4482 :                     let msg = AppendRequest { h: hdr, wal_data };
     602              : 
     603         4482 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     604              :                 }
     605            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     606              :             }
     607            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     608              :             // xxx using Reader is inefficient but easy to work with bincode
     609            0 :             let mut stream = msg_bytes.reader();
     610              :             // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     611            0 :             let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     612            0 :             match tag {
     613              :                 'g' => {
     614            0 :                     let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
     615            0 :                     let g = ProposerGreeting {
     616            0 :                         tenant_id: msgv2.tenant_id,
     617            0 :                         timeline_id: msgv2.timeline_id,
     618            0 :                         mconf: membership::Configuration {
     619            0 :                             generation: INVALID_GENERATION,
     620            0 :                             members: MemberSet::empty(),
     621            0 :                             new_members: None,
     622            0 :                         },
     623            0 :                         pg_version: msgv2.pg_version,
     624            0 :                         system_id: msgv2.system_id,
     625            0 :                         wal_seg_size: msgv2.wal_seg_size,
     626            0 :                     };
     627            0 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     628              :                 }
     629              :                 'v' => {
     630            0 :                     let msg = VoteRequestV2::des_from(&mut stream)?;
     631            0 :                     let v = VoteRequest {
     632            0 :                         generation: INVALID_GENERATION,
     633            0 :                         term: msg.term,
     634            0 :                     };
     635            0 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     636              :                 }
     637              :                 'e' => {
     638            0 :                     let mut msg_bytes = stream.into_inner();
     639            0 :                     if msg_bytes.remaining() < 16 {
     640            0 :                         bail!("ProposerElected message is not complete");
     641            0 :                     }
     642            0 :                     let term = msg_bytes.get_u64_le();
     643            0 :                     let start_streaming_at = msg_bytes.get_u64_le().into();
     644            0 :                     let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
     645            0 :                     if msg_bytes.remaining() < 8 {
     646            0 :                         bail!("ProposerElected message is not complete");
     647            0 :                     }
     648            0 :                     let _timeline_start_lsn = msg_bytes.get_u64_le();
     649            0 :                     let msg = ProposerElected {
     650            0 :                         generation: INVALID_GENERATION,
     651            0 :                         term,
     652            0 :                         start_streaming_at,
     653            0 :                         term_history,
     654            0 :                     };
     655            0 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     656              :                 }
     657              :                 'a' => {
     658              :                     // read header followed by wal data
     659            0 :                     let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
     660            0 :                     let hdr = AppendRequestHeader {
     661            0 :                         generation: INVALID_GENERATION,
     662            0 :                         term: hdrv2.term,
     663            0 :                         begin_lsn: hdrv2.begin_lsn,
     664            0 :                         end_lsn: hdrv2.end_lsn,
     665            0 :                         commit_lsn: hdrv2.commit_lsn,
     666            0 :                         truncate_lsn: hdrv2.truncate_lsn,
     667            0 :                     };
     668            0 :                     let rec_size = hdr
     669            0 :                         .end_lsn
     670            0 :                         .checked_sub(hdr.begin_lsn)
     671            0 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     672              :                         .0 as usize;
     673            0 :                     if rec_size > MAX_SEND_SIZE {
     674            0 :                         bail!(
     675            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     676              :                             MAX_SEND_SIZE
     677              :                         );
     678            0 :                     }
     679              : 
     680            0 :                     let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     681            0 :                     stream.read_exact(&mut wal_data_vec)?;
     682            0 :                     let wal_data = Bytes::from(wal_data_vec);
     683              : 
     684            0 :                     let msg = AppendRequest { h: hdr, wal_data };
     685              : 
     686            0 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     687              :                 }
     688            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     689              :             }
     690              :         } else {
     691            0 :             bail!("unsupported protocol version {}", proto_version);
     692              :         }
     693        28750 :     }
     694              : 
     695              :     /// The memory size of the message, including byte slices.
     696          620 :     pub fn size(&self) -> usize {
     697              :         const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
     698              : 
     699              :         // For most types, the size is just the base enum size including the nested structs. Some
     700              :         // types also contain byte slices; add them.
     701              :         //
     702              :         // We explicitly list all fields, to draw attention here when new fields are added.
     703          620 :         let mut size = BASE_SIZE;
     704          620 :         size += match self {
     705            0 :             Self::Greeting(_) => 0,
     706              : 
     707            0 :             Self::VoteRequest(_) => 0,
     708              : 
     709            0 :             Self::Elected(_) => 0,
     710              : 
     711              :             Self::AppendRequest(AppendRequest {
     712              :                 h:
     713              :                     AppendRequestHeader {
     714              :                         generation: _,
     715              :                         term: _,
     716              :                         begin_lsn: _,
     717              :                         end_lsn: _,
     718              :                         commit_lsn: _,
     719              :                         truncate_lsn: _,
     720              :                     },
     721          620 :                 wal_data,
     722          620 :             }) => wal_data.len(),
     723              : 
     724              :             Self::NoFlushAppendRequest(AppendRequest {
     725              :                 h:
     726              :                     AppendRequestHeader {
     727              :                         generation: _,
     728              :                         term: _,
     729              :                         begin_lsn: _,
     730              :                         end_lsn: _,
     731              :                         commit_lsn: _,
     732              :                         truncate_lsn: _,
     733              :                     },
     734            0 :                 wal_data,
     735            0 :             }) => wal_data.len(),
     736              : 
     737            0 :             Self::FlushWAL => 0,
     738              :         };
     739              : 
     740          620 :         size
     741          620 :     }
     742              : }
     743              : 
/// Acceptor -> Proposer messages
///
/// Replies the safekeeper (acceptor) sends back to the proposer. The wire
/// encoding of each variant is defined by `AcceptorProposerMessage::serialize`.
#[derive(Debug)]
pub enum AcceptorProposerMessage {
    /// Reply to the proposer's greeting (wire tag `g`); carries this node's id,
    /// its membership configuration and its current term.
    Greeting(AcceptorGreeting),
    /// Reply to a vote request (wire tag `v`); carries the vote decision, the
    /// flush and truncate LSNs and the term history.
    VoteResponse(VoteResponse),
    /// Reply to appended WAL (wire tag `a`); carries term, flush and commit
    /// LSNs, hot standby feedback and optional pageserver feedback.
    AppendResponse(AppendResponse),
}
     751              : 
     752              : impl AcceptorProposerMessage {
     753            0 :     fn put_cstr(buf: &mut BytesMut, s: &str) {
     754            0 :         buf.put_slice(s.as_bytes());
     755            0 :         buf.put_u8(0); // null terminator
     756            0 :     }
     757              : 
     758              :     /// Serialize membership::Configuration into buf.
     759        20162 :     fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
     760        20162 :         buf.put_u32(mconf.generation.into_inner());
     761        20162 :         buf.put_u32(mconf.members.m.len() as u32);
     762        20162 :         for sk in &mconf.members.m {
     763            0 :             buf.put_u64(sk.id.0);
     764            0 :             Self::put_cstr(buf, &sk.host);
     765            0 :             buf.put_u16(sk.pg_port);
     766            0 :         }
     767        20162 :         if let Some(ref new_members) = mconf.new_members {
     768            0 :             buf.put_u32(new_members.m.len() as u32);
     769            0 :             for sk in &new_members.m {
     770            0 :                 buf.put_u64(sk.id.0);
     771            0 :                 Self::put_cstr(buf, &sk.host);
     772            0 :                 buf.put_u16(sk.pg_port);
     773            0 :             }
     774        20162 :         } else {
     775        20162 :             buf.put_u32(0);
     776        20162 :         }
     777        20162 :     }
     778              : 
     779              :     /// Serialize acceptor -> proposer message.
     780        27059 :     pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
     781        27059 :         if proto_version == SK_PROTO_VERSION_3 {
     782        27059 :             match self {
     783        20162 :                 AcceptorProposerMessage::Greeting(msg) => {
     784        20162 :                     buf.put_u8(b'g');
     785        20162 :                     buf.put_u64(msg.node_id.0);
     786        20162 :                     Self::serialize_mconf(buf, &msg.mconf);
     787        20162 :                     buf.put_u64(msg.term)
     788              :                 }
     789         3085 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     790         3085 :                     buf.put_u8(b'v');
     791         3085 :                     buf.put_u32(msg.generation.into_inner());
     792         3085 :                     buf.put_u64(msg.term);
     793         3085 :                     buf.put_u8(msg.vote_given as u8);
     794         3085 :                     buf.put_u64(msg.flush_lsn.into());
     795         3085 :                     buf.put_u64(msg.truncate_lsn.into());
     796         3085 :                     buf.put_u32(msg.term_history.0.len() as u32);
     797        19049 :                     for e in &msg.term_history.0 {
     798        15964 :                         buf.put_u64(e.term);
     799        15964 :                         buf.put_u64(e.lsn.into());
     800        15964 :                     }
     801              :                 }
     802         3812 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     803         3812 :                     buf.put_u8(b'a');
     804         3812 :                     buf.put_u32(msg.generation.into_inner());
     805         3812 :                     buf.put_u64(msg.term);
     806         3812 :                     buf.put_u64(msg.flush_lsn.into());
     807         3812 :                     buf.put_u64(msg.commit_lsn.into());
     808         3812 :                     buf.put_i64(msg.hs_feedback.ts);
     809         3812 :                     buf.put_u64(msg.hs_feedback.xmin);
     810         3812 :                     buf.put_u64(msg.hs_feedback.catalog_xmin);
     811              : 
     812              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     813              :                     // if it is not present.
     814         3812 :                     if let Some(ref msg) = msg.pageserver_feedback {
     815            0 :                         msg.serialize(buf);
     816         3812 :                     }
     817              :                 }
     818              :             }
     819        27059 :             Ok(())
     820              :         // TODO remove 3 after converting all msgs
     821            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     822            0 :             match self {
     823            0 :                 AcceptorProposerMessage::Greeting(msg) => {
     824            0 :                     buf.put_u64_le('g' as u64);
     825            0 :                     // v2 didn't have mconf and fields were reordered
     826            0 :                     buf.put_u64_le(msg.term);
     827            0 :                     buf.put_u64_le(msg.node_id.0);
     828            0 :                 }
     829            0 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     830              :                     // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
     831            0 :                     buf.put_u64_le('v' as u64);
     832            0 :                     buf.put_u64_le(msg.term);
     833            0 :                     buf.put_u64_le(msg.vote_given as u64);
     834            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     835            0 :                     buf.put_u64_le(msg.truncate_lsn.into());
     836            0 :                     buf.put_u32_le(msg.term_history.0.len() as u32);
     837            0 :                     for e in &msg.term_history.0 {
     838            0 :                         buf.put_u64_le(e.term);
     839            0 :                         buf.put_u64_le(e.lsn.into());
     840            0 :                     }
     841              :                     // removed timeline_start_lsn
     842            0 :                     buf.put_u64_le(0);
     843              :                 }
     844            0 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     845              :                     // v2 didn't have generation
     846            0 :                     buf.put_u64_le('a' as u64);
     847            0 :                     buf.put_u64_le(msg.term);
     848            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     849            0 :                     buf.put_u64_le(msg.commit_lsn.into());
     850            0 :                     buf.put_i64_le(msg.hs_feedback.ts);
     851            0 :                     buf.put_u64_le(msg.hs_feedback.xmin);
     852            0 :                     buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     853              : 
     854              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     855              :                     // if it is not present.
     856            0 :                     if let Some(ref msg) = msg.pageserver_feedback {
     857            0 :                         msg.serialize(buf);
     858            0 :                     }
     859              :                 }
     860              :             }
     861            0 :             Ok(())
     862              :         } else {
     863            0 :             bail!("unsupported protocol version {}", proto_version);
     864              :         }
     865        27059 :     }
     866              : }
     867              : 
/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from broker peers
///
/// Generic over the control file storage (`CTRL`) and the WAL storage (`WAL`).
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
    /// LSN from which the proposer this safekeeper is currently talking to
    /// appends WAL; determines the last_log_term switch point.
    /// Initialized to `Lsn(0)` by `new`.
    pub term_start_lsn: Lsn,

    /// Timeline state backed by the control file storage.
    pub state: TimelineState<CTRL>, // persistent state storage
    /// WAL storage; its flush position feeds `flush_lsn()`.
    pub wal_store: WAL,

    /// Id of this safekeeper node; reported in the greeting reply and checked
    /// against proposed membership configurations.
    node_id: NodeId, // safekeeper's node id
}
     884              : 
     885              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     886              : where
     887              :     CTRL: control_file::Storage,
     888              :     WAL: wal_storage::Storage,
     889              : {
     890              :     /// Accepts a control file storage containing the safekeeper state.
     891              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     892              :     /// and `server` (`wal_seg_size` inside it) fields.
     893         9149 :     pub fn new(
     894         9149 :         state: TimelineState<CTRL>,
     895         9149 :         wal_store: WAL,
     896         9149 :         node_id: NodeId,
     897         9149 :     ) -> Result<SafeKeeper<CTRL, WAL>> {
     898         9149 :         if state.tenant_id == TenantId::from([0u8; 16])
     899         9149 :             || state.timeline_id == TimelineId::from([0u8; 16])
     900              :         {
     901            0 :             bail!(
     902            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     903            0 :                 state.tenant_id,
     904            0 :                 state.timeline_id
     905              :             );
     906            9 :         }
     907              : 
     908         9149 :         Ok(SafeKeeper {
     909         9149 :             term_start_lsn: Lsn(0),
     910         9149 :             state,
     911         9149 :             wal_store,
     912         9149 :             node_id,
     913         9149 :         })
     914            9 :     }
     915              : 
     916              :     /// Get history of term switches for the available WAL
     917         4115 :     fn get_term_history(&self) -> TermHistory {
     918         4115 :         self.state
     919         4115 :             .acceptor_state
     920         4115 :             .term_history
     921         4115 :             .up_to(self.flush_lsn())
     922            9 :     }
     923              : 
     924            2 :     pub fn get_last_log_term(&self) -> Term {
     925            2 :         self.state
     926            2 :             .acceptor_state
     927            2 :             .get_last_log_term(self.flush_lsn())
     928            2 :     }
     929              : 
     930              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     931        21266 :     pub fn flush_lsn(&self) -> Lsn {
     932        21266 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     933         1883 :     }
     934              : 
     935              :     /// Process message from proposer and possibly form reply. Concurrent
     936              :     /// callers must exclude each other.
     937        33818 :     pub async fn process_msg(
     938        33818 :         &mut self,
     939        33818 :         msg: &ProposerAcceptorMessage,
     940        33818 :     ) -> Result<Option<AcceptorProposerMessage>> {
     941        33818 :         let res = match msg {
     942        20162 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     943         3088 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     944         1029 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     945            5 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     946            5 :                 self.handle_append_request(msg, true).await
     947              :             }
     948         5102 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     949         5102 :                 self.handle_append_request(msg, false).await
     950              :             }
     951         4432 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     952              :         };
     953              : 
     954              :         // BEGIN HADRON
     955        33818 :         match &res {
     956        33814 :             Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
     957        33814 :                 .with_label_values(&["success"])
     958        33814 :                 .inc(),
     959            4 :             Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
     960            4 :                 .with_label_values(&["error"])
     961            4 :                 .inc(),
     962              :         };
     963              : 
     964        33818 :         res
     965              :         // END HADRON
     966         1256 :     }
     967              : 
     968              :     /// Handle initial message from proposer: check its sanity and send my
     969              :     /// current term.
     970        20162 :     async fn handle_greeting(
     971        20162 :         &mut self,
     972        20162 :         msg: &ProposerGreeting,
     973        20162 :     ) -> Result<Option<AcceptorProposerMessage>> {
     974              :         /* Postgres major version mismatch is treated as fatal error
     975              :          * because safekeepers parse WAL headers and the format
     976              :          * may change between versions.
     977              :          */
     978        20162 :         if PgMajorVersion::try_from(msg.pg_version)?
     979        20162 :             != PgMajorVersion::try_from(self.state.server.pg_version)?
     980            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     981              :         {
     982            0 :             bail!(
     983            0 :                 "incompatible server version {}, expected {}",
     984              :                 msg.pg_version,
     985            0 :                 self.state.server.pg_version
     986              :             );
     987            0 :         }
     988              : 
     989        20162 :         if msg.tenant_id != self.state.tenant_id {
     990            0 :             bail!(
     991            0 :                 "invalid tenant ID, got {}, expected {}",
     992              :                 msg.tenant_id,
     993            0 :                 self.state.tenant_id
     994              :             );
     995            0 :         }
     996        20162 :         if msg.timeline_id != self.state.timeline_id {
     997            0 :             bail!(
     998            0 :                 "invalid timeline ID, got {}, expected {}",
     999              :                 msg.timeline_id,
    1000            0 :                 self.state.timeline_id
    1001              :             );
    1002            0 :         }
    1003        20162 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
    1004            0 :             bail!(
    1005            0 :                 "invalid wal_seg_size, got {}, expected {}",
    1006              :                 msg.wal_seg_size,
    1007            0 :                 self.state.server.wal_seg_size
    1008              :             );
    1009            0 :         }
    1010              : 
    1011              :         // system_id will be updated on mismatch
    1012              :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
    1013        20162 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
    1014            0 :             if self.state.server.system_id != 0 {
    1015            0 :                 warn!(
    1016            0 :                     "unexpected system ID arrived, got {}, expected {}",
    1017            0 :                     msg.system_id, self.state.server.system_id
    1018              :                 );
    1019            0 :             }
    1020              : 
    1021            0 :             let mut state = self.state.start_change();
    1022            0 :             state.server.system_id = msg.system_id;
    1023            0 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
    1024            0 :                 state.server.pg_version = msg.pg_version;
    1025            0 :             }
    1026            0 :             self.state.finish_change(&state).await?;
    1027            0 :         }
    1028              : 
    1029        20162 :         if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) {
    1030            0 :             bail!(
    1031            0 :                 "refused to switch into {}, node {} is not a member of it",
    1032              :                 msg.mconf,
    1033              :                 self.node_id,
    1034              :             );
    1035            0 :         }
    1036              :         // Switch into conf given by proposer conf if it is higher.
    1037        20162 :         self.state.membership_switch(msg.mconf.clone()).await?;
    1038              : 
    1039        20162 :         let apg = AcceptorGreeting {
    1040        20162 :             node_id: self.node_id,
    1041        20162 :             mconf: self.state.mconf.clone(),
    1042        20162 :             term: self.state.acceptor_state.term,
    1043        20162 :         };
    1044        20162 :         info!(
    1045            0 :             "processed greeting {:?} from walproposer, sending {:?}",
    1046              :             msg, apg
    1047              :         );
    1048        20162 :         Ok(Some(AcceptorProposerMessage::Greeting(apg)))
    1049            0 :     }
    1050              : 
    1051              :     /// Handle VoteRequest: bail on generation mismatch, flush WAL, and give the vote only if msg.term is greater than our current term (i.e. we haven't voted for it previously).
    1052         3088 :     async fn handle_vote_request(
    1053         3088 :         &mut self,
    1054         3088 :         msg: &VoteRequest,
    1055         3088 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1056         3088 :         if self.state.mconf.generation != msg.generation {
    1057            1 :             bail!(
    1058            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1059              :                 msg,
    1060            1 :                 self.state.mconf.generation
    1061              :             );
    1062            2 :         }
    1063              :         // Once voted, we won't accept data from older proposers; flush
    1064              :         // everything we've already received so that new proposer starts
    1065              :         // streaming at end of our WAL, without overlap. WAL is truncated at
    1066              :         // streaming point and commit_lsn may be advanced from peers, so this
    1067              :         // also avoids possible spurious attempt to truncate committed WAL.
    1068         3087 :         self.wal_store.flush_wal().await?;
    1069              :         // Initialize with a refusal; it is turned into a granted vote below only if msg.term is newer.
    1070         3087 :         let mut resp = VoteResponse {
    1071         3087 :             generation: self.state.mconf.generation,
    1072         3087 :             term: self.state.acceptor_state.term,
    1073         3087 :             vote_given: false,
    1074         3087 :             flush_lsn: self.flush_lsn(),
    1075         3087 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
    1076         3087 :             term_history: self.get_term_history(),
    1077         3087 :         };
    1078         3087 :         if self.state.acceptor_state.term < msg.term {
    1079         2905 :             let mut state = self.state.start_change();
    1080         2905 :             state.acceptor_state.term = msg.term;
    1081              :             // Persist the new term (i.e. the vote) before sending the response out.
    1082         2905 :             self.state.finish_change(&state).await?;
    1083              : 
    1084         2905 :             resp.term = self.state.acceptor_state.term;
    1085         2905 :             resp.vote_given = true;
    1086            1 :         }
    1087         3087 :         info!("processed {:?}: sending {:?}", msg, &resp);
    1088         3087 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
    1089            3 :     }
    1090              : 
    1091              :     /// Form AppendResponse from current state: generation, term, flush_lsn and state.commit_lsn; feedback fields are left empty.
    1092         4435 :     fn append_response(&self) -> AppendResponse {
    1093         4435 :         let ar = AppendResponse {
    1094         4435 :             generation: self.state.mconf.generation,
    1095         4435 :             term: self.state.acceptor_state.term,
    1096         4435 :             flush_lsn: self.flush_lsn(),
    1097         4435 :             commit_lsn: self.state.commit_lsn,
    1098         4435 :             // The feedback fields below will be filled by the upper code to avoid bothering safekeeper.
    1099         4435 :             hs_feedback: HotStandbyFeedback::empty(),
    1100         4435 :             pageserver_feedback: None,
    1101         4435 :         };
    1102         4435 :         trace!("formed AppendResponse {:?}", ar);
    1103         4435 :         ar
    1104          623 :     }
    1105              :     /// Handle ProposerElected: validate generation, term and the streaming start point, truncate WAL to start_streaming_at, then adopt and persist the proposer's term history.
    1106         1029 :     async fn handle_elected(
    1107         1029 :         &mut self,
    1108         1029 :         msg: &ProposerElected,
    1109         1029 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1110         1029 :         let _timer = MISC_OPERATION_SECONDS
    1111         1029 :             .with_label_values(&["handle_elected"])
    1112         1029 :             .start_timer();
    1113              :         // Every non-error path of this function returns Ok(None).
    1114         1029 :         info!(
    1115            0 :             "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
    1116              :             msg,
    1117            0 :             self.state.acceptor_state.term,
    1118            0 :             self.get_last_log_term(),
    1119            0 :             self.flush_lsn()
    1120              :         );
    1121         1029 :         if self.state.mconf.generation != msg.generation {
    1122            1 :             bail!(
    1123            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1124              :                 msg,
    1125            1 :                 self.state.mconf.generation
    1126              :             );
    1127            7 :         }
    1128         1028 :         if self.state.acceptor_state.term < msg.term {
    1129            7 :             let mut state = self.state.start_change();
    1130            7 :             state.acceptor_state.term = msg.term;
    1131            7 :             self.state.finish_change(&state).await?;
    1132            0 :         }
    1133              : 
    1134              :         // If our term is higher, ignore the message (next feedback will inform the compute)
    1135         1028 :         if self.state.acceptor_state.term > msg.term {
    1136            0 :             return Ok(None);
    1137            7 :         }
    1138              : 
    1139              :         // Before truncating WAL, cross-check the divergence point received
    1140              :         // from the walproposer.
    1141         1028 :         let sk_th = self.get_term_history();
    1142         1028 :         let last_common_point = match TermHistory::find_highest_common_point(
    1143         1028 :             &msg.term_history,
    1144         1028 :             &sk_th,
    1145         1028 :             self.flush_lsn(),
    1146         1028 :         ) {
    1147              :             // No common point. Expect streaming from the beginning of the
    1148              :             // history like walproposer while we don't have proper init.
    1149          137 :             None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
    1150          137 :                 "empty walproposer term history {:?}",
    1151              :                 msg.term_history
    1152            0 :             ))?, // NOTE(review): ok_or builds this error eagerly even when first() is Some; ok_or_else would defer it.
    1153          891 :             Some(lcp) => lcp,
    1154              :         };
    1155              :         // This is expected to happen in a rare race when another connection
    1156              :         // from the same walproposer writes + flushes WAL after this connection
    1157              :         // sent flush_lsn in VoteRequest; for instance, very late
    1158              :         // ProposerElected message delivery after another connection was
    1159              :         // established and wrote WAL. In such cases error is transient;
    1160              :         // reconnection makes safekeeper send newest term history and flush_lsn
    1161              :         // and walproposer recalculates the streaming point. OTOH repeating
    1162              :         // error indicates a serious bug.
    1163         1028 :         if last_common_point.lsn != msg.start_streaming_at {
    1164            0 :             bail!(
    1165            0 :                 "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1166              :                 last_common_point,
    1167              :                 msg.start_streaming_at,
    1168            0 :                 self.state.acceptor_state.term,
    1169              :                 sk_th,
    1170            0 :                 self.flush_lsn(),
    1171              :                 msg.term_history,
    1172              :             );
    1173            7 :         }
    1174              : 
    1175              :         // We are also expected to never attempt to truncate committed data; violating this panics.
    1176         1028 :         assert!(
    1177         1028 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
    1178            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1179              :             msg.start_streaming_at,
    1180              :             self.state.inmem.commit_lsn,
    1181            0 :             self.state.acceptor_state.term,
    1182              :             sk_th,
    1183            0 :             self.flush_lsn(),
    1184              :             msg.term_history,
    1185              :         );
    1186              : 
    1187              :         // Before first WAL write initialize its segment. It makes first segment
    1188              :         // pg_waldump'able because stream from compute doesn't include its
    1189              :         // segment and page headers.
    1190              :         //
    1191              :         // If we fail before first WAL write flush this action would be
    1192              :         // repeated, that's ok because it is idempotent.
    1193         1028 :         if self.wal_store.flush_lsn() == Lsn::INVALID {
    1194          137 :             self.wal_store
    1195          137 :                 .initialize_first_segment(msg.start_streaming_at)
    1196          137 :                 .await?;
    1197            0 :         }
    1198              : 
    1199              :         // truncate wal, update the LSNs
    1200         1028 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
    1201              : 
    1202              :         // and now adopt term history from proposer
    1203              :         {
    1204         1028 :             let mut state = self.state.start_change();
    1205              : 
    1206              :             // Here we may learn the initial LSN for the first time; set the
    1207              :             // fields that depend on it.
    1208              : 
    1209         1028 :             if let Some(start_lsn) = msg.term_history.0.first() {
    1210         1028 :                 if state.timeline_start_lsn == Lsn(0) {
    1211              :                     // Remember point where WAL begins globally. In the future it
    1212              :                     // will be initialized immediately on timeline creation.
    1213          137 :                     state.timeline_start_lsn = start_lsn.lsn;
    1214          137 :                     info!(
    1215            0 :                         "setting timeline_start_lsn to {:?}",
    1216              :                         state.timeline_start_lsn
    1217              :                     );
    1218            0 :                 }
    1219            0 :             }
    1220              : 
    1221         1028 :             if state.peer_horizon_lsn == Lsn(0) {
    1222          137 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
    1223          137 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
    1224          137 :                 state.peer_horizon_lsn = state.timeline_start_lsn;
    1225          137 :             }
    1226         1028 :             if state.local_start_lsn == Lsn(0) {
    1227          137 :                 state.local_start_lsn = msg.start_streaming_at;
    1228          137 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
    1229            0 :             }
    1230              :             // Initializing commit_lsn before acking first flushed record is
    1231              :             // important to let find_end_of_wal skip the hole in the beginning
    1232              :             // of the first segment.
    1233              :             //
    1234              :             // NB: on new clusters, this happens at the same time as
    1235              :             // timeline_start_lsn initialization, it is taken outside to provide
    1236              :             // upgrade.
    1237         1028 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
    1238              : 
    1239              :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
    1240         1028 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
    1241              :             // similar for remote_consistent_lsn
    1242         1028 :             state.remote_consistent_lsn =
    1243         1028 :                 max(state.remote_consistent_lsn, state.timeline_start_lsn);
    1244              : 
    1245         1028 :             state.acceptor_state.term_history = msg.term_history.clone();
    1246         1028 :             self.state.finish_change(&state).await?;
    1247              :         }
    1248              : 
    1249         1028 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
    1250              : 
    1251              :         // Cache LSN where term starts to immediately fsync control file with
    1252              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
    1253              :         // persisted commit_lsn on majority of safekeepers aligns.
    1254         1028 :         self.term_start_lsn = match msg.term_history.0.last() {
    1255            0 :             None => bail!("proposer elected with empty term history"),
    1256         1028 :             Some(term_lsn_start) => term_lsn_start.lsn,
    1257              :         };
    1258              : 
    1259         1028 :         Ok(None)
    1260            8 :     }
    1261              : 
    1262              :     /// Advance in-memory commit_lsn to max(candidate, current commit_lsn), capped by what we have locally (flush_lsn).
    1263              :     ///
    1264              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
    1265              :     /// already been done outside.
    1266         3497 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
    1267              :         // Both peers and walproposer communicate this value, we might already
    1268              :         // have a fresher (higher) version.
    1269         3497 :         candidate = max(candidate, self.state.inmem.commit_lsn);
    1270         3497 :         let commit_lsn = min(candidate, self.flush_lsn());
    1271         3497 :         assert!(
    1272         3497 :             commit_lsn >= self.state.inmem.commit_lsn,
    1273            0 :             "commit_lsn monotonicity violated: old={} new={}",
    1274              :             self.state.inmem.commit_lsn,
    1275              :             commit_lsn
    1276              :         );
    1277              :         // Only the in-memory value is updated here; state.flush() below runs only at the term switch.
    1278         3497 :         self.state.inmem.commit_lsn = commit_lsn;
    1279              : 
    1280              :         // If new commit_lsn reached term switch, force sync of control
    1281              :         // file: walproposer in sync mode is very interested when this
    1282              :         // happens. Note: this is for sync-safekeepers mode only, as
    1283              :         // otherwise commit_lsn might jump over term_start_lsn.
    1284         3497 :         if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
    1285          140 :             self.state.flush().await?;
    1286          620 :         }
    1287              : 
    1288         3497 :         Ok(())
    1289          620 :     }
    1290              : 
    1291              :     /// Handle request to append WAL: validate generation, term and write position, write the data (flushing if require_flush), then update commit_lsn and peer_horizon_lsn.
    1292              :     #[allow(clippy::comparison_chain)]
    1293         5107 :     async fn handle_append_request(
    1294         5107 :         &mut self,
    1295         5107 :         msg: &AppendRequest,
    1296         5107 :         require_flush: bool,
    1297         5107 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1298              :         // Refuse message on generation mismatch. On reconnect wp will get full
    1299              :         // configuration from greeting.
    1300         5107 :         if self.state.mconf.generation != msg.h.generation {
    1301            1 :             bail!(
    1302            1 :                 "refusing append request due to generation mismatch: request {}, sk {}",
    1303              :                 msg.h.generation,
    1304            1 :                 self.state.mconf.generation
    1305              :             );
    1306          624 :         }
    1307              : 
    1308         5106 :         if self.state.acceptor_state.term < msg.h.term {
    1309            0 :             bail!("got AppendRequest before ProposerElected");
    1310          624 :         }
    1311              : 
    1312              :         // If our term is higher, immediately refuse the message. Send term only
    1313              :         // response; elected walproposer can never advance the term, so it will
    1314              :         // figure out the refusal from it -- which is important as term change
    1315              :         // should cause not just reconnection but whole walproposer re-election.
    1316         5106 :         if self.state.acceptor_state.term > msg.h.term {
    1317            0 :             let resp = AppendResponse::term_only(
    1318            0 :                 self.state.mconf.generation,
    1319            0 :                 self.state.acceptor_state.term,
    1320              :             );
    1321            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
    1322          624 :         }
    1323              : 
    1324              :         // Disallow any non-sequential writes, which can result in gaps or
    1325              :         // overwrites. If we need to move the pointer, ProposerElected message
    1326              :         // should have truncated WAL first accordingly. Note that the first
    1327              :         // condition (WAL rewrite) is quite expected in real world; it happens
    1328              :         // when walproposer reconnects to safekeeper and writes some more data
    1329              :         // while first connection still gets some packets later. It might be
    1330              :         // better to not log this as error! above.
    1331         5106 :         let write_lsn = self.wal_store.write_lsn();
    1332         5106 :         let flush_lsn = self.wal_store.flush_lsn();
    1333         5106 :         if write_lsn > msg.h.begin_lsn {
    1334            1 :             bail!(
    1335            1 :                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
    1336              :                 write_lsn,
    1337              :                 msg.h.begin_lsn
    1338              :             );
    1339          623 :         }
    1340         5105 :         if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
    1341            0 :             bail!(
    1342            0 :                 "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
    1343              :                 write_lsn,
    1344              :                 msg.h.begin_lsn,
    1345              :             );
    1346          623 :         }
    1347              : 
    1348              :         // Now we know that we are in the same term as the proposer, process the
    1349              :         // message.
    1350              : 
    1351              :         // Do the job: write the WAL payload, unless the request carries no data.
    1352         5105 :         if !msg.wal_data.is_empty() {
    1353         1757 :             self.wal_store
    1354         1757 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
    1355         1757 :                 .await?;
    1356            0 :         }
    1357              : 
    1358              :         // flush wal to the disk, if required
    1359         5105 :         if require_flush {
    1360            3 :             self.wal_store.flush_wal().await?;
    1361          620 :         }
    1362              : 
    1363              :         // Update commit_lsn. It will be flushed to the control file regularly by the timeline
    1364              :         // manager, off of the WAL ingest hot path.
    1365         5105 :         if msg.h.commit_lsn != Lsn(0) {
    1366         3497 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
    1367            3 :         }
    1368              :         // Value calculated by walproposer can always lag:
    1369              :         // - safekeepers can forget inmem value and send to proposer lower
    1370              :         //   persisted one on restart;
    1371              :         // - if we make safekeepers always send persistent value,
    1372              :         //   any compute restart would pull it down.
    1373              :         // Thus, take max before adopting.
    1374         5105 :         self.state.inmem.peer_horizon_lsn =
    1375         5105 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
    1376              : 
    1377         5105 :         trace!(
    1378            0 :             "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
    1379            0 :             msg.wal_data.len(),
    1380              :             msg.h.begin_lsn,
    1381              :             msg.h.end_lsn,
    1382              :             msg.h.commit_lsn,
    1383              :             msg.h.truncate_lsn,
    1384              :             require_flush,
    1385              :         );
    1386              : 
    1387              :         // If flush_lsn hasn't updated, AppendResponse is not very useful, so return None.
    1388              :         // This is the common case for !require_flush, but a flush can still
    1389              :         // happen on segment bounds.
    1390         5105 :         if !require_flush && flush_lsn == self.flush_lsn() {
    1391         5102 :             return Ok(None);
    1392            3 :         }
    1393              : 
    1394            3 :         let resp = self.append_response();
    1395            3 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
    1396          625 :     }
    1397              : 
    1398              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs, as formed by append_response(); a flush error is propagated.
    1399         4432 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
    1400         4432 :         self.wal_store.flush_wal().await?;
    1401         4432 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
    1402         4432 :             self.append_response(),
    1403         4432 :         )))
    1404          620 :     }
    1405              : 
    1406              :     /// Update commit_lsn from peer safekeeper data; applied only if the peer's commit_lsn is valid and its last_log_term equals ours.
    1407            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
    1408            0 :         if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
    1409              :             // Note: the check is too restrictive, generally we can update local
    1410              :             // commit_lsn if our history matches (is part of) history of advanced
    1411              :             // commit_lsn provider.
    1412            0 :             if sk_info.last_log_term == self.get_last_log_term() {
    1413            0 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
    1414            0 :             }
    1415            0 :         }
    1416            0 :         Ok(())
    1417            0 :     }
    1418              : }
    1419              : 
    1420              : #[cfg(test)]
    1421              : mod tests {
    1422              :     use std::ops::Deref;
    1423              :     use std::str::FromStr;
    1424              :     use std::time::{Instant, UNIX_EPOCH};
    1425              : 
    1426              :     use futures::future::BoxFuture;
    1427              :     use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
    1428              :     use safekeeper_api::ServerInfo;
    1429              :     use safekeeper_api::membership::{
    1430              :         Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
    1431              :     };
    1432              : 
    1433              :     use super::*;
    1434              :     use crate::state::{EvictionState, TimelinePersistentState};
    1435              : 
    1436              :     // Fake control file storage for tests: the "persisted" state is just kept in memory.
    1437              :     struct InMemoryState {
    1438              :         persisted_state: TimelinePersistentState,
    1439              :     }
    1440              :     // control_file::Storage for the fake: persist() replaces the held state with a clone of `s`.
    1441              :     impl control_file::Storage for InMemoryState {
    1442            5 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
    1443            5 :             self.persisted_state = s.clone();
    1444            5 :             Ok(())
    1445            5 :         }
    1446              :         // Returns the current time; the fake does not record when persist() was last called.
    1447            0 :         fn last_persist_at(&self) -> Instant {
    1448            0 :             Instant::now()
    1449            0 :         }
    1450              :     }
    1451              :     // Deref to the held TimelinePersistentState.
    1452              :     impl Deref for InMemoryState {
    1453              :         type Target = TimelinePersistentState;
    1454              : 
    1455          100 :         fn deref(&self) -> &Self::Target {
    1456          100 :             &self.persisted_state
    1457          100 :         }
    1458              :     }
    1459              :     // Empty persistent state with wal_seg_size set to WAL_SEGMENT_SIZE and fixed tenant/timeline ids.
    1460            3 :     fn test_sk_state() -> TimelinePersistentState {
    1461            3 :         let mut state = TimelinePersistentState::empty();
    1462            3 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1463            3 :         state.tenant_id = TenantId::from([1u8; 16]);
    1464            3 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1465            3 :         state
    1466            3 :     }
    1467              :     // Dummy WAL storage for tests: holds a single LSN and no WAL data.
    1468              :     struct DummyWalStore {
    1469              :         lsn: Lsn,
    1470              :     }
    1471              :     // wal_storage::Storage stub: write_lsn and flush_lsn both report the single `lsn` field.
    1472              :     impl wal_storage::Storage for DummyWalStore {
    1473            4 :         fn write_lsn(&self) -> Lsn {
    1474            4 :             self.lsn
    1475            4 :         }
    1476              : 
    1477           19 :         fn flush_lsn(&self) -> Lsn {
    1478           19 :             self.lsn
    1479           19 :         }
    1480              : 
    1481            2 :         async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
    1482            2 :             Ok(())
    1483            2 :         }
    1484              :         // Moves lsn to startpos + buf.len(); the bytes themselves are not stored.
    1485            3 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1486            3 :             self.lsn = startpos + buf.len() as u64;
    1487            3 :             Ok(())
    1488            3 :         }
    1489              :         // Truncation just sets lsn to end_pos.
    1490            2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1491            2 :             self.lsn = end_pos;
    1492            2 :             Ok(())
    1493            2 :         }
    1494              :         // No-op: always succeeds and leaves lsn unchanged.
    1495            5 :         async fn flush_wal(&mut self) -> Result<()> {
    1496            5 :             Ok(())
    1497            5 :         }
    1498              : 
    1499            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1500            0 :             Box::pin(async { Ok(()) })
    1501            0 :         }
    1502              : 
    1503            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1504            0 :             crate::metrics::WalStorageMetrics::default()
    1505            0 :         }
    1506              :     }
    1507              :     // Covers: generation mismatch is rejected, the first vote for term 1 is given, and a repeated vote for term 1 after restart is refused.
    1508              :     #[tokio::test]
    1509            1 :     async fn test_voting() {
    1510            1 :         let storage = InMemoryState {
    1511            1 :             persisted_state: test_sk_state(),
    1512            1 :         };
    1513            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1514            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1515              : 
    1516              :         // Vote with generation mismatch should be rejected.
    1517            1 :         let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1518            1 :             generation: SafekeeperGeneration::new(42),
    1519            1 :             term: 1,
    1520            1 :         });
    1521            1 :         assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
    1522              : 
    1523              :         // Check that voting for term 1 is ok.
    1524            1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1525            1 :             generation: Generation::new(0),
    1526            1 :             term: 1,
    1527            1 :         });
    1528            1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1529            1 :         match vote_resp.unwrap() {
    1530            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
    1531            0 :             r => panic!("unexpected response: {r:?}"),
    1532              :         }
    1533              : 
    1534              :         // Reboot: rebuild the safekeeper from a clone of its current state, reusing the WAL store.
    1535            1 :         let state = sk.state.deref().clone();
    1536            1 :         let storage = InMemoryState {
    1537            1 :             persisted_state: state,
    1538            1 :         };
    1539              : 
    1540            1 :         sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
    1541              : 
    1542              :         // And ensure that voting a second time for term 1 is not ok.
    1543            1 :         vote_resp = sk.process_msg(&vote_request).await;
    1544            1 :         match vote_resp.unwrap() {
    1545            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
    1546            1 :             r => panic!("unexpected response: {r:?}"),
    1547            1 :         }
    1548            1 :     }
    1549              : 
    1550              :     #[tokio::test] // Checks when get_last_log_term() moves from 1 to 2 after a term-2 proposer is elected.
    1551            1 :     async fn test_last_log_term_switch() {
    1552            1 :         let storage = InMemoryState {
    1553            1 :             persisted_state: test_sk_state(),
    1554            1 :         };
    1555            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1556              : 
    1557            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1558              : 
    1559            1 :         let mut ar_hdr = AppendRequestHeader { // mutable: the LSN range is bumped below for the second append
    1560            1 :             generation: Generation::new(0),
    1561            1 :             term: 2,
    1562            1 :             begin_lsn: Lsn(1),
    1563            1 :             end_lsn: Lsn(2),
    1564            1 :             commit_lsn: Lsn(0),
    1565            1 :             truncate_lsn: Lsn(0),
    1566            1 :         };
    1567            1 :         let mut append_request = AppendRequest { // reassigned below with the second header
    1568            1 :             h: ar_hdr.clone(),
    1569            1 :             wal_data: Bytes::from_static(b"b"),
    1570            1 :         };
    1571              : 
    1572            1 :         let pem = ProposerElected { // election message for term 2; history entries are (term 1, Lsn(1)) and (term 2, Lsn(3))
    1573            1 :             generation: Generation::new(0),
    1574            1 :             term: 2,
    1575            1 :             start_streaming_at: Lsn(1),
    1576            1 :             term_history: TermHistory(vec![
    1577            1 :                 TermLsn {
    1578            1 :                     term: 1,
    1579            1 :                     lsn: Lsn(1),
    1580            1 :                 },
    1581            1 :                 TermLsn {
    1582            1 :                     term: 2,
    1583            1 :                     lsn: Lsn(3),
    1584            1 :                 },
    1585            1 :             ]),
    1586            1 :         };
    1587              : 
    1588              :         // check that an elected msg with a mismatching generation (42 instead of the 0 in `pem`) is rejected
    1589            1 :         let mut pem_gen_mismatch = pem.clone();
    1590            1 :         pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1591            1 :         assert!(
    1592            1 :             sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
    1593            1 :                 .await
    1594            1 :                 .is_err()
    1595              :         );
    1596              : 
    1597            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) // the unmodified election message must be accepted
    1598            1 :             .await
    1599            1 :             .unwrap();
    1600              : 
    1601              :         // check that an AppendRequest with end_lsn 2, i.e. before the Lsn(3) of the term 2 history entry, doesn't switch last_log_term.
    1602            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1603            1 :             .await
    1604            1 :             .unwrap();
    1605            1 :         assert_eq!(sk.get_last_log_term(), 1);
    1606              : 
    1607              :         // but an append ending at Lsn(3), the LSN of the term 2 history entry, does the switch
    1608            1 :         ar_hdr.begin_lsn = Lsn(2);
    1609            1 :         ar_hdr.end_lsn = Lsn(3);
    1610            1 :         append_request = AppendRequest {
    1611            1 :             h: ar_hdr,
    1612            1 :             wal_data: Bytes::from_static(b"b"),
    1613            1 :         };
    1614            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1615            1 :             .await
    1616            1 :             .unwrap();
    1617            1 :         assert_eq!(sk.get_last_log_term(), 2);
    1618            1 :     }
    1619              : 
    1620              :     #[tokio::test] // Checks that appends with a wrong generation or a begin_lsn that skips ahead are rejected.
    1621            1 :     async fn test_non_consecutive_write() {
    1622            1 :         let storage = InMemoryState {
    1623            1 :             persisted_state: test_sk_state(),
    1624            1 :         };
    1625            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1626              : 
    1627            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1628              : 
    1629            1 :         let pem = ProposerElected { // elect a proposer for term 1 first; the appends below use the same term and generation
    1630            1 :             generation: Generation::new(0),
    1631            1 :             term: 1,
    1632            1 :             start_streaming_at: Lsn(1),
    1633            1 :             term_history: TermHistory(vec![TermLsn {
    1634            1 :                 term: 1,
    1635            1 :                 lsn: Lsn(1),
    1636            1 :             }]),
    1637            1 :         };
    1638            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1639            1 :             .await
    1640            1 :             .unwrap();
    1641              : 
    1642            1 :         let ar_hdr = AppendRequestHeader { // baseline header: begin_lsn 1, end_lsn 2
    1643            1 :             generation: Generation::new(0),
    1644            1 :             term: 1,
    1645            1 :             begin_lsn: Lsn(1),
    1646            1 :             end_lsn: Lsn(2),
    1647            1 :             commit_lsn: Lsn(0),
    1648            1 :             truncate_lsn: Lsn(0),
    1649            1 :         };
    1650            1 :         let append_request = AppendRequest {
    1651            1 :             h: ar_hdr.clone(),
    1652            1 :             wal_data: Bytes::from_static(b"b"),
    1653            1 :         };
    1654              : 
    1655              :         // check that an append request with a mismatching generation (42 instead of 0) is rejected
    1656            1 :         let mut ar_hdr_gen_mismatch = ar_hdr.clone();
    1657            1 :         ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1658            1 :         let append_request_gen_mismatch = AppendRequest {
    1659            1 :             h: ar_hdr_gen_mismatch,
    1660            1 :             wal_data: Bytes::from_static(b"b"),
    1661            1 :         };
    1662            1 :         assert!(
    1663            1 :             sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
    1664            1 :                 append_request_gen_mismatch
    1665            1 :             ))
    1666            1 :             .await
    1667            1 :             .is_err()
    1668              :         );
    1669              : 
    1670              :         // do write ending at 2, it should be ok
    1671            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1672            1 :             .await
    1673            1 :             .unwrap();
    1674            1 :         let mut ar_hdr2 = ar_hdr.clone(); // second header: same term/generation, but begins at 4 after the write that ended at 2
    1675            1 :         ar_hdr2.begin_lsn = Lsn(4);
    1676            1 :         ar_hdr2.end_lsn = Lsn(5);
    1677            1 :         let append_request = AppendRequest {
    1678            1 :             h: ar_hdr2, // send the header that starts at 4, not the baseline one starting at 1
    1679            1 :             wal_data: Bytes::from_static(b"b"),
    1680            1 :         };
    1681              :         // and now starting at 4, it must fail
    1682            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1683            1 :             .await
    1684            1 :             .unwrap_err();
    1685            1 :     }
    1686              : 
    1687              :     #[test] // The two histories have no entry in common, and the expected result is None.
    1688            1 :     fn test_find_highest_common_point_none() {
    1689            1 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1690            1 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1691            1 :         assert_eq!(
    1692            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),), // NOTE(review): third argument is presumably the safekeeper's end LSN — confirm against find_highest_common_point
    1693              :             None
    1694              :         );
    1695            1 :     }
    1696              : 
    1697              :     #[test] // Histories agree on terms 1 and 2 and then diverge (4 vs. 3); expected result is term 2 at Lsn(30).
    1698            1 :     fn test_find_highest_common_point_middle() {
    1699            1 :         let prop_th = TermHistory(vec![
    1700            1 :             (1, Lsn(10)).into(),
    1701            1 :             (2, Lsn(20)).into(),
    1702            1 :             (4, Lsn(40)).into(),
    1703            1 :         ]);
    1704            1 :         let sk_th = TermHistory(vec![
    1705            1 :             (1, Lsn(10)).into(),
    1706            1 :             (2, Lsn(20)).into(),
    1707            1 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1708            1 :         ]);
    1709            1 :         assert_eq!(
    1710            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1711              :             Some(TermLsn {
    1712              :                 term: 2,
    1713              :                 lsn: Lsn(30), // the LSN of the safekeeper's term 3 entry, not the Lsn(40) passed as third argument
    1714              :             })
    1715              :         );
    1716            1 :     }
    1717              : 
    1718              :     #[test] // Safekeeper history stops at term 2 while the proposer continues to term 4; expected result is term 2 at Lsn(32).
    1719            1 :     fn test_find_highest_common_point_sk_end() {
    1720            1 :         let prop_th = TermHistory(vec![
    1721            1 :             (1, Lsn(10)).into(),
    1722            1 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1723            1 :             (4, Lsn(40)).into(),
    1724            1 :         ]);
    1725            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1726            1 :         assert_eq!(
    1727            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1728              :             Some(TermLsn {
    1729              :                 term: 2,
    1730              :                 lsn: Lsn(32), // equals the third argument passed above
    1731              :             })
    1732              :         );
    1733            1 :     }
    1734              : 
    1735              :     #[test] // Proposer and safekeeper histories are identical; expected result is the last term (2) at Lsn(32).
    1736            1 :     fn test_find_highest_common_point_walprop() {
    1737            1 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1738            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1739            1 :         assert_eq!(
    1740            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1741              :             Some(TermLsn {
    1742              :                 term: 2,
    1743              :                 lsn: Lsn(32), // equals the third argument passed above
    1744              :             })
    1745              :         );
    1746            1 :     }
    1747              : 
    1748              :     #[test] // Serializes a fully populated TimelinePersistentState with ser() and checks that des() returns an equal value.
    1749            1 :     fn test_sk_state_bincode_serde_roundtrip() {
    1750            1 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1751            1 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1752            1 :         let state = TimelinePersistentState {
    1753            1 :             tenant_id,
    1754            1 :             timeline_id,
    1755            1 :             mconf: Configuration { // membership configuration with a single member and no new_members
    1756            1 :                 generation: SafekeeperGeneration::new(42),
    1757            1 :                 members: MemberSet::new(vec![SafekeeperId {
    1758            1 :                     id: NodeId(1),
    1759            1 :                     host: "hehe.org".to_owned(),
    1760            1 :                     pg_port: 5432,
    1761            1 :                 }])
    1762            1 :                 .expect("duplicate member"),
    1763            1 :                 new_members: None,
    1764            1 :             },
    1765            1 :             acceptor_state: AcceptorState {
    1766            1 :                 term: 42,
    1767            1 :                 term_history: TermHistory(vec![TermLsn {
    1768            1 :                     lsn: Lsn(0x1),
    1769            1 :                     term: 41,
    1770            1 :                 }]),
    1771            1 :             },
    1772            1 :             server: ServerInfo {
    1773            1 :                 pg_version: PgVersionId::from_full_pg_version(140000),
    1774            1 :                 system_id: 0x1234567887654321,
    1775            1 :                 wal_seg_size: 0x12345678,
    1776            1 :             },
    1777            1 :             proposer_uuid: { // arbitrary test value: the timeline id bytes in reverse order
    1778            1 :                 let mut arr = timeline_id.as_arr();
    1779            1 :                 arr.reverse();
    1780            1 :                 arr
    1781            1 :             },
    1782            1 :             timeline_start_lsn: Lsn(0x12345600),
    1783            1 :             local_start_lsn: Lsn(0x12),
    1784            1 :             commit_lsn: Lsn(1234567800),
    1785            1 :             backup_lsn: Lsn(1234567300),
    1786            1 :             peer_horizon_lsn: Lsn(9999999),
    1787            1 :             remote_consistent_lsn: Lsn(1234560000),
    1788            1 :             partial_backup: crate::wal_backup_partial::State::default(),
    1789            1 :             eviction_state: EvictionState::Present,
    1790            1 :             creation_ts: UNIX_EPOCH,
    1791            1 :         };
    1792              : 
    1793            1 :         let ser = state.ser().unwrap(); // encode the state into its serialized form
    1794              : 
    1795            1 :         let deser = TimelinePersistentState::des(&ser).unwrap(); // decode it back
    1796              : 
    1797            1 :         assert_eq!(deser, state); // the round trip must preserve the state
    1798            1 :     }
    1799              : }
        

Generated by: LCOV version 2.1-beta