LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 71.8 % 1054 757
Test Date: 2025-07-16 12:29:03 Functions: 71.2 % 177 126

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use std::cmp::{max, min};
       4              : use std::fmt;
       5              : use std::io::Read;
       6              : use std::str::FromStr;
       7              : 
       8              : use anyhow::{Context, Result, bail};
       9              : use byteorder::{LittleEndian, ReadBytesExt};
      10              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      11              : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
      12              : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
      13              : use pq_proto::SystemId;
      14              : use safekeeper_api::membership::{
      15              :     INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
      16              : };
      17              : use safekeeper_api::models::HotStandbyFeedback;
      18              : use safekeeper_api::{Term, membership};
      19              : use serde::{Deserialize, Serialize};
      20              : use storage_broker::proto::SafekeeperTimelineInfo;
      21              : use tracing::*;
      22              : use utils::bin_ser::LeSer;
      23              : use utils::id::{NodeId, TenantId, TimelineId};
      24              : use utils::lsn::Lsn;
      25              : use utils::pageserver_feedback::PageserverFeedback;
      26              : 
      27              : use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL};
      28              : use crate::state::TimelineState;
      29              : use crate::{control_file, wal_storage};
      30              : 
      31              : pub const SK_PROTO_VERSION_2: u32 = 2;
      32              : pub const SK_PROTO_VERSION_3: u32 = 3;
      33              : pub const UNKNOWN_SERVER_VERSION: PgVersionId = PgVersionId::UNKNOWN;
      34              : 
      35            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      36              : pub struct TermLsn {
      37              :     pub term: Term,
      38              :     pub lsn: Lsn,
      39              : }
      40              : 
      41              : // Creation from tuple provides less typing (e.g. for unit tests).
      42              : impl From<(Term, Lsn)> for TermLsn {
      43         1298 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      44         1298 :         TermLsn {
      45         1298 :             term: pair.0,
      46         1298 :             lsn: pair.1,
      47         1298 :         }
      48         1298 :     }
      49              : }
      50              : 
      51            0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      52              : pub struct TermHistory(pub Vec<TermLsn>);
      53              : 
      54              : impl TermHistory {
      55         1471 :     pub fn empty() -> TermHistory {
      56         1471 :         TermHistory(Vec::new())
      57         1471 :     }
      58              : 
      59              :     // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
      60          680 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      61          680 :         let n_entries = bytes
      62          680 :             .get_u32_f()
      63          680 :             .with_context(|| "TermHistory misses len")?;
      64          680 :         let mut res = Vec::with_capacity(n_entries as usize);
      65         4636 :         for i in 0..n_entries {
      66         4636 :             let term = bytes
      67         4636 :                 .get_u64_f()
      68         4636 :                 .with_context(|| format!("TermHistory pos {i} misses term"))?;
      69         4636 :             let lsn = bytes
      70         4636 :                 .get_u64_f()
      71         4636 :                 .with_context(|| format!("TermHistory pos {i} misses lsn"))?
      72         4636 :                 .into();
      73         4636 :             res.push(TermLsn { term, lsn })
      74              :         }
      75          680 :         Ok(TermHistory(res))
      76          680 :     }
      77              : 
      78              :     // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
      79              :     // TODO remove once v2 protocol is fully dropped.
      80            0 :     pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
      81            0 :         if bytes.remaining() < 4 {
      82            0 :             bail!("TermHistory misses len");
      83            0 :         }
      84            0 :         let n_entries = bytes.get_u32_le();
      85            0 :         let mut res = Vec::with_capacity(n_entries as usize);
      86            0 :         for _ in 0..n_entries {
      87            0 :             if bytes.remaining() < 16 {
      88            0 :                 bail!("TermHistory is incomplete");
      89            0 :             }
      90            0 :             res.push(TermLsn {
      91            0 :                 term: bytes.get_u64_le(),
      92            0 :                 lsn: bytes.get_u64_le().into(),
      93            0 :             })
      94              :         }
      95            0 :         Ok(TermHistory(res))
      96            0 :     }
      97              : 
      98              :     /// Return copy of self with switches happening strictly after up_to
      99              :     /// truncated.
     100         4643 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
     101         4643 :         let mut res = Vec::with_capacity(self.0.len());
     102        17081 :         for e in &self.0 {
     103        12441 :             if e.lsn > up_to {
     104            3 :                 break;
     105        12438 :             }
     106        12438 :             res.push(*e);
     107              :         }
     108         4643 :         TermHistory(res)
     109         4643 :     }
     110              : 
     111              :     /// Find point of divergence between leader (walproposer) term history and
     112              :     /// safekeeper. Arguments are not symmetric as proposer history ends at
     113              :     /// +infinity while safekeeper at flush_lsn.
     114              :     /// C version is at walproposer SendProposerElected.
     115          691 :     pub fn find_highest_common_point(
     116          691 :         prop_th: &TermHistory,
     117          691 :         sk_th: &TermHistory,
     118          691 :         sk_wal_end: Lsn,
     119          691 :     ) -> Option<TermLsn> {
     120          691 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     121              : 
     122          691 :         if let Some(sk_th_last) = sk_th.last() {
     123          552 :             assert!(
     124          552 :                 sk_th_last.lsn <= sk_wal_end,
     125            0 :                 "safekeeper term history end {sk_th_last:?} LSN is higher than WAL end {sk_wal_end:?}"
     126              :             );
     127          139 :         }
     128              : 
     129              :         // find last common term, if any...
     130          691 :         let mut last_common_idx = None;
     131         3914 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     132         3914 :             if prop_th[i].term != sk_th[i].term {
     133            4 :                 break;
     134         3910 :             }
     135              :             // If term is the same, LSN must be equal as well.
     136         3910 :             assert!(
     137         3910 :                 prop_th[i].lsn == sk_th[i].lsn,
     138            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     139            0 :                 prop_th[i].term,
     140            0 :                 prop_th[i].lsn,
     141            0 :                 sk_th[i].lsn
     142              :             );
     143         3910 :             last_common_idx = Some(i);
     144              :         }
     145          691 :         let last_common_idx = last_common_idx?;
     146              :         // Now find where it ends at both prop and sk and take min. End of
     147              :         // (common) term is the start of the next one, unless it is the last
     148              :         // term; then it is flush_lsn for the safekeeper and +infinity for the
     149              :         // proposer, so in that case we just take flush_lsn.
     150          551 :         if last_common_idx == prop_th.len() - 1 {
     151           46 :             Some(TermLsn {
     152           46 :                 term: prop_th[last_common_idx].term,
     153           46 :                 lsn: sk_wal_end,
     154           46 :             })
     155              :         } else {
     156          505 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     157          505 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     158            3 :                 sk_th[last_common_idx + 1].lsn
     159              :             } else {
     160          502 :                 sk_wal_end
     161              :             };
     162          505 :             Some(TermLsn {
     163          505 :                 term: prop_th[last_common_idx].term,
     164          505 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     165          505 :             })
     166              :         }
     167          691 :     }
     168              : }
     169              : 
     170              : /// Display only latest entries for Debug.
     171              : impl fmt::Debug for TermHistory {
     172          340 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     173          340 :         let n_printed = 20;
     174          340 :         write!(
     175          340 :             fmt,
     176          340 :             "{}{:?}",
     177          340 :             if self.0.len() > n_printed { "... " } else { "" },
     178          340 :             self.0
     179          340 :                 .iter()
     180          340 :                 .rev()
     181          340 :                 .take(n_printed)
     182         1257 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     183          340 :                 .collect::<Vec<_>>()
     184              :         )
     185          340 :     }
     186              : }
     187              : 
     188              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     189              : pub type PgUuid = [u8; 16];
     190              : 
     191              : /// Persistent consensus state of the acceptor.
     192            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     193              : pub struct AcceptorState {
     194              :     /// acceptor's last term it voted for (advanced in 1 phase)
     195              :     pub term: Term,
     196              :     /// History of term switches for safekeeper's WAL.
     197              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     198              :     /// from the proposer before recovery.
     199              :     pub term_history: TermHistory,
     200              : }
     201              : 
     202              : impl AcceptorState {
     203              :     /// acceptor's last_log_term is the term of the highest entry in the log
     204         1376 :     pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
     205         1376 :         let th = self.term_history.up_to(flush_lsn);
     206         1376 :         match th.0.last() {
     207         1370 :             Some(e) => e.term,
     208            6 :             None => 0,
     209              :         }
     210         1376 :     }
     211              : }
     212              : 
     213              : // protocol messages
     214              : 
     215              : /// Initial Proposer -> Acceptor message
     216            0 : #[derive(Debug, Deserialize)]
     217              : pub struct ProposerGreeting {
     218              :     pub tenant_id: TenantId,
     219              :     pub timeline_id: TimelineId,
     220              :     pub mconf: membership::Configuration,
     221              :     /// Postgres server version
     222              :     pub pg_version: PgVersionId,
     223              :     pub system_id: SystemId,
     224              :     pub wal_seg_size: u32,
     225              : }
     226              : 
     227              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     228            0 : #[derive(Debug, Deserialize)]
     229              : pub struct ProposerGreetingV2 {
     230              :     /// proposer-acceptor protocol version
     231              :     pub protocol_version: u32,
     232              :     /// Postgres server version
     233              :     pub pg_version: PgVersionId,
     234              :     pub proposer_id: PgUuid,
     235              :     pub system_id: SystemId,
     236              :     pub timeline_id: TimelineId,
     237              :     pub tenant_id: TenantId,
     238              :     pub tli: TimeLineID,
     239              :     pub wal_seg_size: u32,
     240              : }
     241              : 
     242              : /// Acceptor -> Proposer initial response: the highest term known to me
     243              : /// (acceptor voted for).
     244              : #[derive(Debug, Serialize)]
     245              : pub struct AcceptorGreeting {
     246              :     node_id: NodeId,
     247              :     mconf: membership::Configuration,
     248              :     term: u64,
     249              : }
     250              : 
     251              : /// Vote request sent from proposer to safekeepers
     252              : #[derive(Debug)]
     253              : pub struct VoteRequest {
     254              :     pub generation: Generation,
     255              :     pub term: Term,
     256              : }
     257              : 
     258              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     259            0 : #[derive(Debug, Deserialize)]
     260              : pub struct VoteRequestV2 {
     261              :     pub term: Term,
     262              : }
     263              : 
     264              : /// Vote itself, sent from safekeeper to proposer
     265              : #[derive(Debug, Serialize)]
     266              : pub struct VoteResponse {
     267              :     generation: Generation, // membership conf generation
     268              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     269              :     vote_given: bool,
     270              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     271              :     // proposer to choose the most advanced one.
     272              :     pub flush_lsn: Lsn,
     273              :     truncate_lsn: Lsn,
     274              :     pub term_history: TermHistory,
     275              : }
     276              : 
     277              : /*
     278              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     279              :  * term history to it.
     280              :  */
     281              : #[derive(Debug, Clone)]
     282              : pub struct ProposerElected {
     283              :     pub generation: Generation, // membership conf generation
     284              :     pub term: Term,
     285              :     pub start_streaming_at: Lsn,
     286              :     pub term_history: TermHistory,
     287              : }
     288              : 
     289              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     290              : /// communicates commit_lsn.
     291              : #[derive(Debug)]
     292              : pub struct AppendRequest {
     293              :     pub h: AppendRequestHeader,
     294              :     pub wal_data: Bytes,
     295              : }
     296            0 : #[derive(Debug, Clone, Deserialize)]
     297              : pub struct AppendRequestHeader {
     298              :     pub generation: Generation, // membership conf generation
     299              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     300              :     pub term: Term,
     301              :     /// start position of message in WAL
     302              :     pub begin_lsn: Lsn,
     303              :     /// end position of message in WAL
     304              :     pub end_lsn: Lsn,
     305              :     /// LSN committed by quorum of safekeepers
     306              :     pub commit_lsn: Lsn,
     307              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     308              :     pub truncate_lsn: Lsn,
     309              : }
     310              : 
     311              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     312            0 : #[derive(Debug, Clone, Deserialize)]
     313              : pub struct AppendRequestHeaderV2 {
     314              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     315              :     pub term: Term,
     316              :     // TODO: remove this field from the protocol, it is unused -- LSN of term
     317              :     // switch can be taken from ProposerElected (as well as from term history).
     318              :     pub term_start_lsn: Lsn,
     319              :     /// start position of message in WAL
     320              :     pub begin_lsn: Lsn,
     321              :     /// end position of message in WAL
     322              :     pub end_lsn: Lsn,
     323              :     /// LSN committed by quorum of safekeepers
     324              :     pub commit_lsn: Lsn,
     325              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     326              :     pub truncate_lsn: Lsn,
     327              :     // only for logging/debugging
     328              :     pub proposer_uuid: PgUuid,
     329              : }
     330              : 
     331              : /// Report safekeeper state to proposer
     332              : #[derive(Debug, Serialize, Clone)]
     333              : pub struct AppendResponse {
     334              :     // Membership conf generation. Not strictly required because on mismatch
     335              :     // connection is reset, but let's sanity check it.
     336              :     generation: Generation,
     337              :     // Current term of the safekeeper; if it is higher than proposer's, the
     338              :     // compute is out of date.
     339              :     pub term: Term,
     340              :     // Flushed end of WAL on safekeeper; one should always be mindful of which
     341              :     // term history this value comes from, either checking history directly or
     342              :     // observing term being set to one for which WAL truncation is known to have
     343              :     // happened.
     344              :     pub flush_lsn: Lsn,
     345              :     // We report back our awareness about which WAL is committed, as this is
     346              :     // a criterion for walproposer --sync mode exit
     347              :     pub commit_lsn: Lsn,
     348              :     pub hs_feedback: HotStandbyFeedback,
     349              :     pub pageserver_feedback: Option<PageserverFeedback>,
     350              : }
     351              : 
     352              : impl AppendResponse {
     353            0 :     fn term_only(generation: Generation, term: Term) -> AppendResponse {
     354            0 :         AppendResponse {
     355            0 :             generation,
     356            0 :             term,
     357            0 :             flush_lsn: Lsn(0),
     358            0 :             commit_lsn: Lsn(0),
     359            0 :             hs_feedback: HotStandbyFeedback::empty(),
     360            0 :             pageserver_feedback: None,
     361            0 :         }
     362            0 :     }
     363              : }
     364              : 
     365              : /// Proposer -> Acceptor messages
     366              : #[derive(Debug)]
     367              : pub enum ProposerAcceptorMessage {
     368              :     Greeting(ProposerGreeting),
     369              :     VoteRequest(VoteRequest),
     370              :     Elected(ProposerElected),
     371              :     AppendRequest(AppendRequest),
     372              :     NoFlushAppendRequest(AppendRequest),
     373              :     FlushWAL,
     374              : }
     375              : 
     376              : /// Augment Bytes with fallible get_uN_f methods, where N is the width in bits
     377              : /// of the integer read. All reads are in network (big endian) order.
     378              : trait BytesF {
     379              :     fn get_u8_f(&mut self) -> Result<u8>;
     380              :     fn get_u16_f(&mut self) -> Result<u16>;
     381              :     fn get_u32_f(&mut self) -> Result<u32>;
     382              :     fn get_u64_f(&mut self) -> Result<u64>;
     383              : }
     384              : 
     385              : impl BytesF for Bytes {
     386        26464 :     fn get_u8_f(&mut self) -> Result<u8> {
     387        26464 :         if self.is_empty() {
     388            0 :             bail!("no bytes left, expected 1");
     389        26464 :         }
     390        26464 :         Ok(self.get_u8())
     391        26464 :     }
     392            0 :     fn get_u16_f(&mut self) -> Result<u16> {
     393            0 :         if self.remaining() < 2 {
     394            0 :             bail!("no bytes left, expected 2");
     395            0 :         }
     396            0 :         Ok(self.get_u16())
     397            0 :     }
     398       108416 :     fn get_u32_f(&mut self) -> Result<u32> {
     399       108416 :         if self.remaining() < 4 {
     400            0 :             bail!("only {} bytes left, expected 4", self.remaining());
     401       108416 :         }
     402       108416 :         Ok(self.get_u32())
     403       108416 :     }
     404        47968 :     fn get_u64_f(&mut self) -> Result<u64> {
     405        47968 :         if self.remaining() < 8 {
     406            0 :             bail!("only {} bytes left, expected 8", self.remaining());
     407        47968 :         }
     408        47968 :         Ok(self.get_u64())
     409        47968 :     }
     410              : }
     411              : 
     412              : impl ProposerAcceptorMessage {
     413              :     /// Read cstring from Bytes.
     414        40636 :     fn get_cstr(buf: &mut Bytes) -> Result<String> {
     415        40636 :         let pos = buf
     416        40636 :             .iter()
     417      1340988 :             .position(|x| *x == 0)
     418        40636 :             .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
     419        40636 :         let result = buf.split_to(pos);
     420        40636 :         buf.advance(1); // drop the null terminator
     421        40636 :         match std::str::from_utf8(&result) {
     422        40636 :             Ok(s) => Ok(s.to_string()),
     423            0 :             Err(e) => bail!("invalid utf8 in cstring: {}", e),
     424              :         }
     425        40636 :     }
     426              : 
     427              :     /// Read membership::Configuration from Bytes.
     428        20318 :     fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
     429        20318 :         let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
     430        20318 :         let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
     431              :         // Main member set must have at least someone in valid configuration.
     432              :         // Empty conf is allowed until we fully migrate.
     433        20318 :         if generation != INVALID_GENERATION && members_len == 0 {
     434            0 :             bail!("empty members_len");
     435        20318 :         }
     436        20318 :         let mut members = MemberSet::empty();
     437        20318 :         for i in 0..members_len {
     438            0 :             let id = buf
     439            0 :                 .get_u64_f()
     440            0 :                 .with_context(|| format!("reading member {i} node_id"))?;
     441            0 :             let host = Self::get_cstr(buf).with_context(|| format!("reading member {i} host"))?;
     442            0 :             let pg_port = buf
     443            0 :                 .get_u16_f()
     444            0 :                 .with_context(|| format!("reading member {i} port"))?;
     445            0 :             let sk = SafekeeperId {
     446            0 :                 id: NodeId(id),
     447            0 :                 host,
     448            0 :                 pg_port,
     449            0 :             };
     450            0 :             members.add(sk)?;
     451              :         }
     452        20318 :         let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
     453              :         // Non joint conf.
     454        20318 :         if new_members_len == 0 {
     455        20318 :             Ok(membership::Configuration {
     456        20318 :                 generation,
     457        20318 :                 members,
     458        20318 :                 new_members: None,
     459        20318 :             })
     460              :         } else {
     461            0 :             let mut new_members = MemberSet::empty();
     462            0 :             for i in 0..new_members_len {
     463            0 :                 let id = buf
     464            0 :                     .get_u64_f()
     465            0 :                     .with_context(|| format!("reading new member {i} node_id"))?;
     466            0 :                 let host =
     467            0 :                     Self::get_cstr(buf).with_context(|| format!("reading new member {i} host"))?;
     468            0 :                 let pg_port = buf
     469            0 :                     .get_u16_f()
     470            0 :                     .with_context(|| format!("reading new member {i} port"))?;
     471            0 :                 let sk = SafekeeperId {
     472            0 :                     id: NodeId(id),
     473            0 :                     host,
     474            0 :                     pg_port,
     475            0 :                 };
     476            0 :                 new_members.add(sk)?;
     477              :             }
     478            0 :             Ok(membership::Configuration {
     479            0 :                 generation,
     480            0 :                 members,
     481            0 :                 new_members: Some(new_members),
     482            0 :             })
     483              :         }
     484        20318 :     }
     485              : 
     486              :     /// Parse proposer message.
     487        26464 :     pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
     488        26464 :         if proto_version == SK_PROTO_VERSION_3 {
     489        26464 :             if msg_bytes.is_empty() {
     490            0 :                 bail!("ProposerAcceptorMessage is not complete: missing tag");
     491        26464 :             }
     492        26464 :             let tag = msg_bytes.get_u8_f().with_context(|| {
     493            0 :                 "ProposerAcceptorMessage is not complete: missing tag".to_string()
     494            0 :             })? as char;
     495        26464 :             match tag {
     496              :                 'g' => {
     497        20318 :                     let tenant_id_str =
     498        20318 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
     499        20318 :                     let tenant_id = TenantId::from_str(&tenant_id_str)?;
     500        20318 :                     let timeline_id_str =
     501        20318 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
     502        20318 :                     let timeline_id = TimelineId::from_str(&timeline_id_str)?;
     503        20318 :                     let mconf = Self::get_mconf(&mut msg_bytes)?;
     504        20318 :                     let pg_version = msg_bytes
     505        20318 :                         .get_u32_f()
     506        20318 :                         .with_context(|| "reading pg_version")?;
     507        20318 :                     let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
     508        20318 :                     let wal_seg_size = msg_bytes
     509        20318 :                         .get_u32_f()
     510        20318 :                         .with_context(|| "reading wal_seg_size")?;
     511        20318 :                     let g = ProposerGreeting {
     512        20318 :                         tenant_id,
     513        20318 :                         timeline_id,
     514        20318 :                         mconf,
     515        20318 :                         pg_version: PgVersionId::from_full_pg_version(pg_version),
     516        20318 :                         system_id,
     517        20318 :                         wal_seg_size,
     518        20318 :                     };
     519        20318 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     520              :                 }
     521              :                 'v' => {
     522         2578 :                     let generation = Generation::new(
     523         2578 :                         msg_bytes
     524         2578 :                             .get_u32_f()
     525         2578 :                             .with_context(|| "reading generation")?,
     526              :                     );
     527         2578 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     528         2578 :                     let v = VoteRequest { generation, term };
     529         2578 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     530              :                 }
     531              :                 'e' => {
     532          680 :                     let generation = Generation::new(
     533          680 :                         msg_bytes
     534          680 :                             .get_u32_f()
     535          680 :                             .with_context(|| "reading generation")?,
     536              :                     );
     537          680 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     538          680 :                     let start_streaming_at: Lsn = msg_bytes
     539          680 :                         .get_u64_f()
     540          680 :                         .with_context(|| "reading start_streaming_at")?
     541          680 :                         .into();
     542          680 :                     let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     543          680 :                     let msg = ProposerElected {
     544          680 :                         generation,
     545          680 :                         term,
     546          680 :                         start_streaming_at,
     547          680 :                         term_history,
     548          680 :                     };
     549          680 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     550              :                 }
     551              :                 'a' => {
     552         2888 :                     let generation = Generation::new(
     553         2888 :                         msg_bytes
     554         2888 :                             .get_u32_f()
     555         2888 :                             .with_context(|| "reading generation")?,
     556              :                     );
     557         2888 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     558         2888 :                     let begin_lsn: Lsn = msg_bytes
     559         2888 :                         .get_u64_f()
     560         2888 :                         .with_context(|| "reading begin_lsn")?
     561         2888 :                         .into();
     562         2888 :                     let end_lsn: Lsn = msg_bytes
     563         2888 :                         .get_u64_f()
     564         2888 :                         .with_context(|| "reading end_lsn")?
     565         2888 :                         .into();
     566         2888 :                     let commit_lsn: Lsn = msg_bytes
     567         2888 :                         .get_u64_f()
     568         2888 :                         .with_context(|| "reading commit_lsn")?
     569         2888 :                         .into();
     570         2888 :                     let truncate_lsn: Lsn = msg_bytes
     571         2888 :                         .get_u64_f()
     572         2888 :                         .with_context(|| "reading truncate_lsn")?
     573         2888 :                         .into();
     574         2888 :                     let hdr = AppendRequestHeader {
     575         2888 :                         generation,
     576         2888 :                         term,
     577         2888 :                         begin_lsn,
     578         2888 :                         end_lsn,
     579         2888 :                         commit_lsn,
     580         2888 :                         truncate_lsn,
     581         2888 :                     };
     582         2888 :                     let rec_size = hdr
     583         2888 :                         .end_lsn
     584         2888 :                         .checked_sub(hdr.begin_lsn)
     585         2888 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     586              :                         .0 as usize;
     587         2888 :                     if rec_size > MAX_SEND_SIZE {
     588            0 :                         bail!(
     589            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     590              :                             MAX_SEND_SIZE
     591              :                         );
     592         2888 :                     }
     593         2888 :                     if msg_bytes.remaining() < rec_size {
     594            0 :                         bail!(
     595            0 :                             "reading WAL: only {} bytes left, wanted {}",
     596            0 :                             msg_bytes.remaining(),
     597              :                             rec_size
     598              :                         );
     599         2888 :                     }
     600         2888 :                     let wal_data = msg_bytes.copy_to_bytes(rec_size);
     601         2888 :                     let msg = AppendRequest { h: hdr, wal_data };
     602              : 
     603         2888 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     604              :                 }
     605            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     606              :             }
     607            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     608              :             // xxx using Reader is inefficient but easy to work with bincode
     609            0 :             let mut stream = msg_bytes.reader();
     610              :             // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     611            0 :             let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     612            0 :             match tag {
     613              :                 'g' => {
     614            0 :                     let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
     615            0 :                     let g = ProposerGreeting {
     616            0 :                         tenant_id: msgv2.tenant_id,
     617            0 :                         timeline_id: msgv2.timeline_id,
     618            0 :                         mconf: membership::Configuration {
     619            0 :                             generation: INVALID_GENERATION,
     620            0 :                             members: MemberSet::empty(),
     621            0 :                             new_members: None,
     622            0 :                         },
     623            0 :                         pg_version: msgv2.pg_version,
     624            0 :                         system_id: msgv2.system_id,
     625            0 :                         wal_seg_size: msgv2.wal_seg_size,
     626            0 :                     };
     627            0 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     628              :                 }
     629              :                 'v' => {
     630            0 :                     let msg = VoteRequestV2::des_from(&mut stream)?;
     631            0 :                     let v = VoteRequest {
     632            0 :                         generation: INVALID_GENERATION,
     633            0 :                         term: msg.term,
     634            0 :                     };
     635            0 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     636              :                 }
     637              :                 'e' => {
     638            0 :                     let mut msg_bytes = stream.into_inner();
     639            0 :                     if msg_bytes.remaining() < 16 {
     640            0 :                         bail!("ProposerElected message is not complete");
     641            0 :                     }
     642            0 :                     let term = msg_bytes.get_u64_le();
     643            0 :                     let start_streaming_at = msg_bytes.get_u64_le().into();
     644            0 :                     let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
     645            0 :                     if msg_bytes.remaining() < 8 {
     646            0 :                         bail!("ProposerElected message is not complete");
     647            0 :                     }
     648            0 :                     let _timeline_start_lsn = msg_bytes.get_u64_le();
     649            0 :                     let msg = ProposerElected {
     650            0 :                         generation: INVALID_GENERATION,
     651            0 :                         term,
     652            0 :                         start_streaming_at,
     653            0 :                         term_history,
     654            0 :                     };
     655            0 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     656              :                 }
     657              :                 'a' => {
     658              :                     // read header followed by wal data
     659            0 :                     let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
     660            0 :                     let hdr = AppendRequestHeader {
     661            0 :                         generation: INVALID_GENERATION,
     662            0 :                         term: hdrv2.term,
     663            0 :                         begin_lsn: hdrv2.begin_lsn,
     664            0 :                         end_lsn: hdrv2.end_lsn,
     665            0 :                         commit_lsn: hdrv2.commit_lsn,
     666            0 :                         truncate_lsn: hdrv2.truncate_lsn,
     667            0 :                     };
     668            0 :                     let rec_size = hdr
     669            0 :                         .end_lsn
     670            0 :                         .checked_sub(hdr.begin_lsn)
     671            0 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     672              :                         .0 as usize;
     673            0 :                     if rec_size > MAX_SEND_SIZE {
     674            0 :                         bail!(
     675            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     676              :                             MAX_SEND_SIZE
     677              :                         );
     678            0 :                     }
     679              : 
     680            0 :                     let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     681            0 :                     stream.read_exact(&mut wal_data_vec)?;
     682            0 :                     let wal_data = Bytes::from(wal_data_vec);
     683              : 
     684            0 :                     let msg = AppendRequest { h: hdr, wal_data };
     685              : 
     686            0 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     687              :                 }
     688            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     689              :             }
     690              :         } else {
     691            0 :             bail!("unsupported protocol version {}", proto_version);
     692              :         }
     693        26464 :     }
     694              : 
     695              :     /// The memory size of the message, including byte slices.
     696          620 :     pub fn size(&self) -> usize {
     697              :         const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
     698              : 
     699              :         // For most types, the size is just the base enum size including the nested structs. Some
     700              :         // types also contain byte slices; add them.
     701              :         //
     702              :         // We explicitly list all fields, to draw attention here when new fields are added.
     703          620 :         let mut size = BASE_SIZE;
     704          620 :         size += match self {
     705            0 :             Self::Greeting(_) => 0,
     706              : 
     707            0 :             Self::VoteRequest(_) => 0,
     708              : 
     709            0 :             Self::Elected(_) => 0,
     710              : 
     711              :             Self::AppendRequest(AppendRequest {
     712              :                 h:
     713              :                     AppendRequestHeader {
     714              :                         generation: _,
     715              :                         term: _,
     716              :                         begin_lsn: _,
     717              :                         end_lsn: _,
     718              :                         commit_lsn: _,
     719              :                         truncate_lsn: _,
     720              :                     },
     721          620 :                 wal_data,
     722          620 :             }) => wal_data.len(),
     723              : 
     724              :             Self::NoFlushAppendRequest(AppendRequest {
     725              :                 h:
     726              :                     AppendRequestHeader {
     727              :                         generation: _,
     728              :                         term: _,
     729              :                         begin_lsn: _,
     730              :                         end_lsn: _,
     731              :                         commit_lsn: _,
     732              :                         truncate_lsn: _,
     733              :                     },
     734            0 :                 wal_data,
     735            0 :             }) => wal_data.len(),
     736              : 
     737            0 :             Self::FlushWAL => 0,
     738              :         };
     739              : 
     740          620 :         size
     741          620 :     }
     742              : }
     743              : 
