LCOV - code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 73.0 % 1198 875
Test Date: 2025-03-12 16:10:49 Functions: 57.5 % 219 126

            Line data    Source code
       1              : //! Acceptor part of proposer-acceptor consensus algorithm.
       2              : 
       3              : use std::cmp::{max, min};
       4              : use std::fmt;
       5              : use std::io::Read;
       6              : use std::str::FromStr;
       7              : 
       8              : use anyhow::{Context, Result, bail};
       9              : use byteorder::{LittleEndian, ReadBytesExt};
      10              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      11              : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
      12              : use pq_proto::SystemId;
      13              : use safekeeper_api::membership::{
      14              :     INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
      15              : };
      16              : use safekeeper_api::models::HotStandbyFeedback;
      17              : use safekeeper_api::{Term, membership};
      18              : use serde::{Deserialize, Serialize};
      19              : use storage_broker::proto::SafekeeperTimelineInfo;
      20              : use tracing::*;
      21              : use utils::bin_ser::LeSer;
      22              : use utils::id::{NodeId, TenantId, TimelineId};
      23              : use utils::lsn::Lsn;
      24              : use utils::pageserver_feedback::PageserverFeedback;
      25              : 
      26              : use crate::metrics::MISC_OPERATION_SECONDS;
      27              : use crate::state::TimelineState;
      28              : use crate::{control_file, wal_storage};
      29              : 
      30              : pub const SK_PROTO_VERSION_2: u32 = 2;
      31              : pub const SK_PROTO_VERSION_3: u32 = 3;
      32              : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      33              : 
      34            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      35              : pub struct TermLsn {
      36              :     pub term: Term,
      37              :     pub lsn: Lsn,
      38              : }
      39              : 
      40              : // Creation from tuple provides less typing (e.g. for unit tests).
      41              : impl From<(Term, Lsn)> for TermLsn {
      42         1283 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      43         1283 :         TermLsn {
      44         1283 :             term: pair.0,
      45         1283 :             lsn: pair.1,
      46         1283 :         }
      47         1283 :     }
      48              : }
      49              : 
      50            0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      51              : pub struct TermHistory(pub Vec<TermLsn>);
      52              : 
      53              : impl TermHistory {
      54         1435 :     pub fn empty() -> TermHistory {
      55         1435 :         TermHistory(Vec::new())
      56         1435 :     }
      57              : 
      58              :     // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
      59          613 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      60          613 :         let n_entries = bytes
      61          613 :             .get_u32_f()
      62          613 :             .with_context(|| "TermHistory misses len")?;
      63          613 :         let mut res = Vec::with_capacity(n_entries as usize);
      64         4962 :         for i in 0..n_entries {
      65         4962 :             let term = bytes
      66         4962 :                 .get_u64_f()
      67         4962 :                 .with_context(|| format!("TermHistory pos {} misses term", i))?;
      68         4962 :             let lsn = bytes
      69         4962 :                 .get_u64_f()
      70         4962 :                 .with_context(|| format!("TermHistory pos {} misses lsn", i))?
      71         4962 :                 .into();
      72         4962 :             res.push(TermLsn { term, lsn })
      73              :         }
      74          613 :         Ok(TermHistory(res))
      75          613 :     }
      76              : 
      77              :     // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
      78              :     // TODO remove once v2 protocol is fully dropped.
      79            0 :     pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
      80            0 :         if bytes.remaining() < 4 {
      81            0 :             bail!("TermHistory misses len");
      82            0 :         }
      83            0 :         let n_entries = bytes.get_u32_le();
      84            0 :         let mut res = Vec::with_capacity(n_entries as usize);
      85            0 :         for _ in 0..n_entries {
      86            0 :             if bytes.remaining() < 16 {
      87            0 :                 bail!("TermHistory is incomplete");
      88            0 :             }
      89            0 :             res.push(TermLsn {
      90            0 :                 term: bytes.get_u64_le(),
      91            0 :                 lsn: bytes.get_u64_le().into(),
      92            0 :             })
      93              :         }
      94            0 :         Ok(TermHistory(res))
      95            0 :     }
      96              : 
      97              :     /// Return a copy of self with the term switches happening strictly after
      98              :     /// up_to truncated.
      99         4597 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
     100         4597 :         let mut res = Vec::with_capacity(self.0.len());
     101        17951 :         for e in &self.0 {
     102        13359 :             if e.lsn > up_to {
     103            5 :                 break;
     104        13354 :             }
     105        13354 :             res.push(*e);
     106              :         }
     107         4597 :         TermHistory(res)
     108         4597 :     }
     109              : 
     110              :     /// Find point of divergence between leader (walproposer) term history and
     111              :     /// safekeeper. Arguments are not symmetric as proposer history ends at
     112              :     /// +infinity while safekeeper at flush_lsn.
     113              :     /// C version is at walproposer SendProposerElected.
     114          624 :     pub fn find_highest_common_point(
     115          624 :         prop_th: &TermHistory,
     116          624 :         sk_th: &TermHistory,
     117          624 :         sk_wal_end: Lsn,
     118          624 :     ) -> Option<TermLsn> {
     119          624 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     120              : 
     121          624 :         if let Some(sk_th_last) = sk_th.last() {
     122          489 :             assert!(
     123          489 :                 sk_th_last.lsn <= sk_wal_end,
     124            0 :                 "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
     125              :                 sk_th_last,
     126              :                 sk_wal_end
     127              :             );
     128          135 :         }
     129              : 
     130              :         // find last common term, if any...
     131          624 :         let mut last_common_idx = None;
     132         4321 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     133         4321 :             if prop_th[i].term != sk_th[i].term {
     134            5 :                 break;
     135         4316 :             }
     136         4316 :             // If term is the same, LSN must be equal as well.
     137         4316 :             assert!(
     138         4316 :                 prop_th[i].lsn == sk_th[i].lsn,
     139            0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     140            0 :                 prop_th[i].term,
     141            0 :                 prop_th[i].lsn,
     142            0 :                 sk_th[i].lsn
     143              :             );
     144         4316 :             last_common_idx = Some(i);
     145              :         }
     146          624 :         let last_common_idx = last_common_idx?;
     147              :         // Now find where it ends at both prop and sk and take min. The end of
     148              :         // the (common) term is the start of the next one, unless it is the last
     149              :         // one; then it is flush_lsn for the safekeeper and +infinity for the
     150              :         // proposer, so we just take flush_lsn.
     151          488 :         if last_common_idx == prop_th.len() - 1 {
     152           45 :             Some(TermLsn {
     153           45 :                 term: prop_th[last_common_idx].term,
     154           45 :                 lsn: sk_wal_end,
     155           45 :             })
     156              :         } else {
     157          443 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     158          443 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     159            4 :                 sk_th[last_common_idx + 1].lsn
     160              :             } else {
     161          439 :                 sk_wal_end
     162              :             };
     163          443 :             Some(TermLsn {
     164          443 :                 term: prop_th[last_common_idx].term,
     165          443 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     166          443 :             })
     167              :         }
     168          624 :     }
     169              : }
     170              : 
     171              : /// Display only latest entries for Debug.
     172              : impl fmt::Debug for TermHistory {
     173          336 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     174          336 :         let n_printed = 20;
     175          336 :         write!(
     176          336 :             fmt,
     177          336 :             "{}{:?}",
     178          336 :             if self.0.len() > n_printed { "... " } else { "" },
     179          336 :             self.0
     180          336 :                 .iter()
     181          336 :                 .rev()
     182          336 :                 .take(n_printed)
     183         1256 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     184          336 :                 .collect::<Vec<_>>()
     185          336 :         )
     186          336 :     }
     187              : }
     188              : 
     189              : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     190              : pub type PgUuid = [u8; 16];
     191              : 
     192              : /// Persistent consensus state of the acceptor.
     193            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     194              : pub struct AcceptorState {
     195              :     /// acceptor's last term it voted for (advanced in 1 phase)
     196              :     pub term: Term,
     197              :     /// History of term switches for safekeeper's WAL.
     198              :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     199              :     /// from the proposer before recovery.
     200              :     pub term_history: TermHistory,
     201              : }
     202              : 
     203              : impl AcceptorState {
     204              :     /// acceptor's last_log_term is the term of the highest entry in the log
     205         1343 :     pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
     206         1343 :         let th = self.term_history.up_to(flush_lsn);
     207         1343 :         match th.0.last() {
     208         1337 :             Some(e) => e.term,
     209            6 :             None => 0,
     210              :         }
     211         1343 :     }
     212              : }
     213              : 
     214              : // protocol messages
     215              : 
     216              : /// Initial Proposer -> Acceptor message
     217            0 : #[derive(Debug, Deserialize)]
     218              : pub struct ProposerGreeting {
     219              :     pub tenant_id: TenantId,
     220              :     pub timeline_id: TimelineId,
     221              :     pub mconf: membership::Configuration,
     222              :     /// Postgres server version
     223              :     pub pg_version: u32,
     224              :     pub system_id: SystemId,
     225              :     pub wal_seg_size: u32,
     226              : }
     227              : 
     228              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     229            0 : #[derive(Debug, Deserialize)]
     230              : pub struct ProposerGreetingV2 {
     231              :     /// proposer-acceptor protocol version
     232              :     pub protocol_version: u32,
     233              :     /// Postgres server version
     234              :     pub pg_version: u32,
     235              :     pub proposer_id: PgUuid,
     236              :     pub system_id: SystemId,
     237              :     pub timeline_id: TimelineId,
     238              :     pub tenant_id: TenantId,
     239              :     pub tli: TimeLineID,
     240              :     pub wal_seg_size: u32,
     241              : }
     242              : 
     243              : /// Acceptor -> Proposer initial response: the highest term known to me
     244              : /// (acceptor voted for).
     245              : #[derive(Debug, Serialize)]
     246              : pub struct AcceptorGreeting {
     247              :     node_id: NodeId,
     248              :     mconf: membership::Configuration,
     249              :     term: u64,
     250              : }
     251              : 
     252              : /// Vote request sent from proposer to safekeepers
     253              : #[derive(Debug)]
     254              : pub struct VoteRequest {
     255              :     pub generation: Generation,
     256              :     pub term: Term,
     257              : }
     258              : 
     259              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     260            0 : #[derive(Debug, Deserialize)]
     261              : pub struct VoteRequestV2 {
     262              :     pub term: Term,
     263              : }
     264              : 
     265              : /// Vote itself, sent from safekeeper to proposer
     266              : #[derive(Debug, Serialize)]
     267              : pub struct VoteResponse {
     268              :     generation: Generation, // membership conf generation
     269              :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     270              :     vote_given: bool,
     271              :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     272              :     // proposer to choose the most advanced one.
     273              :     pub flush_lsn: Lsn,
     274              :     truncate_lsn: Lsn,
     275              :     pub term_history: TermHistory,
     276              : }
     277              : 
     278              : /*
     279              :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     280              :  * term history to it.
     281              :  */
     282              : #[derive(Debug, Clone)]
     283              : pub struct ProposerElected {
     284              :     pub generation: Generation, // membership conf generation
     285              :     pub term: Term,
     286              :     pub start_streaming_at: Lsn,
     287              :     pub term_history: TermHistory,
     288              : }
     289              : 
     290              : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     291              : /// communicates commit_lsn.
     292              : #[derive(Debug)]
     293              : pub struct AppendRequest {
     294              :     pub h: AppendRequestHeader,
     295              :     pub wal_data: Bytes,
     296              : }
     297            0 : #[derive(Debug, Clone, Deserialize)]
     298              : pub struct AppendRequestHeader {
     299              :     pub generation: Generation, // membership conf generation
     300              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     301              :     pub term: Term,
     302              :     /// start position of message in WAL
     303              :     pub begin_lsn: Lsn,
     304              :     /// end position of message in WAL
     305              :     pub end_lsn: Lsn,
     306              :     /// LSN committed by quorum of safekeepers
     307              :     pub commit_lsn: Lsn,
     308              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     309              :     pub truncate_lsn: Lsn,
     310              : }
     311              : 
     312              : /// V2 of the message; exists as a struct because we (de)serialized it as is.
     313            0 : #[derive(Debug, Clone, Deserialize)]
     314              : pub struct AppendRequestHeaderV2 {
     315              :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     316              :     pub term: Term,
     317              :     // TODO: remove this field from the protocol, it is unused -- LSN of term
     318              :     // switch can be taken from ProposerElected (as well as from term history).
     319              :     pub term_start_lsn: Lsn,
     320              :     /// start position of message in WAL
     321              :     pub begin_lsn: Lsn,
     322              :     /// end position of message in WAL
     323              :     pub end_lsn: Lsn,
     324              :     /// LSN committed by quorum of safekeepers
     325              :     pub commit_lsn: Lsn,
     326              :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     327              :     pub truncate_lsn: Lsn,
     328              :     // only for logging/debugging
     329              :     pub proposer_uuid: PgUuid,
     330              : }
     331              : 
     332              : /// Report safekeeper state to proposer
     333              : #[derive(Debug, Serialize, Clone)]
     334              : pub struct AppendResponse {
     335              :     // Membership conf generation. Not strictly required because on mismatch
     336              :     // connection is reset, but let's sanity check it.
     337              :     generation: Generation,
     338              :     // Current term of the safekeeper; if it is higher than proposer's, the
     339              :     // compute is out of date.
     340              :     pub term: Term,
     341              :     // Flushed end of WAL on safekeeper; one should always be mindful of which
     342              :     // term history this value comes from, either checking the history directly
     343              :     // or observing the term being set to one for which WAL truncation is known
     344              :     // to have happened.
     345              :     pub flush_lsn: Lsn,
     346              :     // We report back our awareness about which WAL is committed, as this is
     347              :     // a criterion for walproposer --sync mode exit
     348              :     pub commit_lsn: Lsn,
     349              :     pub hs_feedback: HotStandbyFeedback,
     350              :     pub pageserver_feedback: Option<PageserverFeedback>,
     351              : }
     352              : 
     353              : impl AppendResponse {
     354            0 :     fn term_only(generation: Generation, term: Term) -> AppendResponse {
     355            0 :         AppendResponse {
     356            0 :             generation,
     357            0 :             term,
     358            0 :             flush_lsn: Lsn(0),
     359            0 :             commit_lsn: Lsn(0),
     360            0 :             hs_feedback: HotStandbyFeedback::empty(),
     361            0 :             pageserver_feedback: None,
     362            0 :         }
     363            0 :     }
     364              : }
     365              : 
     366              : /// Proposer -> Acceptor messages
     367              : #[derive(Debug)]
     368              : pub enum ProposerAcceptorMessage {
     369              :     Greeting(ProposerGreeting),
     370              :     VoteRequest(VoteRequest),
     371              :     Elected(ProposerElected),
     372              :     AppendRequest(AppendRequest),
     373              :     NoFlushAppendRequest(AppendRequest),
     374              :     FlushWAL,
     375              : }
     376              : 
     377              : /// Augment Bytes with fallible get_uN_f methods, where N is the integer width in bits.
     378              : /// All reads are in network (big endian) order.
     379              : trait BytesF {
     380              :     fn get_u8_f(&mut self) -> Result<u8>;
     381              :     fn get_u16_f(&mut self) -> Result<u16>;
     382              :     fn get_u32_f(&mut self) -> Result<u32>;
     383              :     fn get_u64_f(&mut self) -> Result<u64>;
     384              : }
     385              : 
     386              : impl BytesF for Bytes {
     387        25311 :     fn get_u8_f(&mut self) -> Result<u8> {
     388        25311 :         if self.is_empty() {
     389            0 :             bail!("no bytes left, expected 1");
     390        25311 :         }
     391        25311 :         Ok(self.get_u8())
     392        25311 :     }
     393            0 :     fn get_u16_f(&mut self) -> Result<u16> {
     394            0 :         if self.remaining() < 2 {
     395            0 :             bail!("no bytes left, expected 2");
     396            0 :         }
     397            0 :         Ok(self.get_u16())
     398            0 :     }
     399       103192 :     fn get_u32_f(&mut self) -> Result<u32> {
     400       103192 :         if self.remaining() < 4 {
     401            0 :             bail!("only {} bytes left, expected 4", self.remaining());
     402       103192 :         }
     403       103192 :         Ok(self.get_u32())
     404       103192 :     }
     405        46844 :     fn get_u64_f(&mut self) -> Result<u64> {
     406        46844 :         if self.remaining() < 8 {
     407            0 :             bail!("only {} bytes left, expected 8", self.remaining());
     408        46844 :         }
     409        46844 :         Ok(self.get_u64())
     410        46844 :     }
     411              : }
     412              : 
     413              : impl ProposerAcceptorMessage {
     414              :     /// Read cstring from Bytes.
     415        38634 :     fn get_cstr(buf: &mut Bytes) -> Result<String> {
     416        38634 :         let pos = buf
     417        38634 :             .iter()
     418      1274922 :             .position(|x| *x == 0)
     419        38634 :             .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
     420        38634 :         let result = buf.split_to(pos);
     421        38634 :         buf.advance(1); // drop the null terminator
     422        38634 :         match std::str::from_utf8(&result) {
     423        38634 :             Ok(s) => Ok(s.to_string()),
     424            0 :             Err(e) => bail!("invalid utf8 in cstring: {}", e),
     425              :         }
     426        38634 :     }
     427              : 
     428              :     /// Read membership::Configuration from Bytes.
     429        19317 :     fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
     430        19317 :         let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
     431        19317 :         let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
     432              :         // In a valid configuration the main member set must have at least one member.
     433              :         // Empty conf is allowed until we fully migrate.
     434        19317 :         if generation != INVALID_GENERATION && members_len == 0 {
     435            0 :             bail!("empty members_len");
     436        19317 :         }
     437        19317 :         let mut members = MemberSet::empty();
     438        19317 :         for i in 0..members_len {
     439            0 :             let id = buf
     440            0 :                 .get_u64_f()
     441            0 :                 .with_context(|| format!("reading member {} node_id", i))?;
     442            0 :             let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
     443            0 :             let pg_port = buf
     444            0 :                 .get_u16_f()
     445            0 :                 .with_context(|| format!("reading member {} port", i))?;
     446            0 :             let sk = SafekeeperId {
     447            0 :                 id: NodeId(id),
     448            0 :                 host,
     449            0 :                 pg_port,
     450            0 :             };
     451            0 :             members.add(sk)?;
     452              :         }
     453        19317 :         let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
     454              :         // Non joint conf.
     455        19317 :         if new_members_len == 0 {
     456        19317 :             Ok(membership::Configuration {
     457        19317 :                 generation,
     458        19317 :                 members,
     459        19317 :                 new_members: None,
     460        19317 :             })
     461              :         } else {
     462            0 :             let mut new_members = MemberSet::empty();
     463            0 :             for i in 0..new_members_len {
     464            0 :                 let id = buf
     465            0 :                     .get_u64_f()
     466            0 :                     .with_context(|| format!("reading new member {} node_id", i))?;
     467            0 :                 let host = Self::get_cstr(buf)
     468            0 :                     .with_context(|| format!("reading new member {} host", i))?;
     469            0 :                 let pg_port = buf
     470            0 :                     .get_u16_f()
     471            0 :                     .with_context(|| format!("reading new member {} port", i))?;
     472            0 :                 let sk = SafekeeperId {
     473            0 :                     id: NodeId(id),
     474            0 :                     host,
     475            0 :                     pg_port,
     476            0 :                 };
     477            0 :                 new_members.add(sk)?;
     478              :             }
     479            0 :             Ok(membership::Configuration {
     480            0 :                 generation,
     481            0 :                 members,
     482            0 :                 new_members: Some(new_members),
     483            0 :             })
     484              :         }
     485        19317 :     }
     486              : 
     487              :     /// Parse proposer message.
     488        25311 :     pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
     489        25311 :         if proto_version == SK_PROTO_VERSION_3 {
     490        25311 :             if msg_bytes.is_empty() {
     491            0 :                 bail!("ProposerAcceptorMessage is not complete: missing tag");
     492        25311 :             }
     493        25311 :             let tag = msg_bytes.get_u8_f().with_context(|| {
     494            0 :                 "ProposerAcceptorMessage is not complete: missing tag".to_string()
     495        25311 :             })? as char;
     496        25311 :             match tag {
     497              :                 'g' => {
     498        19317 :                     let tenant_id_str =
     499        19317 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
     500        19317 :                     let tenant_id = TenantId::from_str(&tenant_id_str)?;
     501        19317 :                     let timeline_id_str =
     502        19317 :                         Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
     503        19317 :                     let timeline_id = TimelineId::from_str(&timeline_id_str)?;
     504        19317 :                     let mconf = Self::get_mconf(&mut msg_bytes)?;
     505        19317 :                     let pg_version = msg_bytes
     506        19317 :                         .get_u32_f()
     507        19317 :                         .with_context(|| "reading pg_version")?;
     508        19317 :                     let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
     509        19317 :                     let wal_seg_size = msg_bytes
     510        19317 :                         .get_u32_f()
     511        19317 :                         .with_context(|| "reading wal_seg_size")?;
     512        19317 :                     let g = ProposerGreeting {
     513        19317 :                         tenant_id,
     514        19317 :                         timeline_id,
     515        19317 :                         mconf,
     516        19317 :                         pg_version,
     517        19317 :                         system_id,
     518        19317 :                         wal_seg_size,
     519        19317 :                     };
     520        19317 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     521              :                 }
     522              :                 'v' => {
     523         2632 :                     let generation = Generation::new(
     524         2632 :                         msg_bytes
     525         2632 :                             .get_u32_f()
     526         2632 :                             .with_context(|| "reading generation")?,
     527              :                     );
     528         2632 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     529         2632 :                     let v = VoteRequest { generation, term };
     530         2632 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     531              :                 }
     532              :                 'e' => {
     533          613 :                     let generation = Generation::new(
     534          613 :                         msg_bytes
     535          613 :                             .get_u32_f()
     536          613 :                             .with_context(|| "reading generation")?,
     537              :                     );
     538          613 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     539          613 :                     let start_streaming_at: Lsn = msg_bytes
     540          613 :                         .get_u64_f()
     541          613 :                         .with_context(|| "reading start_streaming_at")?
     542          613 :                         .into();
     543          613 :                     let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     544          613 :                     let msg = ProposerElected {
     545          613 :                         generation,
     546          613 :                         term,
     547          613 :                         start_streaming_at,
     548          613 :                         term_history,
     549          613 :                     };
     550          613 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     551              :                 }
     552              :                 'a' => {
     553         2749 :                     let generation = Generation::new(
     554         2749 :                         msg_bytes
     555         2749 :                             .get_u32_f()
     556         2749 :                             .with_context(|| "reading generation")?,
     557              :                     );
     558         2749 :                     let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
     559         2749 :                     let begin_lsn: Lsn = msg_bytes
     560         2749 :                         .get_u64_f()
     561         2749 :                         .with_context(|| "reading begin_lsn")?
     562         2749 :                         .into();
     563         2749 :                     let end_lsn: Lsn = msg_bytes
     564         2749 :                         .get_u64_f()
     565         2749 :                         .with_context(|| "reading end_lsn")?
     566         2749 :                         .into();
     567         2749 :                     let commit_lsn: Lsn = msg_bytes
     568         2749 :                         .get_u64_f()
     569         2749 :                         .with_context(|| "reading commit_lsn")?
     570         2749 :                         .into();
     571         2749 :                     let truncate_lsn: Lsn = msg_bytes
     572         2749 :                         .get_u64_f()
     573         2749 :                         .with_context(|| "reading truncate_lsn")?
     574         2749 :                         .into();
     575         2749 :                     let hdr = AppendRequestHeader {
     576         2749 :                         generation,
     577         2749 :                         term,
     578         2749 :                         begin_lsn,
     579         2749 :                         end_lsn,
     580         2749 :                         commit_lsn,
     581         2749 :                         truncate_lsn,
     582         2749 :                     };
     583         2749 :                     let rec_size = hdr
     584         2749 :                         .end_lsn
     585         2749 :                         .checked_sub(hdr.begin_lsn)
     586         2749 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     587              :                         .0 as usize;
     588         2749 :                     if rec_size > MAX_SEND_SIZE {
     589            0 :                         bail!(
     590            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     591            0 :                             MAX_SEND_SIZE
     592            0 :                         );
     593         2749 :                     }
     594         2749 :                     if msg_bytes.remaining() < rec_size {
     595            0 :                         bail!(
     596            0 :                             "reading WAL: only {} bytes left, wanted {}",
     597            0 :                             msg_bytes.remaining(),
     598            0 :                             rec_size
     599            0 :                         );
     600         2749 :                     }
     601         2749 :                     let wal_data = msg_bytes.copy_to_bytes(rec_size);
     602         2749 :                     let msg = AppendRequest { h: hdr, wal_data };
     603         2749 : 
     604         2749 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     605              :                 }
     606            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     607              :             }
     608            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     609              :             // xxx using Reader is inefficient but easy to work with bincode
     610            0 :             let mut stream = msg_bytes.reader();
     611              :             // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     612            0 :             let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     613            0 :             match tag {
     614              :                 'g' => {
     615            0 :                     let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
     616            0 :                     let g = ProposerGreeting {
     617            0 :                         tenant_id: msgv2.tenant_id,
     618            0 :                         timeline_id: msgv2.timeline_id,
     619            0 :                         mconf: membership::Configuration {
     620            0 :                             generation: INVALID_GENERATION,
     621            0 :                             members: MemberSet::empty(),
     622            0 :                             new_members: None,
     623            0 :                         },
     624            0 :                         pg_version: msgv2.pg_version,
     625            0 :                         system_id: msgv2.system_id,
     626            0 :                         wal_seg_size: msgv2.wal_seg_size,
     627            0 :                     };
     628            0 :                     Ok(ProposerAcceptorMessage::Greeting(g))
     629              :                 }
     630              :                 'v' => {
     631            0 :                     let msg = VoteRequestV2::des_from(&mut stream)?;
     632            0 :                     let v = VoteRequest {
     633            0 :                         generation: INVALID_GENERATION,
     634            0 :                         term: msg.term,
     635            0 :                     };
     636            0 :                     Ok(ProposerAcceptorMessage::VoteRequest(v))
     637              :                 }
     638              :                 'e' => {
     639            0 :                     let mut msg_bytes = stream.into_inner();
     640            0 :                     if msg_bytes.remaining() < 16 {
     641            0 :                         bail!("ProposerElected message is not complete");
     642            0 :                     }
     643            0 :                     let term = msg_bytes.get_u64_le();
     644            0 :                     let start_streaming_at = msg_bytes.get_u64_le().into();
     645            0 :                     let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
     646            0 :                     if msg_bytes.remaining() < 8 {
     647            0 :                         bail!("ProposerElected message is not complete");
     648            0 :                     }
     649            0 :                     let _timeline_start_lsn = msg_bytes.get_u64_le();
     650            0 :                     let msg = ProposerElected {
     651            0 :                         generation: INVALID_GENERATION,
     652            0 :                         term,
     653            0 :                         start_streaming_at,
     654            0 :                         term_history,
     655            0 :                     };
     656            0 :                     Ok(ProposerAcceptorMessage::Elected(msg))
     657              :                 }
     658              :                 'a' => {
     659              :                     // read header followed by wal data
     660            0 :                     let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
     661            0 :                     let hdr = AppendRequestHeader {
     662            0 :                         generation: INVALID_GENERATION,
     663            0 :                         term: hdrv2.term,
     664            0 :                         begin_lsn: hdrv2.begin_lsn,
     665            0 :                         end_lsn: hdrv2.end_lsn,
     666            0 :                         commit_lsn: hdrv2.commit_lsn,
     667            0 :                         truncate_lsn: hdrv2.truncate_lsn,
     668            0 :                     };
     669            0 :                     let rec_size = hdr
     670            0 :                         .end_lsn
     671            0 :                         .checked_sub(hdr.begin_lsn)
     672            0 :                         .context("begin_lsn > end_lsn in AppendRequest")?
     673              :                         .0 as usize;
     674            0 :                     if rec_size > MAX_SEND_SIZE {
     675            0 :                         bail!(
     676            0 :                             "AppendRequest is longer than MAX_SEND_SIZE ({})",
     677            0 :                             MAX_SEND_SIZE
     678            0 :                         );
     679            0 :                     }
     680            0 : 
     681            0 :                     let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     682            0 :                     stream.read_exact(&mut wal_data_vec)?;
     683            0 :                     let wal_data = Bytes::from(wal_data_vec);
     684            0 : 
     685            0 :                     let msg = AppendRequest { h: hdr, wal_data };
     686            0 : 
     687            0 :                     Ok(ProposerAcceptorMessage::AppendRequest(msg))
     688              :                 }
     689            0 :                 _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     690              :             }
     691              :         } else {
     692            0 :             bail!("unsupported protocol version {}", proto_version);
     693              :         }
     694        25311 :     }
     695              : 
     696              :     /// The memory size of the message, including byte slices.
     697          620 :     pub fn size(&self) -> usize {
     698              :         const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
     699              : 
     700              :         // For most types, the size is just the base enum size including the nested structs. Some
     701              :         // types also contain byte slices; add them.
     702              :         //
     703              :         // We explicitly list all fields, to draw attention here when new fields are added.
     704          620 :         let mut size = BASE_SIZE;
     705          620 :         size += match self {
     706            0 :             Self::Greeting(_) => 0,
     707              : 
     708            0 :             Self::VoteRequest(_) => 0,
     709              : 
     710            0 :             Self::Elected(_) => 0,
     711              : 
     712              :             Self::AppendRequest(AppendRequest {
     713              :                 h:
     714              :                     AppendRequestHeader {
     715              :                         generation: _,
     716              :                         term: _,
     717              :                         begin_lsn: _,
     718              :                         end_lsn: _,
     719              :                         commit_lsn: _,
     720              :                         truncate_lsn: _,
     721              :                     },
     722          620 :                 wal_data,
     723          620 :             }) => wal_data.len(),
     724              : 
     725              :             Self::NoFlushAppendRequest(AppendRequest {
     726              :                 h:
     727              :                     AppendRequestHeader {
     728              :                         generation: _,
     729              :                         term: _,
     730              :                         begin_lsn: _,
     731              :                         end_lsn: _,
     732              :                         commit_lsn: _,
     733              :                         truncate_lsn: _,
     734              :                     },
     735            0 :                 wal_data,
     736            0 :             }) => wal_data.len(),
     737              : 
     738            0 :             Self::FlushWAL => 0,
     739              :         };
     740              : 
     741          620 :         size
     742          620 :     }
     743              : }
     744              : 
     745              : /// Acceptor -> Proposer messages
     746              : #[derive(Debug)]
     747              : pub enum AcceptorProposerMessage {
     748              :     Greeting(AcceptorGreeting),
     749              :     VoteResponse(VoteResponse),
     750              :     AppendResponse(AppendResponse),
     751              : }
     752              : 
     753              : impl AcceptorProposerMessage {
     754            0 :     fn put_cstr(buf: &mut BytesMut, s: &str) {
     755            0 :         buf.put_slice(s.as_bytes());
     756            0 :         buf.put_u8(0); // null terminator
     757            0 :     }
     758              : 
     759              :     /// Serialize membership::Configuration into buf.
     760        19317 :     fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
     761        19317 :         buf.put_u32(mconf.generation.into_inner());
     762        19317 :         buf.put_u32(mconf.members.m.len() as u32);
     763        19317 :         for sk in &mconf.members.m {
     764            0 :             buf.put_u64(sk.id.0);
     765            0 :             Self::put_cstr(buf, &sk.host);
     766            0 :             buf.put_u16(sk.pg_port);
     767            0 :         }
     768        19317 :         if let Some(ref new_members) = mconf.new_members {
     769            0 :             buf.put_u32(new_members.m.len() as u32);
     770            0 :             for sk in &new_members.m {
     771            0 :                 buf.put_u64(sk.id.0);
     772            0 :                 Self::put_cstr(buf, &sk.host);
     773            0 :                 buf.put_u16(sk.pg_port);
     774            0 :             }
     775        19317 :         } else {
     776        19317 :             buf.put_u32(0);
     777        19317 :         }
     778        19317 :     }
     779              : 
     780              :     /// Serialize acceptor -> proposer message.
     781        24279 :     pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
     782        24279 :         if proto_version == SK_PROTO_VERSION_3 {
     783        24279 :             match self {
     784        19317 :                 AcceptorProposerMessage::Greeting(msg) => {
     785        19317 :                     buf.put_u8(b'g');
     786        19317 :                     buf.put_u64(msg.node_id.0);
     787        19317 :                     Self::serialize_mconf(buf, &msg.mconf);
     788        19317 :                     buf.put_u64(msg.term)
     789              :                 }
     790         2632 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     791         2632 :                     buf.put_u8(b'v');
     792         2632 :                     buf.put_u32(msg.generation.into_inner());
     793         2632 :                     buf.put_u64(msg.term);
     794         2632 :                     buf.put_u8(msg.vote_given as u8);
     795         2632 :                     buf.put_u64(msg.flush_lsn.into());
     796         2632 :                     buf.put_u64(msg.truncate_lsn.into());
     797         2632 :                     buf.put_u32(msg.term_history.0.len() as u32);
     798        10141 :                     for e in &msg.term_history.0 {
     799         7509 :                         buf.put_u64(e.term);
     800         7509 :                         buf.put_u64(e.lsn.into());
     801         7509 :                     }
     802              :                 }
     803         2330 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     804         2330 :                     buf.put_u8(b'a');
     805         2330 :                     buf.put_u32(msg.generation.into_inner());
     806         2330 :                     buf.put_u64(msg.term);
     807         2330 :                     buf.put_u64(msg.flush_lsn.into());
     808         2330 :                     buf.put_u64(msg.commit_lsn.into());
     809         2330 :                     buf.put_i64(msg.hs_feedback.ts);
     810         2330 :                     buf.put_u64(msg.hs_feedback.xmin);
     811         2330 :                     buf.put_u64(msg.hs_feedback.catalog_xmin);
     812              : 
     813              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     814              :                     // if it is not present.
     815         2330 :                     if let Some(ref msg) = msg.pageserver_feedback {
     816            0 :                         msg.serialize(buf);
     817         2330 :                     }
     818              :                 }
     819              :             }
     820        24279 :             Ok(())
     821              :         // TODO remove 3 after converting all msgs
     822            0 :         } else if proto_version == SK_PROTO_VERSION_2 {
     823            0 :             match self {
     824            0 :                 AcceptorProposerMessage::Greeting(msg) => {
     825            0 :                     buf.put_u64_le('g' as u64);
     826            0 :                     // v2 didn't have mconf and fields were reordered
     827            0 :                     buf.put_u64_le(msg.term);
     828            0 :                     buf.put_u64_le(msg.node_id.0);
     829            0 :                 }
     830            0 :                 AcceptorProposerMessage::VoteResponse(msg) => {
     831            0 :                     // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
     832            0 :                     buf.put_u64_le('v' as u64);
     833            0 :                     buf.put_u64_le(msg.term);
     834            0 :                     buf.put_u64_le(msg.vote_given as u64);
     835            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     836            0 :                     buf.put_u64_le(msg.truncate_lsn.into());
     837            0 :                     buf.put_u32_le(msg.term_history.0.len() as u32);
     838            0 :                     for e in &msg.term_history.0 {
     839            0 :                         buf.put_u64_le(e.term);
     840            0 :                         buf.put_u64_le(e.lsn.into());
     841            0 :                     }
     842              :                     // removed timeline_start_lsn
     843            0 :                     buf.put_u64_le(0);
     844              :                 }
     845            0 :                 AcceptorProposerMessage::AppendResponse(msg) => {
     846            0 :                     // v2 didn't have generation
     847            0 :                     buf.put_u64_le('a' as u64);
     848            0 :                     buf.put_u64_le(msg.term);
     849            0 :                     buf.put_u64_le(msg.flush_lsn.into());
     850            0 :                     buf.put_u64_le(msg.commit_lsn.into());
     851            0 :                     buf.put_i64_le(msg.hs_feedback.ts);
     852            0 :                     buf.put_u64_le(msg.hs_feedback.xmin);
     853            0 :                     buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     854              : 
     855              :                     // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
     856              :                     // if it is not present.
     857            0 :                     if let Some(ref msg) = msg.pageserver_feedback {
     858            0 :                         msg.serialize(buf);
     859            0 :                     }
     860              :                 }
     861              :             }
     862            0 :             Ok(())
     863              :         } else {
     864            0 :             bail!("unsupported protocol version {}", proto_version);
     865              :         }
     866        24279 :     }
     867              : }
     868              : 
     869              : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     870              : /// It controls all WAL disk writes and updates of control file.
     871              : ///
     872              : /// Currently safekeeper processes:
     873              : /// - messages from compute (proposers) and provides replies
     874              : /// - messages from broker peers
     875              : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     876              :     /// LSN since the proposer safekeeper currently talking to appends WAL;
     877              :     /// determines last_log_term switch point.
     878              :     pub term_start_lsn: Lsn,
     879              : 
     880              :     pub state: TimelineState<CTRL>, // persistent state storage
     881              :     pub wal_store: WAL,
     882              : 
     883              :     node_id: NodeId, // safekeeper's node id
     884              : }
     885              : 
     886              : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     887              : where
     888              :     CTRL: control_file::Storage,
     889              :     WAL: wal_storage::Storage,
     890              : {
     891              :     /// Accepts a control file storage containing the safekeeper state.
     892              :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     893              :     /// and `server` (`wal_seg_size` inside it) fields.
     894         8602 :     pub fn new(
     895         8602 :         state: TimelineState<CTRL>,
     896         8602 :         wal_store: WAL,
     897         8602 :         node_id: NodeId,
     898         8602 :     ) -> Result<SafeKeeper<CTRL, WAL>> {
     899         8602 :         if state.tenant_id == TenantId::from([0u8; 16])
     900         8602 :             || state.timeline_id == TimelineId::from([0u8; 16])
     901              :         {
     902            0 :             bail!(
     903            0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     904            0 :                 state.tenant_id,
     905            0 :                 state.timeline_id
     906            0 :             );
     907         8602 :         }
     908         8602 : 
     909         8602 :         Ok(SafeKeeper {
     910         8602 :             term_start_lsn: Lsn(0),
     911         8602 :             state,
     912         8602 :             wal_store,
     913         8602 :             node_id,
     914         8602 :         })
     915            9 :     }
     916              : 
     917              :     /// Get history of term switches for the available WAL
     918         3254 :     fn get_term_history(&self) -> TermHistory {
     919         3254 :         self.state
     920         3254 :             .acceptor_state
     921         3254 :             .term_history
     922         3254 :             .up_to(self.flush_lsn())
     923         3254 :     }
     924              : 
     925           43 :     pub fn get_last_log_term(&self) -> Term {
     926           43 :         self.state
     927           43 :             .acceptor_state
     928           43 :             .get_last_log_term(self.flush_lsn())
     929           43 :     }
     930              : 
     931              :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     932        15282 :     pub fn flush_lsn(&self) -> Lsn {
     933        15282 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     934        15282 :     }
     935              : 
     936              :     /// Process message from proposer and possibly form reply. Concurrent
     937              :     /// callers must exclude each other.
     938        28897 :     pub async fn process_msg(
     939        28897 :         &mut self,
     940        28897 :         msg: &ProposerAcceptorMessage,
     941        28897 :     ) -> Result<Option<AcceptorProposerMessage>> {
     942        28897 :         match msg {
     943        19317 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     944         2635 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     945          621 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     946            5 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     947            5 :                 self.handle_append_request(msg, true).await
     948              :             }
     949         3369 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     950         3369 :                 self.handle_append_request(msg, false).await
     951              :             }
     952         2950 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     953              :         }
     954         1256 :     }
     955              : 
     956              :     /// Handle initial message from proposer: check its sanity and send my
     957              :     /// current term.
     958        19317 :     async fn handle_greeting(
     959        19317 :         &mut self,
     960        19317 :         msg: &ProposerGreeting,
     961        19317 :     ) -> Result<Option<AcceptorProposerMessage>> {
     962        19317 :         /* Postgres major version mismatch is treated as fatal error
     963        19317 :          * because safekeepers parse WAL headers and the format
     964        19317 :          * may change between versions.
     965        19317 :          */
     966        19317 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     967            0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     968              :         {
     969            0 :             bail!(
     970            0 :                 "incompatible server version {}, expected {}",
     971            0 :                 msg.pg_version,
     972            0 :                 self.state.server.pg_version
     973            0 :             );
     974        19317 :         }
     975        19317 : 
     976        19317 :         if msg.tenant_id != self.state.tenant_id {
     977            0 :             bail!(
     978            0 :                 "invalid tenant ID, got {}, expected {}",
     979            0 :                 msg.tenant_id,
     980            0 :                 self.state.tenant_id
     981            0 :             );
     982        19317 :         }
     983        19317 :         if msg.timeline_id != self.state.timeline_id {
     984            0 :             bail!(
     985            0 :                 "invalid timeline ID, got {}, expected {}",
     986            0 :                 msg.timeline_id,
     987            0 :                 self.state.timeline_id
     988            0 :             );
     989        19317 :         }
     990        19317 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     991            0 :             bail!(
     992            0 :                 "invalid wal_seg_size, got {}, expected {}",
     993            0 :                 msg.wal_seg_size,
     994            0 :                 self.state.server.wal_seg_size
     995            0 :             );
     996        19317 :         }
     997        19317 : 
     998        19317 :         // system_id will be updated on mismatch
     999        19317 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
    1000        19317 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
    1001            0 :             if self.state.server.system_id != 0 {
    1002            0 :                 warn!(
    1003            0 :                     "unexpected system ID arrived, got {}, expected {}",
    1004            0 :                     msg.system_id, self.state.server.system_id
    1005              :                 );
    1006            0 :             }
    1007              : 
    1008            0 :             let mut state = self.state.start_change();
    1009            0 :             state.server.system_id = msg.system_id;
    1010            0 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
    1011            0 :                 state.server.pg_version = msg.pg_version;
    1012            0 :             }
    1013            0 :             self.state.finish_change(&state).await?;
    1014            0 :         }
    1015              : 
    1016              :         // Switch into conf given by proposer conf if it is higher.
    1017        19317 :         self.state.membership_switch(msg.mconf.clone()).await?;
    1018              : 
    1019        19317 :         let apg = AcceptorGreeting {
    1020        19317 :             node_id: self.node_id,
    1021        19317 :             mconf: self.state.mconf.clone(),
    1022        19317 :             term: self.state.acceptor_state.term,
    1023        19317 :         };
    1024        19317 :         info!(
    1025            0 :             "processed greeting {:?} from walproposer, sending {:?}",
    1026              :             msg, apg
    1027              :         );
    1028        19317 :         Ok(Some(AcceptorProposerMessage::Greeting(apg)))
    1029            0 :     }
    1030              : 
    1031              :     /// Give vote for the given term, if we haven't done that previously.
    1032         2635 :     async fn handle_vote_request(
    1033         2635 :         &mut self,
    1034         2635 :         msg: &VoteRequest,
    1035         2635 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1036         2635 :         if self.state.mconf.generation != msg.generation {
    1037            1 :             bail!(
    1038            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1039            1 :                 msg,
    1040            1 :                 self.state.mconf.generation
    1041            1 :             );
    1042         2634 :         }
    1043         2634 :         // Once voted, we won't accept data from older proposers; flush
    1044         2634 :         // everything we've already received so that new proposer starts
    1045         2634 :         // streaming at end of our WAL, without overlap. WAL is truncated at
    1046         2634 :         // streaming point and commit_lsn may be advanced from peers, so this
    1047         2634 :         // also avoids possible spurious attempt to truncate committed WAL.
    1048         2634 :         self.wal_store.flush_wal().await?;
    1049              :         // initialize with refusal
    1050         2634 :         let mut resp = VoteResponse {
    1051         2634 :             generation: self.state.mconf.generation,
    1052         2634 :             term: self.state.acceptor_state.term,
    1053         2634 :             vote_given: false,
    1054         2634 :             flush_lsn: self.flush_lsn(),
    1055         2634 :             truncate_lsn: self.state.inmem.peer_horizon_lsn,
    1056         2634 :             term_history: self.get_term_history(),
    1057         2634 :         };
    1058         2634 :         if self.state.acceptor_state.term < msg.term {
    1059         2500 :             let mut state = self.state.start_change();
    1060         2500 :             state.acceptor_state.term = msg.term;
    1061         2500 :             // persist vote before sending it out
    1062         2500 :             self.state.finish_change(&state).await?;
    1063              : 
    1064         2500 :             resp.term = self.state.acceptor_state.term;
    1065         2500 :             resp.vote_given = true;
    1066            1 :         }
    1067         2634 :         info!("processed {:?}: sending {:?}", msg, &resp);
    1068         2634 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
    1069            3 :     }
    1070              : 
    1071              :     /// Form AppendResponse from current state.
    1072         2953 :     fn append_response(&self) -> AppendResponse {
    1073         2953 :         let ar = AppendResponse {
    1074         2953 :             generation: self.state.mconf.generation,
    1075         2953 :             term: self.state.acceptor_state.term,
    1076         2953 :             flush_lsn: self.flush_lsn(),
    1077         2953 :             commit_lsn: self.state.commit_lsn,
    1078         2953 :             // will be filled by the upper code to avoid bothering safekeeper
    1079         2953 :             hs_feedback: HotStandbyFeedback::empty(),
    1080         2953 :             pageserver_feedback: None,
    1081         2953 :         };
    1082         2953 :         trace!("formed AppendResponse {:?}", ar);
    1083         2953 :         ar
    1084         2953 :     }
    1085              : 
    1086          621 :     async fn handle_elected(
    1087          621 :         &mut self,
    1088          621 :         msg: &ProposerElected,
    1089          621 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1090          621 :         let _timer = MISC_OPERATION_SECONDS
    1091          621 :             .with_label_values(&["handle_elected"])
    1092          621 :             .start_timer();
    1093          621 : 
    1094          621 :         info!(
    1095            0 :             "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
    1096            0 :             msg,
    1097            0 :             self.state.acceptor_state.term,
    1098            0 :             self.get_last_log_term(),
    1099            0 :             self.flush_lsn()
    1100              :         );
    1101          621 :         if self.state.mconf.generation != msg.generation {
    1102            1 :             bail!(
    1103            1 :                 "refusing {:?} due to generation mismatch: sk generation {}",
    1104            1 :                 msg,
    1105            1 :                 self.state.mconf.generation
    1106            1 :             );
    1107          620 :         }
    1108          620 :         if self.state.acceptor_state.term < msg.term {
    1109            7 :             let mut state = self.state.start_change();
    1110            7 :             state.acceptor_state.term = msg.term;
    1111            7 :             self.state.finish_change(&state).await?;
    1112            0 :         }
    1113              : 
    1114              :         // If our term is higher, ignore the message (next feedback will inform the compute)
    1115          620 :         if self.state.acceptor_state.term > msg.term {
    1116            0 :             return Ok(None);
    1117          620 :         }
    1118          620 : 
    1119          620 :         // Before truncating WAL, cross-check the divergence point received
    1120          620 :         // from the walproposer.
    1121          620 :         let sk_th = self.get_term_history();
    1122          620 :         let last_common_point = match TermHistory::find_highest_common_point(
    1123          620 :             &msg.term_history,
    1124          620 :             &sk_th,
    1125          620 :             self.flush_lsn(),
    1126          620 :         ) {
    1127              :             // No common point. Expect streaming from the beginning of the
    1128              :             // history like walproposer while we don't have proper init.
    1129          135 :             None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
    1130          135 :                 "empty walproposer term history {:?}",
    1131          135 :                 msg.term_history
    1132          135 :             ))?,
    1133          485 :             Some(lcp) => lcp,
    1134              :         };
    1135              :         // This is expected to happen in a rare race when another connection
    1136              :         // from the same walproposer writes + flushes WAL after this connection
    1137              :         // sent flush_lsn in VoteRequest; for instance, very late
    1138              :         // ProposerElected message delivery after another connection was
    1139              :         // established and wrote WAL. In such cases the error is transient;
    1140              :         // reconnection makes safekeeper send newest term history and flush_lsn
    1141              :         // and walproposer recalculates the streaming point. OTOH a repeating
    1142              :         // error indicates a serious bug.
    1143          620 :         if last_common_point.lsn != msg.start_streaming_at {
    1144            0 :             bail!(
    1145            0 :                 "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1146            0 :                 last_common_point,
    1147            0 :                 msg.start_streaming_at,
    1148            0 :                 self.state.acceptor_state.term,
    1149            0 :                 sk_th,
    1150            0 :                 self.flush_lsn(),
    1151            0 :                 msg.term_history,
    1152            0 :             );
    1153          620 :         }
    1154          620 : 
    1155          620 :         // We are also expected to never attempt to truncate committed data.
    1156          620 :         assert!(
    1157          620 :             msg.start_streaming_at >= self.state.inmem.commit_lsn,
    1158            0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
    1159            0 :             msg.start_streaming_at,
    1160            0 :             self.state.inmem.commit_lsn,
    1161            0 :             self.state.acceptor_state.term,
    1162            0 :             sk_th,
    1163            0 :             self.flush_lsn(),
    1164              :             msg.term_history,
    1165              :         );
    1166              : 
    1167              :         // Before first WAL write initialize its segment. It makes first segment
    1168              :         // pg_waldump'able because stream from compute doesn't include its
    1169              :         // segment and page headers.
    1170              :         //
    1171              :         // If we fail before the first WAL write is flushed, this action would
    1172              :         // be repeated; that's ok because it is idempotent.
    1173          620 :         if self.wal_store.flush_lsn() == Lsn::INVALID {
    1174          135 :             self.wal_store
    1175          135 :                 .initialize_first_segment(msg.start_streaming_at)
    1176          135 :                 .await?;
    1177            0 :         }
    1178              : 
    1179              :         // truncate wal, update the LSNs
    1180          620 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
    1181              : 
    1182              :         // and now adopt term history from proposer
    1183              :         {
    1184          620 :             let mut state = self.state.start_change();
    1185              : 
    1186              :             // Here we learn initial LSN for the first time, set fields
    1187              :             // interested in that.
    1188              : 
    1189          620 :             if let Some(start_lsn) = msg.term_history.0.first() {
    1190          620 :                 if state.timeline_start_lsn == Lsn(0) {
    1191              :                     // Remember point where WAL begins globally. In the future it
    1192              :                     // will be initialized immediately on timeline creation.
    1193          135 :                     state.timeline_start_lsn = start_lsn.lsn;
    1194          135 :                     info!(
    1195            0 :                         "setting timeline_start_lsn to {:?}",
    1196              :                         state.timeline_start_lsn
    1197              :                     );
    1198            0 :                 }
    1199            0 :             }
    1200              : 
    1201          620 :             if state.peer_horizon_lsn == Lsn(0) {
    1202          135 :                 // Update peer_horizon_lsn as soon as we know where timeline starts.
    1203          135 :                 // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
    1204          135 :                 state.peer_horizon_lsn = state.timeline_start_lsn;
    1205          135 :             }
    1206          620 :             if state.local_start_lsn == Lsn(0) {
    1207          135 :                 state.local_start_lsn = msg.start_streaming_at;
    1208          135 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
    1209            0 :             }
    1210              :             // Initializing commit_lsn before acking first flushed record is
    1211              :             // important to let find_end_of_wal skip the hole in the beginning
    1212              :             // of the first segment.
    1213              :             //
    1214              :             // NB: on new clusters, this happens at the same time as
    1215              :             // timeline_start_lsn initialization; it is kept outside of that
    1216              :             // branch to provide for upgrade.
    1217          620 :             state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
    1218          620 : 
    1219          620 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
    1220          620 :             state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
    1221          620 :             // similar for remote_consistent_lsn
    1222          620 :             state.remote_consistent_lsn =
    1223          620 :                 max(state.remote_consistent_lsn, state.timeline_start_lsn);
    1224          620 : 
    1225          620 :             state.acceptor_state.term_history = msg.term_history.clone();
    1226          620 :             self.state.finish_change(&state).await?;
    1227              :         }
    1228              : 
    1229          620 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
    1230              : 
    1231              :         // Cache LSN where term starts to immediately fsync control file with
    1232              :         // commit_lsn once we reach it -- sync-safekeepers finishes when
    1233              :         // persisted commit_lsn on majority of safekeepers aligns.
    1234          620 :         self.term_start_lsn = match msg.term_history.0.last() {
    1235            0 :             None => bail!("proposer elected with empty term history"),
    1236          620 :             Some(term_lsn_start) => term_lsn_start.lsn,
    1237          620 :         };
    1238          620 : 
    1239          620 :         Ok(None)
    1240            8 :     }
    1241              : 
    1242              :     /// Advance commit_lsn taking into account what we have locally.
    1243              :     ///
    1244              :     /// Note: it is assumed that 'WAL we have is from the right term' check has
    1245              :     /// already been done outside.
    1246         2368 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
    1247         2368 :         // Both peers and walproposer communicate this value, we might already
    1248         2368 :         // have a fresher (higher) version.
    1249         2368 :         candidate = max(candidate, self.state.inmem.commit_lsn);
    1250         2368 :         let commit_lsn = min(candidate, self.flush_lsn());
    1251         2368 :         assert!(
    1252         2368 :             commit_lsn >= self.state.inmem.commit_lsn,
    1253            0 :             "commit_lsn monotonicity violated: old={} new={}",
    1254              :             self.state.inmem.commit_lsn,
    1255              :             commit_lsn
    1256              :         );
    1257              : 
    1258         2368 :         self.state.inmem.commit_lsn = commit_lsn;
    1259         2368 : 
    1260         2368 :         // If new commit_lsn reached term switch, force sync of control
    1261         2368 :         // file: walproposer in sync mode is very interested when this
    1262         2368 :         // happens. Note: this is for sync-safekeepers mode only, as
    1263         2368 :         // otherwise commit_lsn might jump over term_start_lsn.
    1264         2368 :         if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
    1265           93 :             self.state.flush().await?;
    1266          620 :         }
    1267              : 
    1268         2368 :         Ok(())
    1269          620 :     }
    1270              : 
    1271              :     /// Handle request to append WAL.
    1272              :     #[allow(clippy::comparison_chain)]
    1273         3374 :     async fn handle_append_request(
    1274         3374 :         &mut self,
    1275         3374 :         msg: &AppendRequest,
    1276         3374 :         require_flush: bool,
    1277         3374 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1278         3374 :         // Refuse message on generation mismatch. On reconnect wp will get full
    1279         3374 :         // configuration from greeting.
    1280         3374 :         if self.state.mconf.generation != msg.h.generation {
    1281            1 :             bail!(
    1282            1 :                 "refusing append request due to generation mismatch: request {}, sk {}",
    1283            1 :                 msg.h.generation,
    1284            1 :                 self.state.mconf.generation
    1285            1 :             );
    1286         3373 :         }
    1287         3373 : 
    1288         3373 :         if self.state.acceptor_state.term < msg.h.term {
    1289            0 :             bail!("got AppendRequest before ProposerElected");
    1290         3373 :         }
    1291         3373 : 
    1292         3373 :         // If our term is higher, immediately refuse the message. Send term only
    1293         3373 :         // response; elected walproposer can never advance the term, so it will
    1294         3373 :         // figure out the refusal from it -- which is important as term change
    1295         3373 :         // should cause not just reconnection but whole walproposer re-election.
    1296         3373 :         if self.state.acceptor_state.term > msg.h.term {
    1297            0 :             let resp = AppendResponse::term_only(
    1298            0 :                 self.state.mconf.generation,
    1299            0 :                 self.state.acceptor_state.term,
    1300            0 :             );
    1301            0 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
    1302         3373 :         }
    1303         3373 : 
    1304         3373 :         // Disallow any non-sequential writes, which can result in gaps or
    1305         3373 :         // overwrites. If we need to move the pointer, ProposerElected message
    1306         3373 :         // should have truncated WAL first accordingly. Note that the first
    1307         3373 :         // condition (WAL rewrite) is quite expected in real world; it happens
    1308         3373 :         // when walproposer reconnects to safekeeper and writes some more data
    1309         3373 :         // while first connection still gets some packets later. It might be
    1310         3373 :         // better to not log this as error! above.
    1311         3373 :         let write_lsn = self.wal_store.write_lsn();
    1312         3373 :         let flush_lsn = self.wal_store.flush_lsn();
    1313         3373 :         if write_lsn > msg.h.begin_lsn {
    1314            1 :             bail!(
    1315            1 :                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
    1316            1 :                 write_lsn,
    1317            1 :                 msg.h.begin_lsn
    1318            1 :             );
    1319         3372 :         }
    1320         3372 :         if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
    1321            0 :             bail!(
    1322            0 :                 "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
    1323            0 :                 write_lsn,
    1324            0 :                 msg.h.begin_lsn,
    1325            0 :             );
    1326         3372 :         }
    1327         3372 : 
    1328         3372 :         // Now we know that we are in the same term as the proposer, process the
    1329         3372 :         // message.
    1330         3372 : 
    1331         3372 :         // do the job
    1332         3372 :         if !msg.wal_data.is_empty() {
    1333         1367 :             self.wal_store
    1334         1367 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
    1335         1367 :                 .await?;
    1336            0 :         }
    1337              : 
    1338              :         // flush wal to the disk, if required
    1339         3372 :         if require_flush {
    1340            3 :             self.wal_store.flush_wal().await?;
    1341          620 :         }
    1342              : 
    1343              :         // Update commit_lsn. It will be flushed to the control file regularly by the timeline
    1344              :         // manager, off of the WAL ingest hot path.
    1345         3372 :         if msg.h.commit_lsn != Lsn(0) {
    1346         2368 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
    1347            3 :         }
    1348              :         // Value calculated by walproposer can always lag:
    1349              :         // - safekeepers can forget inmem value and send to proposer lower
    1350              :         //   persisted one on restart;
    1351              :         // - if we make safekeepers always send persistent value,
    1352              :         //   any compute restart would pull it down.
    1353              :         // Thus, take max before adopting.
    1354         3372 :         self.state.inmem.peer_horizon_lsn =
    1355         3372 :             max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
    1356         3372 : 
    1357         3372 :         trace!(
    1358            0 :             "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
    1359            0 :             msg.wal_data.len(),
    1360              :             msg.h.begin_lsn,
    1361              :             msg.h.end_lsn,
    1362              :             msg.h.commit_lsn,
    1363              :             msg.h.truncate_lsn,
    1364              :             require_flush,
    1365              :         );
    1366              : 
    1367              :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
    1368              :         // This is the common case for !require_flush, but a flush can still
    1369              :         // happen on segment bounds.
    1370         3372 :         if !require_flush && flush_lsn == self.flush_lsn() {
    1371         3369 :             return Ok(None);
    1372            3 :         }
    1373            3 : 
    1374            3 :         let resp = self.append_response();
    1375            3 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
    1376          625 :     }
    1377              : 
    1378              :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
    1379         2950 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
    1380         2950 :         self.wal_store.flush_wal().await?;
    1381         2950 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
    1382         2950 :             self.append_response(),
    1383         2950 :         )))
    1384          620 :     }
    1385              : 
    1386              :     /// Update commit_lsn from peer safekeeper data.
    1387            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
    1388            0 :         if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
    1389              :             // Note: the check is too restrictive, generally we can update local
    1390              :             // commit_lsn if our history matches (is part of) history of advanced
    1391              :             // commit_lsn provider.
    1392            0 :             if sk_info.last_log_term == self.get_last_log_term() {
    1393            0 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
    1394            0 :             }
    1395            0 :         }
    1396            0 :         Ok(())
    1397            0 :     }
    1398              : }
    1399              : 
    1400              : #[cfg(test)]
    1401              : mod tests {
    1402              :     use std::ops::Deref;
    1403              :     use std::str::FromStr;
    1404              :     use std::time::{Instant, UNIX_EPOCH};
    1405              : 
    1406              :     use futures::future::BoxFuture;
    1407              :     use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
    1408              :     use safekeeper_api::ServerInfo;
    1409              :     use safekeeper_api::membership::{
    1410              :         Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
    1411              :     };
    1412              : 
    1413              :     use super::*;
    1414              :     use crate::state::{EvictionState, TimelinePersistentState};
    1415              : 
    1416              :     // fake storage for tests
    1417              :     struct InMemoryState {
    1418              :         persisted_state: TimelinePersistentState,
    1419              :     }
    1420              : 
    1421              :     impl control_file::Storage for InMemoryState {
    1422            5 :         async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
    1423            5 :             self.persisted_state = s.clone();
    1424            5 :             Ok(())
    1425            5 :         }
    1426              : 
    1427            0 :         fn last_persist_at(&self) -> Instant {
    1428            0 :             Instant::now()
    1429            0 :         }
    1430              :     }
    1431              : 
    1432              :     impl Deref for InMemoryState {
    1433              :         type Target = TimelinePersistentState;
    1434              : 
    1435          100 :         fn deref(&self) -> &Self::Target {
    1436          100 :             &self.persisted_state
    1437          100 :         }
    1438              :     }
    1439              : 
    1440            3 :     fn test_sk_state() -> TimelinePersistentState {
    1441            3 :         let mut state = TimelinePersistentState::empty();
    1442            3 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1443            3 :         state.tenant_id = TenantId::from([1u8; 16]);
    1444            3 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1445            3 :         state
    1446            3 :     }
    1447              : 
    1448              :     struct DummyWalStore {
    1449              :         lsn: Lsn,
    1450              :     }
    1451              : 
    1452              :     impl wal_storage::Storage for DummyWalStore {
    1453            4 :         fn write_lsn(&self) -> Lsn {
    1454            4 :             self.lsn
    1455            4 :         }
    1456              : 
    1457           19 :         fn flush_lsn(&self) -> Lsn {
    1458           19 :             self.lsn
    1459           19 :         }
    1460              : 
    1461            2 :         async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
    1462            2 :             Ok(())
    1463            2 :         }
    1464              : 
    1465            3 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1466            3 :             self.lsn = startpos + buf.len() as u64;
    1467            3 :             Ok(())
    1468            3 :         }
    1469              : 
    1470            2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1471            2 :             self.lsn = end_pos;
    1472            2 :             Ok(())
    1473            2 :         }
    1474              : 
    1475            5 :         async fn flush_wal(&mut self) -> Result<()> {
    1476            5 :             Ok(())
    1477            5 :         }
    1478              : 
    1479            0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1480            0 :             Box::pin(async { Ok(()) })
    1481            0 :         }
    1482              : 
    1483            0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1484            0 :             crate::metrics::WalStorageMetrics::default()
    1485            0 :         }
    1486              :     }
    1487              : 
    1488              :     #[tokio::test]
    1489            1 :     async fn test_voting() {
    1490            1 :         let storage = InMemoryState {
    1491            1 :             persisted_state: test_sk_state(),
    1492            1 :         };
    1493            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1494            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1495            1 : 
    1496            1 :         // Vote with generation mismatch should be rejected.
    1497            1 :         let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1498            1 :             generation: SafekeeperGeneration::new(42),
    1499            1 :             term: 1,
    1500            1 :         });
    1501            1 :         assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
    1502            1 : 
    1503            1 :         // check voting for 1 is ok
    1504            1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
    1505            1 :             generation: Generation::new(0),
    1506            1 :             term: 1,
    1507            1 :         });
    1508            1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1509            1 :         match vote_resp.unwrap() {
    1510            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
    1511            1 :             r => panic!("unexpected response: {:?}", r),
    1512            1 :         }
    1513            1 : 
    1514            1 :         // reboot...
    1515            1 :         let state = sk.state.deref().clone();
    1516            1 :         let storage = InMemoryState {
    1517            1 :             persisted_state: state,
    1518            1 :         };
    1519            1 : 
    1520            1 :         sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
    1521            1 : 
    1522            1 :         // and ensure voting second time for 1 is not ok
    1523            1 :         vote_resp = sk.process_msg(&vote_request).await;
    1524            1 :         match vote_resp.unwrap() {
    1525            1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
    1526            1 :             r => panic!("unexpected response: {:?}", r),
    1527            1 :         }
    1528            1 :     }
    1529              : 
    1530              :     #[tokio::test]
                            /// Verifies when `get_last_log_term()` moves to the newly elected term.
                            ///
                            /// The proposer is elected in term 2 with a history saying term 1 starts
                            /// at LSN 1 and term 2 starts at LSN 3. A record ending at LSN 2 must
                            /// leave the last log term at 1; a record ending at LSN 3 must move it
                            /// to 2. An `Elected` message carrying a different generation is
                            /// expected to be rejected first.
    1531            1 :     async fn test_last_log_term_switch() {
    1532            1 :         let storage = InMemoryState {
    1533            1 :             persisted_state: test_sk_state(),
    1534            1 :         };
    1535            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1536            1 : 
    1537            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1538            1 : 
                                // Header for the first append: one byte of WAL covering [1, 2).
    1539            1 :         let mut ar_hdr = AppendRequestHeader {
    1540            1 :             generation: Generation::new(0),
    1541            1 :             term: 2,
    1542            1 :             begin_lsn: Lsn(1),
    1543            1 :             end_lsn: Lsn(2),
    1544            1 :             commit_lsn: Lsn(0),
    1545            1 :             truncate_lsn: Lsn(0),
    1546            1 :         };
    1547            1 :         let mut append_request = AppendRequest {
    1548            1 :             h: ar_hdr.clone(),
    1549            1 :             wal_data: Bytes::from_static(b"b"),
    1550            1 :         };
    1551            1 : 
                                // Election message for term 2; its history places the start of
                                // term 2 at LSN 3.
    1552            1 :         let pem = ProposerElected {
    1553            1 :             generation: Generation::new(0),
    1554            1 :             term: 2,
    1555            1 :             start_streaming_at: Lsn(1),
    1556            1 :             term_history: TermHistory(vec![
    1557            1 :                 TermLsn {
    1558            1 :                     term: 1,
    1559            1 :                     lsn: Lsn(1),
    1560            1 :                 },
    1561            1 :                 TermLsn {
    1562            1 :                     term: 2,
    1563            1 :                     lsn: Lsn(3),
    1564            1 :                 },
    1565            1 :             ]),
    1566            1 :         };
    1567            1 : 
    1568            1 :         // check that elected msg with generation mismatch is rejected
                                // NOTE(review): presumably `test_sk_state()` yields generation 0, so
                                // 42 mismatches; the helper is not visible here - confirm.
    1569            1 :         let mut pem_gen_mismatch = pem.clone();
    1570            1 :         pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1571            1 :         assert!(
    1572            1 :             sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
    1573            1 :                 .await
    1574            1 :                 .is_err()
    1575            1 :         );
    1576            1 : 
    1577            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1578            1 :             .await
    1579            1 :             .unwrap();
    1580            1 : 
    1581            1 :         // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
    1582            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1583            1 :             .await
    1584            1 :             .unwrap();
    1585            1 :         assert_eq!(sk.get_last_log_term(), 1);
    1586            1 : 
    1587            1 :         // but record at term_start_lsn does the switch
                                // The second append covers [2, 3), i.e. it ends exactly at the
                                // term 2 start LSN from the history above.
    1588            1 :         ar_hdr.begin_lsn = Lsn(2);
    1589            1 :         ar_hdr.end_lsn = Lsn(3);
    1590            1 :         append_request = AppendRequest {
    1591            1 :             h: ar_hdr,
    1592            1 :             wal_data: Bytes::from_static(b"b"),
    1593            1 :         };
    1594            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1595            1 :             .await
    1596            1 :             .unwrap();
    1597            1 :         assert_eq!(sk.get_last_log_term(), 2);
    1598            1 :     }
    1599              : 
    1600              :     #[tokio::test]
                            /// Verifies that an append which does not start where the previous one
                            /// ended is rejected.
                            ///
                            /// After election in term 1, a one-byte write covering [1, 2) must
                            /// succeed, and a following write starting at LSN 4 (leaving a gap)
                            /// must return an error. An append request carrying a different
                            /// generation is expected to be rejected as well.
    1601            1 :     async fn test_non_consecutive_write() {
    1602            1 :         let storage = InMemoryState {
    1603            1 :             persisted_state: test_sk_state(),
    1604            1 :         };
    1605            1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1606            1 : 
    1607            1 :         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
    1608            1 : 
    1609            1 :         let pem = ProposerElected {
    1610            1 :             generation: Generation::new(0),
    1611            1 :             term: 1,
    1612            1 :             start_streaming_at: Lsn(1),
    1613            1 :             term_history: TermHistory(vec![TermLsn {
    1614            1 :                 term: 1,
    1615            1 :                 lsn: Lsn(1),
    1616            1 :             }]),
    1617            1 :         };
    1618            1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1619            1 :             .await
    1620            1 :             .unwrap();
    1621            1 : 
    1622            1 :         let ar_hdr = AppendRequestHeader {
    1623            1 :             generation: Generation::new(0),
    1624            1 :             term: 1,
    1625            1 :             begin_lsn: Lsn(1),
    1626            1 :             end_lsn: Lsn(2),
    1627            1 :             commit_lsn: Lsn(0),
    1628            1 :             truncate_lsn: Lsn(0),
    1629            1 :         };
    1630            1 :         let append_request = AppendRequest {
    1631            1 :             h: ar_hdr.clone(),
    1632            1 :             wal_data: Bytes::from_static(b"b"),
    1633            1 :         };
    1634            1 : 
    1635            1 :         // check that append request with generation mismatch is rejected
                                // NOTE(review): presumably `test_sk_state()` yields generation 0, so
                                // 42 mismatches; the helper is not visible here - confirm.
    1636            1 :         let mut ar_hdr_gen_mismatch = ar_hdr.clone();
    1637            1 :         ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
    1638            1 :         let append_request_gen_mismatch = AppendRequest {
    1639            1 :             h: ar_hdr_gen_mismatch,
    1640            1 :             wal_data: Bytes::from_static(b"b"),
    1641            1 :         };
    1642            1 :         assert!(
    1643            1 :             sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
    1644            1 :                 append_request_gen_mismatch
    1645            1 :             ))
    1646            1 :             .await
    1647            1 :             .is_err()
    1648            1 :         );
    1649            1 : 
    1650            1 :         // do write ending at 2, it should be ok
    1651            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1652            1 :             .await
    1653            1 :             .unwrap();
                                // Build the gapped request from the modified header copy. Sending
                                // the unmodified header here would only replay the [1, 2) write and
                                // never exercise the non-consecutive case.
    1654            1 :         let mut ar_hdr2 = ar_hdr.clone();
    1655            1 :         ar_hdr2.begin_lsn = Lsn(4);
    1656            1 :         ar_hdr2.end_lsn = Lsn(5);
    1657            1 :         let append_request = AppendRequest {
    1658            1 :             h: ar_hdr2,
    1659            1 :             wal_data: Bytes::from_static(b"b"),
    1660            1 :         };
    1661            1 :         // and now starting at 4, it must fail
    1662            1 :         sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1663            1 :             .await
    1664            1 :             .unwrap_err();
    1665            1 :     }
    1666              : 
    1667              :     #[test]
                            /// Histories that already differ in their first entry (term 0 vs term 1)
                            /// must yield no common point.
    1668            1 :     fn test_find_highest_common_point_none() {
    1669            1 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1670            1 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1671            1 :         assert_eq!(
    1672            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
    1673            1 :             None
    1674            1 :         );
    1675            1 :     }
    1676              : 
    1677              :     #[test]
                            /// Histories agree on terms 1 and 2 and then diverge (proposer continues
                            /// with term 4 at LSN 40, safekeeper with term 3 at LSN 30).
                            ///
                            /// The expected common point is term 2 at LSN 30, the LSN at which the
                            /// safekeeper's history starts its next term.
    1678            1 :     fn test_find_highest_common_point_middle() {
    1679            1 :         let prop_th = TermHistory(vec![
    1680            1 :             (1, Lsn(10)).into(),
    1681            1 :             (2, Lsn(20)).into(),
    1682            1 :             (4, Lsn(40)).into(),
    1683            1 :         ]);
    1684            1 :         let sk_th = TermHistory(vec![
    1685            1 :             (1, Lsn(10)).into(),
    1686            1 :             (2, Lsn(20)).into(),
    1687            1 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1688            1 :         ]);
    1689            1 :         assert_eq!(
    1690            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1691            1 :             Some(TermLsn {
    1692            1 :                 term: 2,
    1693            1 :                 lsn: Lsn(30),
    1694            1 :             })
    1695            1 :         );
    1696            1 :     }
    1697              : 
    1698              :     #[test]
                            /// The safekeeper history is a prefix of the proposer history and stops
                            /// at the last common term 2.
                            ///
                            /// The expected common point is term 2 at LSN 32, the value passed as
                            /// the third argument (the safekeeper end LSN).
    1699            1 :     fn test_find_highest_common_point_sk_end() {
    1700            1 :         let prop_th = TermHistory(vec![
    1701            1 :             (1, Lsn(10)).into(),
    1702            1 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1703            1 :             (4, Lsn(40)).into(),
    1704            1 :         ]);
    1705            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1706            1 :         assert_eq!(
    1707            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1708            1 :             Some(TermLsn {
    1709            1 :                 term: 2,
    1710            1 :                 lsn: Lsn(32),
    1711            1 :             })
    1712            1 :         );
    1713            1 :     }
    1714              : 
    1715              :     #[test]
                            /// Proposer and safekeeper histories are identical, both ending in term 2.
                            ///
                            /// The expected common point is term 2 at LSN 32, the value passed as
                            /// the third argument.
                            // NOTE(review): the third argument is presumably the safekeeper end LSN,
                            // as in the `_sk_end` test; the function signature is not visible here.
    1716            1 :     fn test_find_highest_common_point_walprop() {
    1717            1 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1718            1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1719            1 :         assert_eq!(
    1720            1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1721            1 :             Some(TermLsn {
    1722            1 :                 term: 2,
    1723            1 :                 lsn: Lsn(32),
    1724            1 :             })
    1725            1 :         );
    1726            1 :     }
    1727              : 
    1728              :     #[test]
                            /// Round-trips a fully populated `TimelinePersistentState` through
                            /// `ser()` and `des()` and asserts the decoded value equals the original.
                            // NOTE(review): `ser`/`des` presumably come from `utils::bin_ser::LeSer`
                            // (imported at the top of the file) - confirm for the tests module.
    1729            1 :     fn test_sk_state_bincode_serde_roundtrip() {
    1730            1 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1731            1 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1732            1 :         let state = TimelinePersistentState {
    1733            1 :             tenant_id,
    1734            1 :             timeline_id,
    1735            1 :             mconf: Configuration {
    1736            1 :                 generation: SafekeeperGeneration::new(42),
    1737            1 :                 members: MemberSet::new(vec![SafekeeperId {
    1738            1 :                     id: NodeId(1),
    1739            1 :                     host: "hehe.org".to_owned(),
    1740            1 :                     pg_port: 5432,
    1741            1 :                 }])
    1742            1 :                 .expect("duplicate member"),
    1743            1 :                 new_members: None,
    1744            1 :             },
    1745            1 :             acceptor_state: AcceptorState {
    1746            1 :                 term: 42,
    1747            1 :                 term_history: TermHistory(vec![TermLsn {
    1748            1 :                     lsn: Lsn(0x1),
    1749            1 :                     term: 41,
    1750            1 :                 }]),
    1751            1 :             },
    1752            1 :             server: ServerInfo {
    1753            1 :                 pg_version: 14,
    1754            1 :                 system_id: 0x1234567887654321,
    1755            1 :                 wal_seg_size: 0x12345678,
    1756            1 :             },
                                    // Arbitrary but deterministic value: the timeline id bytes reversed.
    1757            1 :             proposer_uuid: {
    1758            1 :                 let mut arr = timeline_id.as_arr();
    1759            1 :                 arr.reverse();
    1760            1 :                 arr
    1761            1 :             },
    1762            1 :             timeline_start_lsn: Lsn(0x12345600),
    1763            1 :             local_start_lsn: Lsn(0x12),
    1764            1 :             commit_lsn: Lsn(1234567800),
    1765            1 :             backup_lsn: Lsn(1234567300),
    1766            1 :             peer_horizon_lsn: Lsn(9999999),
    1767            1 :             remote_consistent_lsn: Lsn(1234560000),
    1768            1 :             partial_backup: crate::wal_backup_partial::State::default(),
    1769            1 :             eviction_state: EvictionState::Present,
    1770            1 :             creation_ts: UNIX_EPOCH,
    1771            1 :         };
    1772            1 : 
    1773            1 :         let ser = state.ser().unwrap();
    1774            1 : 
    1775            1 :         let deser = TimelinePersistentState::des(&ser).unwrap();
    1776            1 : 
    1777            1 :         assert_eq!(deser, state);
    1778            1 :     }
    1779              : }
        

Generated by: LCOV version 2.1-beta