/// Acceptor -> Proposer messages.
///
/// Each variant is written to the wire by `serialize`, which prefixes it with a
/// one-character tag identifying the message kind.
#[derive(Debug)]
pub enum AcceptorProposerMessage {
    /// Reply to a proposer greeting (wire tag `g`).
    Greeting(AcceptorGreeting),
    /// Reply to a vote request (wire tag `v`).
    VoteResponse(VoteResponse),
    /// Append response (wire tag `a`); may carry an optional pageserver feedback.
    AppendResponse(AppendResponse),
}
     751              : 
     752              : impl AcceptorProposerMessage {
     753            0 :     fn put_cstr(buf: &mut BytesMut, s: &str) {
     754            0 :         buf.put_slice(s.as_bytes());
     755            0 :         buf.put_u8(0); // null terminator
     756            0 :     }
     757              : 
     758              :     /// Serialize membership::Configuration into buf.
     759        20318 :     fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
     760        20318 :         buf.put_u32(mconf.generation.into_inner());
     761        20318 :         buf.put_u32(mconf.members.m.len() as u32);
     762        20318 :         for sk in &mconf.members.m {
     763            0 :             buf.put_u64(sk.id.0);
     764            0 :             Self::put_cstr(buf, &sk.host);
     765            0 :             buf.put_u16(sk.pg_port);
     766            0 :         }
     767        20318 :         if let Some(ref new_members) = mconf.new_members {
     768            0 :             buf.put_u32(new_members.m.len() as u32);
     769            0 :             for sk in &new_members.m {
     770            0 :                 buf.put_u64(sk.id.0);
     771            0 :                 Self::put_cstr(buf, &sk.host);
     772            0 :                 buf.put_u16(sk.pg_port);
     773            0 :             }
     774        20318 :         } else {
     775        20318 :             buf.put_u32(0);
     776        20318 :         }
     777        20318 :     }
     778              : 
     779              :     /// Serialize acceptor -> proposer message.
     780        25283 :     pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
     781        25283 :         if proto_version == SK_PROTO_VERSION_3 {
     782        25283 :             match self {
     783        20318 :                 AcceptorProposerMessage::Greeting(msg) => {
     784        20318 :                     buf.put_u8(b'g');
     785        20318 :                     buf.put_u64(msg.node_id.0);
     786        20318 :                     Self::serialize_mconf(buf, &msg.mconf);
     787        20318 :                     buf.put_u64(msg.term)
     788              :                 }
     789         2578 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     790         2578 :                     buf.put_u8(b'v');
     791         2578 :                     buf.put_u32(msg.generation.into_inner());
     792         2578 :                     buf.put_u64(msg.term);
     793         2578 :                     buf.put_u8(msg.vote_given as u8);
     794         2578 :                     buf.put_u64(msg.flush_lsn.into());
     795         2578 :                     buf.put_u64(msg.truncate_lsn.into());
     796         2578 :                     buf.put_u32(msg.term_history.0.len() as u32);
     797         9545 :                     for e in &msg.term_history.0 {
     798         6967 :                         buf.put_u64(e.term);
     799         6967 :                         buf.put_u64(e.lsn.into());
     800         6967 :                     }
     801              :                 }
     802         2387 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     803         2387 :                     buf.put_u8(b'a');
     804         2387 :                     buf.put_u32(msg.generation.into_inner());
     805         2387 :                     buf.put_u64(msg.term);
     806         2387 :                     buf.put_u64(msg.flush_lsn.into());
     807         2387 :                     buf.put_u64(msg.commit_lsn.into());
     808         2387 :                     buf.put_i64(msg.hs_feedback.ts);
     809         2387 :                     buf.put_u64(msg.hs_feedback.xmin);
     810         2387 :                     buf.put_u64(msg.hs_feedback.catalog_xmin);
     811              : 
     812              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     813              :                     // if it is not present.
     814         2387 :                     if let Some(ref msg) = msg.pageserver_feedback {
     815            0 :                         msg.serialize(buf);
     816         2387 :                     }
     817              :                 }
     818              :             }
     819        25283 :             Ok(())
     820              :         // TODO remove 3 after converting all msgs
     821            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     822            0 :             match self {
     823            0 :                 AcceptorProposerMessage::Greeting(msg) => {
     824            0 :                     buf.put_u64_le('g' as u64);
     825            0 :                     // v2 didn't have mconf and fields were reordered
     826            0 :                     buf.put_u64_le(msg.term);
     827            0 :                     buf.put_u64_le(msg.node_id.0);
     828            0 :                 }
     829            0 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     830              :                     // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
     831            0 :                     buf.put_u64_le('v' as u64);
     832            0 :                     buf.put_u64_le(msg.term);
     833            0 :                     buf.put_u64_le(msg.vote_given as u64);
     834            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     835            0 :                     buf.put_u64_le(msg.truncate_lsn.into());
     836            0 :                     buf.put_u32_le(msg.term_history.0.len() as u32);
     837            0 :                     for e in &msg.term_history.0 {
     838            0 :                         buf.put_u64_le(e.term);
     839            0 :                         buf.put_u64_le(e.lsn.into());
     840            0 :                     }
     841              :                     // removed timeline_start_lsn
     842            0 :                     buf.put_u64_le(0);
     843              :                 }
     844            0 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     845              :                     // v2 didn't have generation
     846            0 :                     buf.put_u64_le('a' as u64);
     847            0 :                     buf.put_u64_le(msg.term);
     848            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     849            0 :                     buf.put_u64_le(msg.commit_lsn.into());
     850            0 :                     buf.put_i64_le(msg.hs_feedback.ts);
     851            0 :                     buf.put_u64_le(msg.hs_feedback.xmin);
     852            0 :                     buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     853              : 
     854              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     855              :                     // if it is not present.
     856            0 :                     if let Some(ref msg) = msg.pageserver_feedback {
     857            0 :                         msg.serialize(buf);
     858            0 :                     }
     859              :                 }
     860              :             }
     861            0 :             Ok(())
     862              :         } else {
     863            0 :             bail!("unsupported protocol version {}", proto_version);
     864              :         }
     865        25283 :     }
     866              : }
     867              : 
/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of the control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from broker peers
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
    /// LSN starting from which the proposer this safekeeper is currently
    /// talking to appends WAL; determines the last_log_term switch point.
    /// Starts out as `Lsn(0)` in `new`.
    pub term_start_lsn: Lsn,

    /// Persistent state storage.
    pub state: TimelineState<CTRL>,
    /// WAL storage; source of the flushed position reported by `flush_lsn`.
    pub wal_store: WAL,

    /// Safekeeper's node id, reported to proposers in the greeting reply.
    node_id: NodeId,
}
     884              : 
     885              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     886              : where
     887              :     CTRL: control_file::Storage,
     888              :     WAL: wal_storage::Storage,
     889              : {
     890              :     /// Accepts a control file storage containing the safekeeper state.
     891              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     892              :     /// and `server` (`wal_seg_size` inside it) fields.
     893         9112 :     pub fn new(
     894         9112 :         state: TimelineState<CTRL>,
     895         9112 :         wal_store: WAL,
     896         9112 :         node_id: NodeId,
     897         9112 :     ) -> Result<SafeKeeper<CTRL, WAL>> {
     898         9112 :         if state.tenant_id == TenantId::from([0u8; 16])
     899         9112 :             || state.timeline_id == TimelineId::from([0u8; 16])
     900              :         {
     901            0 :             bail!(
     902            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     903            0 :                 state.tenant_id,
     904            0 :                 state.timeline_id
     905              :             );
     906            9 :         }
     907              : 
     908         9112 :         Ok(SafeKeeper {
     909         9112 :             term_start_lsn: Lsn(0),
     910         9112 :             state,
     911         9112 :             wal_store,
     912         9112 :             node_id,
     913         9112 :         })
     914            9 :     }
     915              : 
     916              :     /// Get history of term switches for the available WAL
     917         3267 :     fn get_term_history(&self) -> TermHistory {
     918         3267 :         self.state
     919         3267 :             .acceptor_state
     920         3267 :             .term_history
     921         3267 :             .up_to(self.flush_lsn())
     922            9 :     }
     923              : 
     924           43 :     pub fn get_last_log_term(&self) -> Term {
     925           43 :         self.state
     926           43 :             .acceptor_state
     927           43 :             .get_last_log_term(self.flush_lsn())
     928            2 :     }
     929              : 
     930              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     931        15483 :     pub fn flush_lsn(&self) -> Lsn {
     932        15483 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     933         1883 :     }
     934              : 
     935              :     /// Process message from proposer and possibly form reply. Concurrent
     936              :     /// callers must exclude each other.
     937        30107 :     pub async fn process_msg(
     938        30107 :         &mut self,
     939        30107 :         msg: &ProposerAcceptorMessage,
     940        30107 :     ) -> Result<Option<AcceptorProposerMessage>> {
     941        30107 :         let res = match msg {
     942        20318 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     943         2581 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     944          688 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     945            5 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     946            5 :                 self.handle_append_request(msg, true).await
     947              :             }
     948         3508 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     949         3508 :                 self.handle_append_request(msg, false).await
     950              :             }
     951         3007 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     952              :         };
     953              : 
     954              :         // BEGIN HADRON
     955        30107 :         match &res {
     956        30103 :             Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
     957        30103 :                 .with_label_values(&["success"])
     958        30103 :                 .inc(),
     959            4 :             Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
     960            4 :                 .with_label_values(&["error"])
     961            4 :                 .inc(),
     962              :         };
     963              : 
     964        30107 :         res
     965              :         // END HADRON
     966         1256 :     }
     967              : 
     968              :     /// Handle initial message from proposer: check its sanity and send my
     969              :     /// current term.
     970        20318 :     async fn handle_greeting(
     971        20318 :         &mut self,
     972        20318 :         msg: &ProposerGreeting,
     973        20318 :     ) -> Result<Option<AcceptorProposerMessage>> {
     974              :         /* Postgres major version mismatch is treated as fatal error
     975              :          * because safekeepers parse WAL headers and the format
     976              :          * may change between versions.
     977              :          */
     978        20318 :         if PgMajorVersion::try_from(msg.pg_version)?
     979        20318 :             != PgMajorVersion::try_from(self.state.server.pg_version)?
     980            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     981              :         {
     982            0 :             bail!(
     983            0 :                 "incompatible server version {}, expected {}",
     984              :                 msg.pg_version,
     985            0 :                 self.state.server.pg_version
     986              :             );
     987            0 :         }
     988              : 
     989        20318 :         if msg.tenant_id != self.state.tenant_id {
     990            0 :             bail!(
     991            0 :                 "invalid tenant ID, got {}, expected {}",
     992              :                 msg.tenant_id,
     993            0 :                 self.state.tenant_id
     994              :             );
     995            0 :         }
     996        20318 :         if msg.timeline_id != self.state.timeline_id {
     997            0 :             bail!(
     998            0 :                 "invalid timeline ID, got {}, expected {}",
     999              :                 msg.timeline_id,
    1000            0 :                 self.state.timeline_id
    1001              :             );
    1002            0 :         }
    1003        20318 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
    1004            0 :             bail!(
    1005            0 :                 "invalid wal_seg_size, got {}, expected {}",
    1006              :                 msg.wal_seg_size,
    1007            0 :                 self.state.server.wal_seg_size
    1008              :             );
    1009            0 :         }
    1010              : 
    1011              :         // system_id will be updated on mismatch
    1012              :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
    1013        20318 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
    1014            0 :             if self.state.server.system_id != 0 {
    1015            0 :                 warn!(
    1016            0 :                     "unexpected system ID arrived, got {}, expected {}",
    1017            0 :                     msg.system_id, self.state.server.system_id
    1018              :                 );
    1019            0 :             }
    1020              : 
    1021            0 :             let mut state = self.state.start_change();
    1022            0 :             state.server.system_id = msg.system_id;
    1023            0 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
    1024            0 :                 state.server.pg_version = msg.pg_version;
    1025            0 :             }
    1026            0 :             self.state.finish_change(&state).await?;
    1027            0 :         }
    1028              : 
    1029              :         // Switch into conf given by proposer conf if it is higher.
    1030        20318 :         self.state.membership_switch(msg.mconf.clone()).await?;
    1031              : 
    1032        20318 :         let apg = AcceptorGreeting {
    1033        20318 :             node_id: self.node_id,
    1034        20318 :             mconf: self.state.mconf.clone(),
    1035        20318 :             term: self.state.acceptor_state.term,
    1036        20318 :         };
    1037        20318 :         info!(
    1038            0 :             "processed greeting {:?} from walproposer, sending {:?}",
    1039              :             msg, apg
    1040              :         );
    1041        20318 :         Ok(Some(AcceptorProposerMessage::Greeting(apg)))
    1042            0 :     }
    1043              : 
    1044              :     /// Give vote for the given term, if we haven't done that previously.
    1045         2581 :     async fn handle_vote_request(
    1046         2581 :         &mut self,
    1047         2581 :         msg: &VoteRequest,
    1048         2581 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1049         2581 :         if self.state.mconf.generation != msg.generation {
    1050            1 :             bail!(
    1051            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1052              :                 msg,
    1053            1 :                 self.state.mconf.generation
    1054              :             );
    1055            2 :         }
    1056              :         // Once voted, we won't accept data from older proposers; flush
    1057              :         // everything we've already received so that new proposer starts
    1058              :         // streaming at end of our WAL, without overlap. WAL is truncated at
    1059              :         // streaming point and commit_lsn may be advanced from peers, so this
    1060              :         // also avoids possible spurious attempt to truncate committed WAL.
    1061         2580 :         self.wal_store.flush_wal().await?;
    1062              :         // initialize with refusal
    1063         2580 :         let mut resp = VoteResponse {
    1064         2580 :             generation: self.state.mconf.generation,
    1065         2580 :             term: self.state.acceptor_state.term,
    1066         2580 :             vote_given: false,
    1067         2580 :             flush_lsn: self.flush_lsn(),
    1068         2580 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
    1069         2580 :             term_history: self.get_term_history(),
    1070         2580 :         };
    1071         2580 :         if self.state.acceptor_state.term < msg.term {
    1072         2472 :             let mut state = self.state.start_change();
    1073         2472 :             state.acceptor_state.term = msg.term;
    1074              :             // persist vote before sending it out
    1075         2472 :             self.state.finish_change(&state).await?;
    1076              : 
    1077         2472 :             resp.term = self.state.acceptor_state.term;
    1078         2472 :             resp.vote_given = true;
    1079            1 :         }
    1080         2580 :         info!("processed {:?}: sending {:?}", msg, &resp);
    1081         2580 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
    1082            3 :     }
    1083              : 
    1084              :     /// Form AppendResponse from current state.
    1085         3010 :     fn append_response(&self) -> AppendResponse {
    1086         3010 :         let ar = AppendResponse {
    1087         3010 :             generation: self.state.mconf.generation,
    1088         3010 :             term: self.state.acceptor_state.term,
    1089         3010 :             flush_lsn: self.flush_lsn(),
    1090         3010 :             commit_lsn: self.state.commit_lsn,
    1091         3010 :             // will be filled by the upper code to avoid bothering safekeeper
    1092         3010 :             hs_feedback: HotStandbyFeedback::empty(),
    1093         3010 :             pageserver_feedback: None,
    1094         3010 :         };
    1095         3010 :         trace!("formed AppendResponse {:?}", ar);
    1096         3010 :         ar
    1097          623 :     }
    1098              : 
    1099          688 :     async fn handle_elected(
    1100          688 :         &mut self,
    1101          688 :         msg: &ProposerElected,
    1102          688 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1103          688 :         let _timer = MISC_OPERATION_SECONDS
    1104          688 :             .with_label_values(&["handle_elected"])
    1105          688 :             .start_timer();
    1106              : 
    1107          688 :         info!(
    1108            0 :             "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
    1109              :             msg,
    1110            0 :             self.state.acceptor_state.term,
    1111            0 :             self.get_last_log_term(),
    1112            0 :             self.flush_lsn()
    1113              :         );
    1114          688 :         if self.state.mconf.generation != msg.generation {
    1115            1 :             bail!(
    1116            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1117              :                 msg,
    1118            1 :                 self.state.mconf.generation
    1119              :             );
    1120            7 :         }
    1121          687 :         if self.state.acceptor_state.term < msg.term {
    1122            7 :             let mut state = self.state.start_change();
    1123            7 :             state.acceptor_state.term = msg.term;
    1124            7 :             self.state.finish_change(&state).await?;
    1125            0 :         }
    1126              : 
    1127              :         // If our term is higher, ignore the message (next feedback will inform the compute)
    1128          687 :         if self.state.acceptor_state.term > msg.term {
    1129            0 :             return Ok(None);
    1130            7 :         }
    1131              : 
    1132              :         // Before truncating WAL, cross-check the divergence point received
    1133              :         // from the walproposer.
    1134          687 :         let sk_th = self.get_term_history();
    1135          687 :         let last_common_point = match TermHistory::find_highest_common_point(
    1136          687 :             &msg.term_history,
    1137          687 :             &sk_th,
    1138          687 :             self.flush_lsn(),
    1139          687 :         ) {
    1140              :             // No common point. Expect streaming from the beginning of the
    1141              :             // history like walproposer while we don't have proper init.
    1142          139 :             None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
    1143          139 :                 "empty walproposer term history {:?}",
    1144              :                 msg.term_history
    1145            0 :             ))?,
    1146          548 :             Some(lcp) => lcp,
    1147              :         };
    1148              :         // This is expected to happen in a rare race when another connection
    1149              :         // from the same walproposer writes + flushes WAL after this connection
    1150              :         // sent flush_lsn in VoteRequest; for instance, very late
    1151              :         // ProposerElected message delivery after another connection was
    1152              :         // established and wrote WAL. In such cases error is transient;
    1153              :         // reconnection makes safekeeper send newest term history and flush_lsn
    1154              :         // and walproposer recalculates the streaming point. OTOH repeating
    1155              :         // error indicates a serious bug.
    1156          687 :         if last_common_point.lsn != msg.start_streaming_at {
    1157            0 :             bail!(
    1158            0 :                 "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1159              :                 last_common_point,
    1160              :                 msg.start_streaming_at,
    1161            0 :                 self.state.acceptor_state.term,
    1162              :                 sk_th,
    1163            0 :                 self.flush_lsn(),
    1164              :                 msg.term_history,
    1165              :             );
    1166            7 :         }
    1167              : 
    1168              :         // We are also expected to never attempt to truncate committed data.
    1169          687 :         assert!(
    1170          687 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
    1171            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1172              :             msg.start_streaming_at,
    1173              :             self.state.inmem.commit_lsn,
    1174            0 :             self.state.acceptor_state.term,
    1175              :             sk_th,
    1176            0 :             self.flush_lsn(),
    1177              :             msg.term_history,
    1178              :         );
    1179              : 
    1180              :         // Before first WAL write initialize its segment. It makes first segment
    1181              :         // pg_waldump'able because stream from compute doesn't include its
    1182              :         // segment and page headers.
    1183              :         //
    1184              :         // If we fail before first WAL write flush this action would be
    1185              :         // repeated, that's ok because it is idempotent.
    1186          687 :         if self.wal_store.flush_lsn() == Lsn::INVALID {
    1187          139 :             self.wal_store
    1188          139 :                 .initialize_first_segment(msg.start_streaming_at)
    1189          139 :                 .await?;
    1190            0 :         }
    1191              : 
    1192              :         // truncate wal, update the LSNs
    1193          687 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
    1194              : 
    1195              :         // and now adopt term history from proposer
    1196              :         {
    1197          687 :             let mut state = self.state.start_change();
    1198              : 
    1199              :             // Here we learn the initial LSN for the first time; set the
    1200              :             // fields that depend on it.
    1201              : 
    1202          687 :             if let Some(start_lsn) = msg.term_history.0.first() {
    1203          687 :                 if state.timeline_start_lsn == Lsn(0) {
    1204              :                     // Remember point where WAL begins globally. In the future it
    1205              :                     // will be initialized immediately on timeline creation.
    1206          139 :                     state.timeline_start_lsn = start_lsn.lsn;
    1207          139 :                     info!(
    1208            0 :                         "setting timeline_start_lsn to {:?}",
    1209              :                         state.timeline_start_lsn
    1210              :                     );
    1211            0 :                 }
    1212            0 :             }
    1213              : 
    1214          687 :             if state.peer_horizon_lsn == Lsn(0) {
    1215          139 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
    1216          139 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
    1217          139 :                 state.peer_horizon_lsn = state.timeline_start_lsn;
    1218          139 :             }
    1219          687 :             if state.local_start_lsn == Lsn(0) {
    1220          139 :                 state.local_start_lsn = msg.start_streaming_at;
    1221          139 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
    1222            0 :             }
    1223              :             // Initializing commit_lsn before acking first flushed record is
    1224              :             // important to let find_end_of_wal skip the hole in the beginning
    1225              :             // of the first segment.
    1226              :             //
    1227              :             // NB: on new clusters, this happens at the same time as
    1228              :             // timeline_start_lsn initialization, it is taken outside to provide
    1229              :             // upgrade.
    1230          687 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
    1231              : 
    1232              :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
    1233          687 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
    1234              :             // similar for remote_consistent_lsn
    1235          687 :             state.remote_consistent_lsn =
    1236          687 :                 max(state.remote_consistent_lsn, state.timeline_start_lsn);
    1237              : 
    1238          687 :             state.acceptor_state.term_history = msg.term_history.clone();
    1239          687 :             self.state.finish_change(&state).await?;
    1240              :         }
    1241              : 
    1242          687 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
    1243              : 
    1244              :         // Cache LSN where term starts to immediately fsync control file with
    1245              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
    1246              :         // persisted commit_lsn on majority of safekeepers aligns.
    1247          687 :         self.term_start_lsn = match msg.term_history.0.last() {
    1248            0 :             None => bail!("proposer elected with empty term history"),
    1249          687 :             Some(term_lsn_start) => term_lsn_start.lsn,
    1250              :         };
    1251              : 
    1252          687 :         Ok(None)
    1253            8 :     }
    1254              : 
    1255              :     /// Advance commit_lsn taking into account what we have locally.
    1256              :     ///
    1257              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
    1258              :     /// already been done outside.
    1259         2347 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
    1260              :         // Both peers and walproposer communicate this value, we might already
    1261              :         // have a fresher (higher) version.
    1262         2347 :         candidate = max(candidate, self.state.inmem.commit_lsn);
    1263         2347 :         let commit_lsn = min(candidate, self.flush_lsn());
    1264         2347 :         assert!(
    1265         2347 :             commit_lsn >= self.state.inmem.commit_lsn,
    1266            0 :             "commit_lsn monotonicity violated: old={} new={}",
    1267              :             self.state.inmem.commit_lsn,
    1268              :             commit_lsn
    1269              :         );
    1270              : 
    1271         2347 :         self.state.inmem.commit_lsn = commit_lsn;
    1272              : 
    1273              :         // If new commit_lsn reached term switch, force sync of control
    1274              :         // file: walproposer in sync mode is very interested when this
    1275              :         // happens. Note: this is for sync-safekeepers mode only, as
    1276              :         // otherwise commit_lsn might jump over term_start_lsn.
    1277         2347 :         if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
    1278           85 :             self.state.flush().await?;
    1279          620 :         }
    1280              : 
    1281         2347 :         Ok(())
    1282          620 :     }
    1283              : 
    1284              :     /// Handle request to append WAL.
    1285              :     #[allow(clippy::comparison_chain)]
    1286         3513 :     async fn handle_append_request(
    1287         3513 :         &mut self,
    1288         3513 :         msg: &AppendRequest,
    1289         3513 :         require_flush: bool,
    1290         3513 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1291              :         // Refuse message on generation mismatch. On reconnect wp will get full
    1292              :         // configuration from greeting.
    1293         3513 :         if self.state.mconf.generation != msg.h.generation {
    1294            1 :             bail!(
    1295            1 :                 "refusing append request due to generation mismatch: request {}, sk {}",
    1296              :                 msg.h.generation,
    1297            1 :                 self.state.mconf.generation
    1298              :             );
    1299          624 :         }
    1300              : 
    1301         3512 :         if self.state.acceptor_state.term < msg.h.term {
    1302            0 :             bail!("got AppendRequest before ProposerElected");
    1303          624 :         }
    1304              : 
    1305              :         // If our term is higher, immediately refuse the message. Send term only
    1306              :         // response; elected walproposer can never advance the term, so it will
    1307              :         // figure out the refusal from it -- which is important as term change
    1308              :         // should cause not just reconnection but whole walproposer re-election.
    1309         3512 :         if self.state.acceptor_state.term > msg.h.term {
    1310            0 :             let resp = AppendResponse::term_only(
    1311            0 :                 self.state.mconf.generation,
    1312            0 :                 self.state.acceptor_state.term,
    1313              :             );
    1314            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
    1315          624 :         }
    1316              : 
    1317              :         // Disallow any non-sequential writes, which can result in gaps or
    1318              :         // overwrites. If we need to move the pointer, ProposerElected message
    1319              :         // should have truncated WAL first accordingly. Note that the first
    1320              :         // condition (WAL rewrite) is quite expected in real world; it happens
    1321              :         // when walproposer reconnects to safekeeper and writes some more data
    1322              :         // while first connection still gets some packets later. It might be
    1323              :         // better to not log this as error! above.
    1324         3512 :         let write_lsn = self.wal_store.write_lsn();
    1325         3512 :         let flush_lsn = self.wal_store.flush_lsn();
    1326         3512 :         if write_lsn > msg.h.begin_lsn {
    1327            1 :             bail!(
    1328            1 :                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
    1329              :                 write_lsn,
    1330              :                 msg.h.begin_lsn
    1331              :             );
    1332          623 :         }
    1333         3511 :         if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
    1334            0 :             bail!(
    1335            0 :                 "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
    1336              :                 write_lsn,
    1337              :                 msg.h.begin_lsn,
    1338              :             );
    1339          623 :         }
    1340              : 
    1341              :         // Now we know that we are in the same term as the proposer, process the
    1342              :         // message.
    1343              : 
    1344              :         // do the job
    1345         3511 :         if !msg.wal_data.is_empty() {
    1346         1356 :             self.wal_store
    1347         1356 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
    1348         1356 :                 .await?;
    1349            0 :         }
    1350              : 
    1351              :         // flush wal to the disk, if required
    1352         3511 :         if require_flush {
    1353            3 :             self.wal_store.flush_wal().await?;
    1354          620 :         }
    1355              : 
    1356              :         // Update commit_lsn. It will be flushed to the control file regularly by the timeline
    1357              :         // manager, off of the WAL ingest hot path.
    1358         3511 :         if msg.h.commit_lsn != Lsn(0) {
    1359         2347 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
    1360            3 :         }
    1361              :         // Value calculated by walproposer can always lag:
    1362              :         // - safekeepers can forget inmem value and send to proposer lower
    1363              :         //   persisted one on restart;
    1364              :         // - if we make safekeepers always send persistent value,
    1365              :         //   any compute restart would pull it down.
    1366              :         // Thus, take max before adopting.
    1367         3511 :         self.state.inmem.peer_horizon_lsn =
    1368         3511 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
    1369              : 
    1370         3511 :         trace!(
    1371            0 :             "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
    1372            0 :             msg.wal_data.len(),
    1373              :             msg.h.begin_lsn,
    1374              :             msg.h.end_lsn,
    1375              :             msg.h.commit_lsn,
    1376              :             msg.h.truncate_lsn,
    1377              :             require_flush,
    1378              :         );
    1379              : 
    1380              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
    1381              :         // This is the common case for !require_flush, but a flush can still
    1382              :         // happen on segment bounds.
    1383         3511 :         if !require_flush && flush_lsn == self.flush_lsn() {
    1384         3508 :             return Ok(None);
    1385            3 :         }
    1386              : 
    1387            3 :         let resp = self.append_response();
    1388            3 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
    1389          625 :     }
    1390              : 
    1391              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
    1392         3007 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
    1393         3007 :         self.wal_store.flush_wal().await?;
    1394         3007 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
    1395         3007 :             self.append_response(),
    1396         3007 :         )))
    1397          620 :     }
    1398              : 
    1399              :     /// Update commit_lsn from peer safekeeper data.
    1400            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
    1401            0 :         if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
    1402              :             // Note: the check is too restrictive, generally we can update local
    1403              :             // commit_lsn if our history matches (is part of) history of advanced
    1404              :             // commit_lsn provider.
    1405            0 :             if sk_info.last_log_term == self.get_last_log_term() {
    1406            0 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
    1407            0 :             }
    1408            0 :         }
    1409            0 :         Ok(())
    1410            0 :     }
    1411              : }
    1412              : 
    1413              : #[cfg(test)]
    1414              : mod tests {
    1415              :     use std::ops::Deref;
    1416              :     use std::str::FromStr;
    1417              :     use std::time::{Instant, UNIX_EPOCH};
    1418              : 
    1419              :     use futures::future::BoxFuture;
    1420              :     use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
    1421              :     use safekeeper_api::ServerInfo;
    1422              :     use safekeeper_api::membership::{
    1423              :         Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
    1424              :     };
    1425              : 
    1426              :     use super::*;
    1427              :     use crate::state::{EvictionState, TimelinePersistentState};
    1428              : 
    1429              :     // fake storage for tests
    1430              :     struct InMemoryState {
    1431              :         persisted_state: TimelinePersistentState,
    1432              :     }
    1433              : 
    1434              :     impl control_file::Storage for InMemoryState {
    1435            5 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
    1436            5 :             self.persisted_state = s.clone();
    1437            5 :             Ok(())
    1438            5 :         }
    1439              : 
    1440            0 :         fn last_persist_at(&self) -> Instant {
    1441            0 :             Instant::now()
    1442            0 :         }
    1443              :     }
    1444              : 
    1445              :     impl Deref for InMemoryState {
    1446              :         type Target = TimelinePersistentState;
    1447              : 
    1448          100 :         fn deref(&self) -> &Self::Target {
    1449          100 :             &self.persisted_state
    1450          100 :         }
    1451              :     }
    1452              : 
    1453            3 :     fn test_sk_state() -> TimelinePersistentState {
    1454            3 :         let mut state = TimelinePersistentState::empty();
    1455            3 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1456            3 :         state.tenant_id = TenantId::from([1u8; 16]);
    1457            3 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1458            3 :         state
    1459            3 :     }
    1460              : 
    1461              :     struct DummyWalStore {
    1462              :         lsn: Lsn,
    1463              :     }
    1464              : 
    1465              :     impl wal_storage::Storage for DummyWalStore {
    1466            4 :         fn write_lsn(&self) -> Lsn {
    1467            4 :             self.lsn
    1468            4 :         }
    1469              : 
    1470           19 :         fn flush_lsn(&self) -> Lsn {
    1471           19 :             self.lsn
    1472           19 :         }
    1473              : 
    1474            2 :         async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
    1475            2 :             Ok(())
    1476            2 :         }
    1477              : 
    1478            3 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1479            3 :             self.lsn = startpos + buf.len() as u64;
    1480            3 :             Ok(())
    1481            3 :         }
    1482              : 
    1483            2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1484            2 :             self.lsn = end_pos;
    1485            2 :             Ok(())
    1486            2 :         }
    1487              : 
    1488            5 :         async fn flush_wal(&mut self) -> Result<()> {
    1489            5 :             Ok(())
    1490            5 :         }
    1491              : 
    1492            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1493            0 :             Box::pin(async { Ok(()) })
    1494            0 :         }
    1495              : 
    1496            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1497            0 :             crate::metrics::WalStorageMetrics::default()
    1498            0 :         }
    1499              :     }
    1500              : 
    1501              :     #[tokio::test]
    1502            1 :     async fn test_voting() {
    1503            1 :         let storage = InMemoryState {
    1504            1 :             persisted_state: test_sk_state(),
    1505            1 :         };
    1506            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1507            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1508              : 
    1509              :         // Vote with generation mismatch should be rejected.
    1510            1 :         let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1511            1 :             generation: SafekeeperGeneration::new(42),
    1512            1 :             term: 1,
    1513            1 :         });
    1514            1 :         assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
    1515              : 
    1516              :         // check voting for 1 is ok
    1517            1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1518            1 :             generation: Generation::new(0),
    1519            1 :             term: 1,
    1520            1 :         });
    1521            1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1522            1 :         match vote_resp.unwrap() {
    1523            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
    1524            0 :             r => panic!("unexpected response: {r:?}"),
    1525              :         }
    1526              : 
    1527              :         // reboot...
    1528            1 :         let state = sk.state.deref().clone();
    1529            1 :         let storage = InMemoryState {
    1530            1 :             persisted_state: state,
    1531            1 :         };
    1532              : 
    1533            1 :         sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
    1534              : 
    1535              :         // and ensure voting second time for 1 is not ok
    1536            1 :         vote_resp = sk.process_msg(&vote_request).await;
    1537            1 :         match vote_resp.unwrap() {
    1538            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
    1539            1 :             r => panic!("unexpected response: {r:?}"),
    1540            1 :         }
    1541            1 :     }
    1542              : 
    1543              :     #[tokio::test]
                            // Checks when the value returned by `get_last_log_term()` moves from
                            // term 1 to term 2 after a proposer is elected for term 2: an append
                            // covering Lsn(1)..Lsn(2) must leave it at 1, while a following append
                            // covering Lsn(2)..Lsn(3) must switch it to 2. Also checks that an
                            // `Elected` message carrying a different generation is rejected.
    1544            1 :     async fn test_last_log_term_switch() {
    1545            1 :         let storage = InMemoryState {
    1546            1 :             persisted_state: test_sk_state(),
    1547            1 :         };
    1548            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1549              : 
    1550            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1551              : 
                                // First append: one byte of WAL for Lsn(1)..Lsn(2) in term 2.
    1552            1 :         let mut ar_hdr = AppendRequestHeader {
    1553            1 :             generation: Generation::new(0),
    1554            1 :             term: 2,
    1555            1 :             begin_lsn: Lsn(1),
    1556            1 :             end_lsn: Lsn(2),
    1557            1 :             commit_lsn: Lsn(0),
    1558            1 :             truncate_lsn: Lsn(0),
    1559            1 :         };
    1560            1 :         let mut append_request = AppendRequest {
    1561            1 :             h: ar_hdr.clone(),
    1562            1 :             wal_data: Bytes::from_static(b"b"),
    1563            1 :         };
    1564              : 
                                // Election message for term 2. Its term history places term 1 at
                                // Lsn(1) and term 2 at Lsn(3); streaming starts at Lsn(1).
    1565            1 :         let pem = ProposerElected {
    1566            1 :             generation: Generation::new(0),
    1567            1 :             term: 2,
    1568            1 :             start_streaming_at: Lsn(1),
    1569            1 :             term_history: TermHistory(vec![
    1570            1 :                 TermLsn {
    1571            1 :                     term: 1,
    1572            1 :                     lsn: Lsn(1),
    1573            1 :                 },
    1574            1 :                 TermLsn {
    1575            1 :                     term: 2,
    1576            1 :                     lsn: Lsn(3),
    1577            1 :                 },
    1578            1 :             ]),
    1579            1 :         };
    1580              : 
    1581              :         // check that elected msg with generation mismatch is rejected
                                // (generation 42 here versus generation 0 in the message accepted below)
    1582            1 :         let mut pem_gen_mismatch = pem.clone();
    1583            1 :         pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1584            1 :         assert!(
    1585            1 :             sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
    1586            1 :                 .await
    1587            1 :                 .is_err()
    1588              :         );
    1589              : 
                                // The unmodified election message must be accepted.
    1590            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1591            1 :             .await
    1592            1 :             .unwrap();
    1593              : 
    1594              :         // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
    1595            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1596            1 :             .await
    1597            1 :             .unwrap();
    1598            1 :         assert_eq!(sk.get_last_log_term(), 1);
    1599              : 
    1600              :         // but record at term_start_lsn does the switch
                                // (this append covers Lsn(2)..Lsn(3), ending at the Lsn(3) entry of the history)
    1601            1 :         ar_hdr.begin_lsn = Lsn(2);
    1602            1 :         ar_hdr.end_lsn = Lsn(3);
    1603            1 :         append_request = AppendRequest {
    1604            1 :             h: ar_hdr,
    1605            1 :             wal_data: Bytes::from_static(b"b"),
    1606            1 :         };
    1607            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1608            1 :             .await
    1609            1 :             .unwrap();
    1610            1 :         assert_eq!(sk.get_last_log_term(), 2);
    1611            1 :     }
    1612              : 
    1613              :     #[tokio::test]
                            // Checks that WAL appends must be consecutive: after a write covering
                            // Lsn(1)..Lsn(2) succeeds, a request starting at Lsn(4) must be rejected.
                            // Also checks that an append request carrying a different generation
                            // is rejected.
    1614            1 :     async fn test_non_consecutive_write() {
    1615            1 :         let storage = InMemoryState {
    1616            1 :             persisted_state: test_sk_state(),
    1617            1 :         };
    1618            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1619              : 
    1620            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1621              : 
                                // Elect a proposer for term 1, streaming from Lsn(1).
    1622            1 :         let pem = ProposerElected {
    1623            1 :             generation: Generation::new(0),
    1624            1 :             term: 1,
    1625            1 :             start_streaming_at: Lsn(1),
    1626            1 :             term_history: TermHistory(vec![TermLsn {
    1627            1 :                 term: 1,
    1628            1 :                 lsn: Lsn(1),
    1629            1 :             }]),
    1630            1 :         };
    1631            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1632            1 :             .await
    1633            1 :             .unwrap();
    1634              : 
                                // Baseline request: one byte of WAL for Lsn(1)..Lsn(2).
    1635            1 :         let ar_hdr = AppendRequestHeader {
    1636            1 :             generation: Generation::new(0),
    1637            1 :             term: 1,
    1638            1 :             begin_lsn: Lsn(1),
    1639            1 :             end_lsn: Lsn(2),
    1640            1 :             commit_lsn: Lsn(0),
    1641            1 :             truncate_lsn: Lsn(0),
    1642            1 :         };
    1643            1 :         let append_request = AppendRequest {
    1644            1 :             h: ar_hdr.clone(),
    1645            1 :             wal_data: Bytes::from_static(b"b"),
    1646            1 :         };
    1647              : 
    1648              :         // check that append request with generation mismatch is rejected
    1649            1 :         let mut ar_hdr_gen_mismatch = ar_hdr.clone();
    1650            1 :         ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1651            1 :         let append_request_gen_mismatch = AppendRequest {
    1652            1 :             h: ar_hdr_gen_mismatch,
    1653            1 :             wal_data: Bytes::from_static(b"b"),
    1654            1 :         };
    1655            1 :         assert!(
    1656            1 :             sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
    1657            1 :                 append_request_gen_mismatch
    1658            1 :             ))
    1659            1 :             .await
    1660            1 :             .is_err()
    1661              :         );
    1662              : 
    1663              :         // do write ending at 2, it should be ok
    1664            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1665            1 :             .await
    1666            1 :             .unwrap();
                                // Build the follow-up request from the shifted header so that it
                                // really starts at Lsn(4), leaving a gap after the write that
                                // ended at Lsn(2).
    1667            1 :         let mut ar_hdr2 = ar_hdr.clone();
    1668            1 :         ar_hdr2.begin_lsn = Lsn(4);
    1669            1 :         ar_hdr2.end_lsn = Lsn(5);
    1670            1 :         let append_request = AppendRequest {
    1671            1 :             h: ar_hdr2,
    1672            1 :             wal_data: Bytes::from_static(b"b"),
    1673            1 :         };
    1674              :         // and now starting at 4, it must fail
    1675            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1676            1 :             .await
    1677            1 :             .unwrap_err();
    1678            1 :     }
    1679              : 
    1680              :     #[test]
                            // The proposer history contains only term 0 and the safekeeper history
                            // only terms 1 and 2, so no term appears in both; the expected result
                            // is `None`.
    1681            1 :     fn test_find_highest_common_point_none() {
    1682            1 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1683            1 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1684            1 :         assert_eq!(
    1685            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
    1686              :             None
    1687              :         );
    1688            1 :     }
    1689              : 
    1690              :     #[test]
                            // Both histories agree on terms 1 and 2 and then diverge (proposer goes
                            // to term 4 at LSN 40, safekeeper to term 3 at LSN 30). The expected
                            // common point is term 2 at LSN 30, the LSN of the safekeeper's next
                            // entry, not the Lsn(40) passed as the third argument.
    1691            1 :     fn test_find_highest_common_point_middle() {
    1692            1 :         let prop_th = TermHistory(vec![
    1693            1 :             (1, Lsn(10)).into(),
    1694            1 :             (2, Lsn(20)).into(),
    1695            1 :             (4, Lsn(40)).into(),
    1696            1 :         ]);
    1697            1 :         let sk_th = TermHistory(vec![
    1698            1 :             (1, Lsn(10)).into(),
    1699            1 :             (2, Lsn(20)).into(),
    1700            1 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1701            1 :         ]);
    1702            1 :         assert_eq!(
    1703            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1704              :             Some(TermLsn {
    1705              :                 term: 2,
    1706              :                 lsn: Lsn(30),
    1707              :             })
    1708              :         );
    1709            1 :     }
    1710              : 
    1711              :     #[test]
                            // The safekeeper history (terms 1 and 2) is a prefix of the proposer
                            // history, so the last common term 2 is also the safekeeper's last
                            // entry. The expected common point is term 2 at Lsn(32), the value
                            // passed as the third argument.
    1712            1 :     fn test_find_highest_common_point_sk_end() {
    1713            1 :         let prop_th = TermHistory(vec![
    1714            1 :             (1, Lsn(10)).into(),
    1715            1 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1716            1 :             (4, Lsn(40)).into(),
    1717            1 :         ]);
    1718            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1719            1 :         assert_eq!(
    1720            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1721              :             Some(TermLsn {
    1722              :                 term: 2,
    1723              :                 lsn: Lsn(32),
    1724              :             })
    1725              :         );
    1726            1 :     }
    1727              : 
    1728              :     #[test]
                            // Proposer and safekeeper histories are identical (terms 1 and 2). The
                            // expected common point is the last term, 2, at Lsn(32), the value
                            // passed as the third argument.
    1729            1 :     fn test_find_highest_common_point_walprop() {
    1730            1 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1731            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1732            1 :         assert_eq!(
    1733            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1734              :             Some(TermLsn {
    1735              :                 term: 2,
    1736              :                 lsn: Lsn(32),
    1737              :             })
    1738              :         );
    1739            1 :     }
    1740              : 
    1741              :     #[test]
                            // Round-trip check for `TimelinePersistentState`: builds a state,
                            // serializes it with `ser()`, deserializes the bytes with `des()`, and
                            // asserts the result equals the original value.
                            // NOTE(review): `ser`/`des` presumably come from the `LeSer` trait
                            // imported at the top of the file -- confirm.
    1742            1 :     fn test_sk_state_bincode_serde_roundtrip() {
    1743            1 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1744            1 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1745            1 :         let state = TimelinePersistentState {
    1746            1 :             tenant_id,
    1747            1 :             timeline_id,
    1748            1 :             mconf: Configuration {
    1749            1 :                 generation: SafekeeperGeneration::new(42),
    1750            1 :                 members: MemberSet::new(vec![SafekeeperId {
    1751            1 :                     id: NodeId(1),
    1752            1 :                     host: "hehe.org".to_owned(),
    1753            1 :                     pg_port: 5432,
    1754            1 :                 }])
    1755            1 :                 .expect("duplicate member"),
    1756            1 :                 new_members: None,
    1757            1 :             },
    1758            1 :             acceptor_state: AcceptorState {
    1759            1 :                 term: 42,
    1760            1 :                 term_history: TermHistory(vec![TermLsn {
    1761            1 :                     lsn: Lsn(0x1),
    1762            1 :                     term: 41,
    1763            1 :                 }]),
    1764            1 :             },
    1765            1 :             server: ServerInfo {
    1766            1 :                 pg_version: PgVersionId::from_full_pg_version(140000),
    1767            1 :                 system_id: 0x1234567887654321,
    1768            1 :                 wal_seg_size: 0x12345678,
    1769            1 :             },
                                    // The proposer UUID is the timeline id's byte array in reverse order.
    1770            1 :             proposer_uuid: {
    1771            1 :                 let mut arr = timeline_id.as_arr();
    1772            1 :                 arr.reverse();
    1773            1 :                 arr
    1774            1 :             },
    1775            1 :             timeline_start_lsn: Lsn(0x12345600),
    1776            1 :             local_start_lsn: Lsn(0x12),
    1777            1 :             commit_lsn: Lsn(1234567800),
    1778            1 :             backup_lsn: Lsn(1234567300),
    1779            1 :             peer_horizon_lsn: Lsn(9999999),
    1780            1 :             remote_consistent_lsn: Lsn(1234560000),
    1781            1 :             partial_backup: crate::wal_backup_partial::State::default(),
    1782            1 :             eviction_state: EvictionState::Present,
    1783            1 :             creation_ts: UNIX_EPOCH,
    1784            1 :         };
    1785              : 
    1786            1 :         let ser = state.ser().unwrap();
    1787              : 
    1788            1 :         let deser = TimelinePersistentState::des(&ser).unwrap();
    1789              : 
                                // The decoded value must compare equal to the original state.
    1790            1 :         assert_eq!(deser, state);
    1791            1 :     }
    1792              : }
        

Generated by: LCOV version 2.1-beta