LCOV - differential code coverage report
Current view: top level - safekeeper/src - safekeeper.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 89.8 % 902 810 92 810
Current Date: 2024-01-09 02:06:09 Functions: 57.3 % 344 197 147 197
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Acceptor part of proposer-acceptor consensus algorithm.
       2                 : 
       3                 : use anyhow::{bail, Context, Result};
       4                 : use byteorder::{LittleEndian, ReadBytesExt};
       5                 : use bytes::{Buf, BufMut, Bytes, BytesMut};
       6                 : 
       7                 : use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
       8                 : use serde::{Deserialize, Serialize};
       9                 : use std::cmp::max;
      10                 : use std::cmp::min;
      11                 : use std::fmt;
      12                 : use std::io::Read;
      13                 : use std::time::Duration;
      14                 : use storage_broker::proto::SafekeeperTimelineInfo;
      15                 : 
      16                 : use tracing::*;
      17                 : 
      18                 : use crate::control_file;
      19                 : use crate::send_wal::HotStandbyFeedback;
      20                 : 
      21                 : use crate::wal_storage;
      22                 : use pq_proto::SystemId;
      23                 : use utils::pageserver_feedback::PageserverFeedback;
      24                 : use utils::{
      25                 :     bin_ser::LeSer,
      26                 :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      27                 :     lsn::Lsn,
      28                 : };
      29                 : 
      30                 : pub const SK_MAGIC: u32 = 0xcafeceefu32;
      31                 : pub const SK_FORMAT_VERSION: u32 = 7;
      32                 : const SK_PROTOCOL_VERSION: u32 = 2;
      33                 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
      34                 : 
      35                 : /// Consensus logical timestamp.
      36                 : pub type Term = u64;
      37                 : pub const INVALID_TERM: Term = 0;
      38                 : 
      39 CBC        8907 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
      40                 : pub struct TermLsn {
      41                 :     pub term: Term,
      42                 :     pub lsn: Lsn,
      43                 : }
      44                 : 
      45                 : // Creation from tuple provides less typing (e.g. for unit tests).
      46                 : impl From<(Term, Lsn)> for TermLsn {
      47         3274993 :     fn from(pair: (Term, Lsn)) -> TermLsn {
      48         3274993 :         TermLsn {
      49         3274993 :             term: pair.0,
      50         3274993 :             lsn: pair.1,
      51         3274993 :         }
      52         3274993 :     }
      53                 : }
      54                 : 
      55           13927 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
      56                 : pub struct TermHistory(pub Vec<TermLsn>);
      57                 : 
      58                 : impl TermHistory {
      59             501 :     pub fn empty() -> TermHistory {
      60             501 :         TermHistory(Vec::new())
      61             501 :     }
      62                 : 
      63                 :     // Parse TermHistory as n_entries followed by TermLsn pairs
      64             830 :     pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
      65             830 :         if bytes.remaining() < 4 {
      66 UBC           0 :             bail!("TermHistory misses len");
      67 CBC         830 :         }
      68             830 :         let n_entries = bytes.get_u32_le();
      69             830 :         let mut res = Vec::with_capacity(n_entries as usize);
      70             830 :         for _ in 0..n_entries {
      71            3499 :             if bytes.remaining() < 16 {
      72 UBC           0 :                 bail!("TermHistory is incomplete");
      73 CBC        3499 :             }
      74            3499 :             res.push(TermLsn {
      75            3499 :                 term: bytes.get_u64_le(),
      76            3499 :                 lsn: bytes.get_u64_le().into(),
      77            3499 :             })
      78                 :         }
      79             830 :         Ok(TermHistory(res))
      80             830 :     }
      81                 : 
      82                 :     /// Return copy of self with switches happening strictly after up_to
      83                 :     /// truncated.
      84           23337 :     pub fn up_to(&self, up_to: Lsn) -> TermHistory {
      85           23337 :         let mut res = Vec::with_capacity(self.0.len());
      86           62048 :         for e in &self.0 {
      87           38746 :             if e.lsn > up_to {
      88              35 :                 break;
      89           38711 :             }
      90           38711 :             res.push(*e);
      91                 :         }
      92           23337 :         TermHistory(res)
      93           23337 :     }
      94                 : 
      95                 :     /// Find point of divergence between leader (walproposer) term history and
      96                 :     /// safekeeper. Arguments are not symmetrics as proposer history ends at
      97                 :     /// +infinity while safekeeper at flush_lsn.
      98                 :     /// C version is at walproposer SendProposerElected.
      99               5 :     pub fn find_highest_common_point(
     100               5 :         prop_th: &TermHistory,
     101               5 :         sk_th: &TermHistory,
     102               5 :         sk_wal_end: Lsn,
     103               5 :     ) -> Option<TermLsn> {
     104               5 :         let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
     105                 : 
     106               5 :         if let Some(sk_th_last) = sk_th.last() {
     107               5 :             assert!(
     108               5 :                 sk_th_last.lsn <= sk_wal_end,
     109 UBC           0 :                 "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
     110                 :                 sk_th_last,
     111                 :                 sk_wal_end
     112                 :             );
     113               0 :         }
     114                 : 
     115                 :         // find last common term, if any...
     116 CBC           5 :         let mut last_common_idx = None;
     117               9 :         for i in 0..min(sk_th.len(), prop_th.len()) {
     118               9 :             if prop_th[i].term != sk_th[i].term {
     119               2 :                 break;
     120               7 :             }
     121                 :             // If term is the same, LSN must be equal as well.
     122               7 :             assert!(
     123               7 :                 prop_th[i].lsn == sk_th[i].lsn,
     124 UBC           0 :                 "same term {} has different start LSNs: prop {}, sk {}",
     125               0 :                 prop_th[i].term,
     126               0 :                 prop_th[i].lsn,
     127               0 :                 sk_th[i].lsn
     128                 :             );
     129 CBC           7 :             last_common_idx = Some(i);
     130                 :         }
     131               5 :         let last_common_idx = match last_common_idx {
     132               1 :             None => return None, // no common point
     133               4 :             Some(lci) => lci,
     134               4 :         };
     135               4 :         // Now find where it ends at both prop and sk and take min. End of
     136               4 :         // (common) term is the start of the next except it is the last one;
     137               4 :         // there it is flush_lsn in case of safekeeper or, in case of proposer
     138               4 :         // +infinity, so we just take flush_lsn then.
     139               4 :         if last_common_idx == prop_th.len() - 1 {
     140               1 :             Some(TermLsn {
     141               1 :                 term: prop_th[last_common_idx].term,
     142               1 :                 lsn: sk_wal_end,
     143               1 :             })
     144                 :         } else {
     145               3 :             let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
     146               3 :             let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
     147               1 :                 sk_th[last_common_idx + 1].lsn
     148                 :             } else {
     149               2 :                 sk_wal_end
     150                 :             };
     151               3 :             Some(TermLsn {
     152               3 :                 term: prop_th[last_common_idx].term,
     153               3 :                 lsn: min(prop_common_term_end, sk_common_term_end),
     154               3 :             })
     155                 :         }
     156               5 :     }
     157                 : }
     158                 : 
     159                 : /// Display only latest entries for Debug.
     160                 : impl fmt::Debug for TermHistory {
     161            2577 :     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
     162            2577 :         let n_printed = 20;
     163            2577 :         write!(
     164            2577 :             fmt,
     165            2577 :             "{}{:?}",
     166            2577 :             if self.0.len() > n_printed { "... " } else { "" },
     167            2577 :             self.0
     168            2577 :                 .iter()
     169            2577 :                 .rev()
     170            2577 :                 .take(n_printed)
     171            6567 :                 .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
     172            2577 :                 .collect::<Vec<_>>()
     173            2577 :         )
     174            2577 :     }
     175                 : }
     176                 : 
     177                 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
     178                 : pub type PgUuid = [u8; 16];
     179                 : 
     180                 : /// Persistent consensus state of the acceptor.
     181           13092 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     182                 : pub struct AcceptorState {
     183                 :     /// acceptor's last term it voted for (advanced in 1 phase)
     184                 :     pub term: Term,
     185                 :     /// History of term switches for safekeeper's WAL.
     186                 :     /// Actually it often goes *beyond* WAL contents as we adopt term history
     187                 :     /// from the proposer before recovery.
     188                 :     pub term_history: TermHistory,
     189                 : }
     190                 : 
     191                 : impl AcceptorState {
     192                 :     /// acceptor's epoch is the term of the highest entry in the log
     193           21589 :     pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
     194           21589 :         let th = self.term_history.up_to(flush_lsn);
     195           21589 :         match th.0.last() {
     196           21098 :             Some(e) => e.term,
     197             491 :             None => 0,
     198                 :         }
     199           21589 :     }
     200                 : }
     201                 : 
     202                 : /// Information about Postgres. Safekeeper gets it once and then verifies
     203                 : /// all further connections from computes match.
     204           13140 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     205                 : pub struct ServerInfo {
     206                 :     /// Postgres server version
     207                 :     pub pg_version: u32,
     208                 :     pub system_id: SystemId,
     209                 :     pub wal_seg_size: u32,
     210                 : }
     211                 : 
     212               4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     213                 : pub struct PersistedPeerInfo {
     214                 :     /// LSN up to which safekeeper offloaded WAL to s3.
     215                 :     pub backup_lsn: Lsn,
     216                 :     /// Term of the last entry.
     217                 :     pub term: Term,
     218                 :     /// LSN of the last record.
     219                 :     pub flush_lsn: Lsn,
     220                 :     /// Up to which LSN safekeeper regards its WAL as committed.
     221                 :     pub commit_lsn: Lsn,
     222                 : }
     223                 : 
     224                 : impl PersistedPeerInfo {
     225 UBC           0 :     fn new() -> Self {
     226               0 :         Self {
     227               0 :             backup_lsn: Lsn::INVALID,
     228               0 :             term: INVALID_TERM,
     229               0 :             flush_lsn: Lsn(0),
     230               0 :             commit_lsn: Lsn(0),
     231               0 :         }
     232               0 :     }
     233                 : }
     234                 : 
     235 CBC       13092 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     236                 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
     237                 : 
     238                 : /// Persistent information stored on safekeeper node
     239                 : /// On disk data is prefixed by magic and format version and followed by checksum.
     240           13092 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     241                 : pub struct SafeKeeperState {
     242                 :     #[serde(with = "hex")]
     243                 :     pub tenant_id: TenantId,
     244                 :     #[serde(with = "hex")]
     245                 :     pub timeline_id: TimelineId,
     246                 :     /// persistent acceptor state
     247                 :     pub acceptor_state: AcceptorState,
     248                 :     /// information about server
     249                 :     pub server: ServerInfo,
     250                 :     /// Unique id of the last *elected* proposer we dealt with. Not needed
     251                 :     /// for correctness, exists for monitoring purposes.
     252                 :     #[serde(with = "hex")]
     253                 :     pub proposer_uuid: PgUuid,
     254                 :     /// Since which LSN this timeline generally starts. Safekeeper might have
     255                 :     /// joined later.
     256                 :     pub timeline_start_lsn: Lsn,
     257                 :     /// Since which LSN safekeeper has (had) WAL for this timeline.
     258                 :     /// All WAL segments next to one containing local_start_lsn are
     259                 :     /// filled with data from the beginning.
     260                 :     pub local_start_lsn: Lsn,
     261                 :     /// Part of WAL acknowledged by quorum *and available locally*. Always points
     262                 :     /// to record boundary.
     263                 :     pub commit_lsn: Lsn,
     264                 :     /// LSN that points to the end of the last backed up segment. Useful to
     265                 :     /// persist to avoid finding out offloading progress on boot.
     266                 :     pub backup_lsn: Lsn,
     267                 :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
     268                 :     /// of last record streamed to everyone). Persisting it helps skipping
     269                 :     /// recovery in walproposer, generally we compute it from peers. In
     270                 :     /// walproposer proto called 'truncate_lsn'. Updates are currently drived
     271                 :     /// only by walproposer.
     272                 :     pub peer_horizon_lsn: Lsn,
     273                 :     /// LSN of the oldest known checkpoint made by pageserver and successfully
     274                 :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
     275                 :     /// informational purposes, we receive it from pageserver (or broker).
     276                 :     pub remote_consistent_lsn: Lsn,
     277                 :     // Peers and their state as we remember it. Knowing peers themselves is
     278                 :     // fundamental; but state is saved here only for informational purposes and
     279                 :     // obviously can be stale. (Currently not saved at all, but let's provision
     280                 :     // place to have less file version upgrades).
     281                 :     pub peers: PersistedPeers,
     282                 : }
     283                 : 
     284            2792 : #[derive(Debug, Clone, Serialize, Deserialize)]
     285                 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
     286                 : // are not flushed yet.
     287                 : pub struct SafekeeperMemState {
     288                 :     pub commit_lsn: Lsn,
     289                 :     pub backup_lsn: Lsn,
     290                 :     pub peer_horizon_lsn: Lsn,
     291                 :     #[serde(with = "hex")]
     292                 :     pub proposer_uuid: PgUuid,
     293                 : }
     294                 : 
     295                 : impl SafeKeeperState {
     296             501 :     pub fn new(
     297             501 :         ttid: &TenantTimelineId,
     298             501 :         server_info: ServerInfo,
     299             501 :         peers: Vec<NodeId>,
     300             501 :         commit_lsn: Lsn,
     301             501 :         local_start_lsn: Lsn,
     302             501 :     ) -> SafeKeeperState {
     303             501 :         SafeKeeperState {
     304             501 :             tenant_id: ttid.tenant_id,
     305             501 :             timeline_id: ttid.timeline_id,
     306             501 :             acceptor_state: AcceptorState {
     307             501 :                 term: 0,
     308             501 :                 term_history: TermHistory::empty(),
     309             501 :             },
     310             501 :             server: server_info,
     311             501 :             proposer_uuid: [0; 16],
     312             501 :             timeline_start_lsn: Lsn(0),
     313             501 :             local_start_lsn,
     314             501 :             commit_lsn,
     315             501 :             backup_lsn: local_start_lsn,
     316             501 :             peer_horizon_lsn: local_start_lsn,
     317             501 :             remote_consistent_lsn: Lsn(0),
     318             501 :             peers: PersistedPeers(
     319             501 :                 peers
     320             501 :                     .iter()
     321             501 :                     .map(|p| (*p, PersistedPeerInfo::new()))
     322             501 :                     .collect(),
     323             501 :             ),
     324             501 :         }
     325             501 :     }
     326                 : 
     327                 :     #[cfg(test)]
     328               4 :     pub fn empty() -> Self {
     329               4 :         SafeKeeperState::new(
     330               4 :             &TenantTimelineId::empty(),
     331               4 :             ServerInfo {
     332               4 :                 pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
     333               4 :                 system_id: 0,                       /* Postgres system identifier */
     334               4 :                 wal_seg_size: 0,
     335               4 :             },
     336               4 :             vec![],
     337               4 :             Lsn::INVALID,
     338               4 :             Lsn::INVALID,
     339               4 :         )
     340               4 :     }
     341                 : }
     342                 : 
     343                 : // protocol messages
     344                 : 
     345                 : /// Initial Proposer -> Acceptor message
     346            1745 : #[derive(Debug, Deserialize)]
     347                 : pub struct ProposerGreeting {
     348                 :     /// proposer-acceptor protocol version
     349                 :     pub protocol_version: u32,
     350                 :     /// Postgres server version
     351                 :     pub pg_version: u32,
     352                 :     pub proposer_id: PgUuid,
     353                 :     pub system_id: SystemId,
     354                 :     pub timeline_id: TimelineId,
     355                 :     pub tenant_id: TenantId,
     356                 :     pub tli: TimeLineID,
     357                 :     pub wal_seg_size: u32,
     358                 : }
     359                 : 
     360                 : /// Acceptor -> Proposer initial response: the highest term known to me
     361                 : /// (acceptor voted for).
     362 UBC           0 : #[derive(Debug, Serialize)]
     363                 : pub struct AcceptorGreeting {
     364                 :     term: u64,
     365                 :     node_id: NodeId,
     366                 : }
     367                 : 
     368                 : /// Vote request sent from proposer to safekeepers
     369 CBC        1742 : #[derive(Debug, Deserialize)]
     370                 : pub struct VoteRequest {
     371                 :     pub term: Term,
     372                 : }
     373                 : 
     374                 : /// Vote itself, sent from safekeeper to proposer
     375            1743 : #[derive(Debug, Serialize)]
     376                 : pub struct VoteResponse {
     377                 :     pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     378                 :     vote_given: u64, // fixme u64 due to padding
     379                 :     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     380                 :     // proposer to choose the most advanced one.
     381                 :     pub flush_lsn: Lsn,
     382                 :     truncate_lsn: Lsn,
     383                 :     pub term_history: TermHistory,
     384                 :     timeline_start_lsn: Lsn,
     385                 : }
     386                 : 
     387                 : /*
     388                 :  * Proposer -> Acceptor message announcing proposer is elected and communicating
     389                 :  * term history to it.
     390                 :  */
     391             834 : #[derive(Debug)]
     392                 : pub struct ProposerElected {
     393                 :     pub term: Term,
     394                 :     pub start_streaming_at: Lsn,
     395                 :     pub term_history: TermHistory,
     396                 :     pub timeline_start_lsn: Lsn,
     397                 : }
     398                 : 
     399                 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
     400                 : /// communicates commit_lsn.
     401 UBC           0 : #[derive(Debug)]
     402                 : pub struct AppendRequest {
     403                 :     pub h: AppendRequestHeader,
     404                 :     pub wal_data: Bytes,
     405                 : }
     406 CBC     1975841 : #[derive(Debug, Clone, Deserialize)]
     407                 : pub struct AppendRequestHeader {
     408                 :     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     409                 :     pub term: Term,
     410                 :     // TODO: remove this field, it in unused -- LSN of term switch can be taken
     411                 :     // from ProposerElected (as well as from term history).
     412                 :     pub epoch_start_lsn: Lsn,
     413                 :     /// start position of message in WAL
     414                 :     pub begin_lsn: Lsn,
     415                 :     /// end position of message in WAL
     416                 :     pub end_lsn: Lsn,
     417                 :     /// LSN committed by quorum of safekeepers
     418                 :     pub commit_lsn: Lsn,
     419                 :     /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
     420                 :     pub truncate_lsn: Lsn,
     421                 :     // only for logging/debugging
     422                 :     pub proposer_uuid: PgUuid,
     423                 : }
     424                 : 
     425                 : /// Report safekeeper state to proposer
     426               3 : #[derive(Debug, Serialize)]
     427                 : pub struct AppendResponse {
     428                 :     // Current term of the safekeeper; if it is higher than proposer's, the
     429                 :     // compute is out of date.
     430                 :     pub term: Term,
     431                 :     // NOTE: this is physical end of wal on safekeeper; currently it doesn't
     432                 :     // make much sense without taking epoch into account, as history can be
     433                 :     // diverged.
     434                 :     pub flush_lsn: Lsn,
     435                 :     // We report back our awareness about which WAL is committed, as this is
     436                 :     // a criterion for walproposer --sync mode exit
     437                 :     pub commit_lsn: Lsn,
     438                 :     pub hs_feedback: HotStandbyFeedback,
     439                 :     pub pageserver_feedback: PageserverFeedback,
     440                 : }
     441                 : 
     442                 : impl AppendResponse {
     443               1 :     fn term_only(term: Term) -> AppendResponse {
     444               1 :         AppendResponse {
     445               1 :             term,
     446               1 :             flush_lsn: Lsn(0),
     447               1 :             commit_lsn: Lsn(0),
     448               1 :             hs_feedback: HotStandbyFeedback::empty(),
     449               1 :             pageserver_feedback: PageserverFeedback::empty(),
     450               1 :         }
     451               1 :     }
     452                 : }
     453                 : 
     454                 : /// Proposer -> Acceptor messages
     455 UBC           0 : #[derive(Debug)]
     456                 : pub enum ProposerAcceptorMessage {
     457                 :     Greeting(ProposerGreeting),
     458                 :     VoteRequest(VoteRequest),
     459                 :     Elected(ProposerElected),
     460                 :     AppendRequest(AppendRequest),
     461                 :     NoFlushAppendRequest(AppendRequest),
     462                 :     FlushWAL,
     463                 : }
     464                 : 
     465                 : impl ProposerAcceptorMessage {
     466                 :     /// Parse proposer message.
     467 CBC     1980158 :     pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
     468         1980158 :         // xxx using Reader is inefficient but easy to work with bincode
     469         1980158 :         let mut stream = msg_bytes.reader();
     470                 :         // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
     471         1980158 :         let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
     472         1980158 :         match tag {
     473                 :             'g' => {
     474            1745 :                 let msg = ProposerGreeting::des_from(&mut stream)?;
     475            1745 :                 Ok(ProposerAcceptorMessage::Greeting(msg))
     476                 :             }
     477                 :             'v' => {
     478            1742 :                 let msg = VoteRequest::des_from(&mut stream)?;
     479            1742 :                 Ok(ProposerAcceptorMessage::VoteRequest(msg))
     480                 :             }
     481                 :             'e' => {
     482             830 :                 let mut msg_bytes = stream.into_inner();
     483             830 :                 if msg_bytes.remaining() < 16 {
     484 UBC           0 :                     bail!("ProposerElected message is not complete");
     485 CBC         830 :                 }
     486             830 :                 let term = msg_bytes.get_u64_le();
     487             830 :                 let start_streaming_at = msg_bytes.get_u64_le().into();
     488             830 :                 let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
     489             830 :                 if msg_bytes.remaining() < 8 {
     490 UBC           0 :                     bail!("ProposerElected message is not complete");
     491 CBC         830 :                 }
     492             830 :                 let timeline_start_lsn = msg_bytes.get_u64_le().into();
     493             830 :                 let msg = ProposerElected {
     494             830 :                     term,
     495             830 :                     start_streaming_at,
     496             830 :                     timeline_start_lsn,
     497             830 :                     term_history,
     498             830 :                 };
     499             830 :                 Ok(ProposerAcceptorMessage::Elected(msg))
     500                 :             }
     501                 :             'a' => {
     502                 :                 // read header followed by wal data
     503         1975841 :                 let hdr = AppendRequestHeader::des_from(&mut stream)?;
     504         1975841 :                 let rec_size = hdr
     505         1975841 :                     .end_lsn
     506         1975841 :                     .checked_sub(hdr.begin_lsn)
     507         1975841 :                     .context("begin_lsn > end_lsn in AppendRequest")?
     508                 :                     .0 as usize;
     509         1975841 :                 if rec_size > MAX_SEND_SIZE {
     510 UBC           0 :                     bail!(
     511               0 :                         "AppendRequest is longer than MAX_SEND_SIZE ({})",
     512               0 :                         MAX_SEND_SIZE
     513               0 :                     );
     514 CBC     1975841 :                 }
     515         1975841 : 
     516         1975841 :                 let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
     517         1975841 :                 stream.read_exact(&mut wal_data_vec)?;
     518         1975841 :                 let wal_data = Bytes::from(wal_data_vec);
     519         1975841 :                 let msg = AppendRequest { h: hdr, wal_data };
     520         1975841 : 
     521         1975841 :                 Ok(ProposerAcceptorMessage::AppendRequest(msg))
     522                 :             }
     523 UBC           0 :             _ => bail!("unknown proposer-acceptor message tag: {}", tag),
     524                 :         }
     525 CBC     1980158 :     }
     526                 : }
     527                 : 
     528                 : /// Acceptor -> Proposer messages
     529 UBC           0 : #[derive(Debug)]
     530                 : pub enum AcceptorProposerMessage {
     531                 :     Greeting(AcceptorGreeting),
     532                 :     VoteResponse(VoteResponse),
     533                 :     AppendResponse(AppendResponse),
     534                 : }
     535                 : 
     536                 : impl AcceptorProposerMessage {
     537                 :     /// Serialize acceptor -> proposer message.
     538 CBC     1297710 :     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
     539         1297710 :         match self {
     540            1744 :             AcceptorProposerMessage::Greeting(msg) => {
     541            1744 :                 buf.put_u64_le('g' as u64);
     542            1744 :                 buf.put_u64_le(msg.term);
     543            1744 :                 buf.put_u64_le(msg.node_id.0);
     544            1744 :             }
     545            1740 :             AcceptorProposerMessage::VoteResponse(msg) => {
     546            1740 :                 buf.put_u64_le('v' as u64);
     547            1740 :                 buf.put_u64_le(msg.term);
     548            1740 :                 buf.put_u64_le(msg.vote_given);
     549            1740 :                 buf.put_u64_le(msg.flush_lsn.into());
     550            1740 :                 buf.put_u64_le(msg.truncate_lsn.into());
     551            1740 :                 buf.put_u32_le(msg.term_history.0.len() as u32);
     552            5584 :                 for e in &msg.term_history.0 {
     553            3844 :                     buf.put_u64_le(e.term);
     554            3844 :                     buf.put_u64_le(e.lsn.into());
     555            3844 :                 }
     556            1740 :                 buf.put_u64_le(msg.timeline_start_lsn.into());
     557                 :             }
     558         1294226 :             AcceptorProposerMessage::AppendResponse(msg) => {
     559         1294226 :                 buf.put_u64_le('a' as u64);
     560         1294226 :                 buf.put_u64_le(msg.term);
     561         1294226 :                 buf.put_u64_le(msg.flush_lsn.into());
     562         1294226 :                 buf.put_u64_le(msg.commit_lsn.into());
     563         1294226 :                 buf.put_i64_le(msg.hs_feedback.ts);
     564         1294226 :                 buf.put_u64_le(msg.hs_feedback.xmin);
     565         1294226 :                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
     566         1294226 : 
     567         1294226 :                 msg.pageserver_feedback.serialize(buf);
     568         1294226 :             }
     569                 :         }
     570                 : 
     571         1297710 :         Ok(())
     572         1297710 :     }
     573                 : }
     574                 : 
     575                 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
     576                 : /// It controls all WAL disk writes and updates of control file.
     577                 : ///
     578                 : /// Currently safekeeper processes:
     579                 : /// - messages from compute (proposers) and provides replies
     580                 : /// - messages from broker peers
     581                 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
     582                 :     /// LSN since the proposer safekeeper currently talking to appends WAL;
     583                 :     /// determines epoch switch point.
     584                 :     pub epoch_start_lsn: Lsn,
     585                 : 
     586                 :     pub inmem: SafekeeperMemState, // in memory part
     587                 :     pub state: CTRL,               // persistent state storage
     588                 : 
     589                 :     pub wal_store: WAL,
     590                 : 
     591                 :     node_id: NodeId, // safekeeper's node id
     592                 : }
     593                 : 
     594                 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
     595                 : where
     596                 :     CTRL: control_file::Storage,
     597                 :     WAL: wal_storage::Storage,
     598                 : {
     599                 :     /// Accepts a control file storage containing the safekeeper state.
     600                 :     /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
     601                 :     /// and `server` (`wal_seg_size` inside it) fields.
     602             585 :     pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
     603             585 :         if state.tenant_id == TenantId::from([0u8; 16])
     604             585 :             || state.timeline_id == TimelineId::from([0u8; 16])
     605                 :         {
     606 UBC           0 :             bail!(
     607               0 :                 "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
     608               0 :                 state.tenant_id,
     609               0 :                 state.timeline_id
     610               0 :             );
     611 CBC         585 :         }
     612             585 : 
     613             585 :         Ok(SafeKeeper {
     614             585 :             epoch_start_lsn: Lsn(0),
     615             585 :             inmem: SafekeeperMemState {
     616             585 :                 commit_lsn: state.commit_lsn,
     617             585 :                 backup_lsn: state.backup_lsn,
     618             585 :                 peer_horizon_lsn: state.peer_horizon_lsn,
     619             585 :                 proposer_uuid: state.proposer_uuid,
     620             585 :             },
     621             585 :             state,
     622             585 :             wal_store,
     623             585 :             node_id,
     624             585 :         })
     625             585 :     }
     626                 : 
     627                 :     /// Get history of term switches for the available WAL
     628            1745 :     fn get_term_history(&self) -> TermHistory {
     629            1745 :         self.state
     630            1745 :             .acceptor_state
     631            1745 :             .term_history
     632            1745 :             .up_to(self.flush_lsn())
     633            1745 :     }
     634                 : 
     635                 :     /// Get current term.
     636         3274526 :     pub fn get_term(&self) -> Term {
     637         3274526 :         self.state.acceptor_state.term
     638         3274526 :     }
     639                 : 
     640           21342 :     pub fn get_epoch(&self) -> Term {
     641           21342 :         self.state.acceptor_state.get_epoch(self.flush_lsn())
     642           21342 :     }
     643                 : 
     644                 :     /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
     645         6587367 :     pub fn flush_lsn(&self) -> Lsn {
     646         6587367 :         max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
     647         6587367 :     }
     648                 : 
     649                 :     /// Process message from proposer and possibly form reply. Concurrent
     650                 :     /// callers must exclude each other.
     651         3274399 :     pub async fn process_msg(
     652         3274399 :         &mut self,
     653         3274399 :         msg: &ProposerAcceptorMessage,
     654         3274399 :     ) -> Result<Option<AcceptorProposerMessage>> {
     655         3274399 :         match msg {
     656            1745 :             ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
     657            5065 :             ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
     658         1491227 :             ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
     659               5 :             ProposerAcceptorMessage::AppendRequest(msg) => {
     660              21 :                 self.handle_append_request(msg, true).await
     661                 :             }
     662         1975841 :             ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
     663         2487787 :                 self.handle_append_request(msg, false).await
     664                 :             }
     665         1294228 :             ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
     666                 :         }
     667         3274398 :     }
     668                 : 
     669                 :     /// Handle initial message from proposer: check its sanity and send my
     670                 :     /// current term.
     671            1745 :     async fn handle_greeting(
     672            1745 :         &mut self,
     673            1745 :         msg: &ProposerGreeting,
     674            1745 :     ) -> Result<Option<AcceptorProposerMessage>> {
     675            1745 :         // Check protocol compatibility
     676            1745 :         if msg.protocol_version != SK_PROTOCOL_VERSION {
     677 UBC           0 :             bail!(
     678               0 :                 "incompatible protocol version {}, expected {}",
     679               0 :                 msg.protocol_version,
     680               0 :                 SK_PROTOCOL_VERSION
     681               0 :             );
     682 CBC        1745 :         }
     683            1745 :         /* Postgres major version mismatch is treated as fatal error
     684            1745 :          * because safekeepers parse WAL headers and the format
     685            1745 :          * may change between versions.
     686            1745 :          */
     687            1745 :         if msg.pg_version / 10000 != self.state.server.pg_version / 10000
     688 UBC           0 :             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
     689                 :         {
     690               0 :             bail!(
     691               0 :                 "incompatible server version {}, expected {}",
     692               0 :                 msg.pg_version,
     693               0 :                 self.state.server.pg_version
     694               0 :             );
     695 CBC        1745 :         }
     696            1745 : 
     697            1745 :         if msg.tenant_id != self.state.tenant_id {
     698 UBC           0 :             bail!(
     699               0 :                 "invalid tenant ID, got {}, expected {}",
     700               0 :                 msg.tenant_id,
     701               0 :                 self.state.tenant_id
     702               0 :             );
     703 CBC        1745 :         }
     704            1745 :         if msg.timeline_id != self.state.timeline_id {
     705 UBC           0 :             bail!(
     706               0 :                 "invalid timeline ID, got {}, expected {}",
     707               0 :                 msg.timeline_id,
     708               0 :                 self.state.timeline_id
     709               0 :             );
     710 CBC        1745 :         }
     711            1745 :         if self.state.server.wal_seg_size != msg.wal_seg_size {
     712 UBC           0 :             bail!(
     713               0 :                 "invalid wal_seg_size, got {}, expected {}",
     714               0 :                 msg.wal_seg_size,
     715               0 :                 self.state.server.wal_seg_size
     716               0 :             );
     717 CBC        1745 :         }
     718            1745 : 
     719            1745 :         // system_id will be updated on mismatch
     720            1745 :         // sync-safekeepers doesn't know sysid and sends 0, ignore it
     721            1745 :         if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
     722             443 :             if self.state.server.system_id != 0 {
     723 UBC           0 :                 warn!(
     724               0 :                     "unexpected system ID arrived, got {}, expected {}",
     725               0 :                     msg.system_id, self.state.server.system_id
     726               0 :                 );
     727 CBC         443 :             }
     728                 : 
     729             443 :             let mut state = self.state.clone();
     730             443 :             state.server.system_id = msg.system_id;
     731             443 :             if msg.pg_version != UNKNOWN_SERVER_VERSION {
     732             443 :                 state.server.pg_version = msg.pg_version;
     733             443 :             }
     734            1326 :             self.state.persist(&state).await?;
     735            1302 :         }
     736                 : 
     737            1745 :         info!(
     738            1745 :             "processed greeting from walproposer {}, sending term {:?}",
     739           27920 :             msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
     740            1745 :             self.state.acceptor_state.term
     741            1745 :         );
     742            1745 :         Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
     743            1745 :             term: self.state.acceptor_state.term,
     744            1745 :             node_id: self.node_id,
     745            1745 :         })))
     746            1745 :     }
     747                 : 
     748                 :     /// Give vote for the given term, if we haven't done that previously.
     749            1745 :     async fn handle_vote_request(
     750            1745 :         &mut self,
     751            1745 :         msg: &VoteRequest,
     752            1745 :     ) -> Result<Option<AcceptorProposerMessage>> {
     753            1745 :         // Once voted, we won't accept data from older proposers; flush
     754            1745 :         // everything we've already received so that new proposer starts
     755            1745 :         // streaming at end of our WAL, without overlap. Currently we truncate
     756            1745 :         // WAL at streaming point, so this avoids truncating already committed
     757            1745 :         // WAL.
     758            1745 :         //
     759            1745 :         // TODO: it would be smoother to not truncate committed piece at
     760            1745 :         // handle_elected instead. Currently not a big deal, as proposer is the
     761            1745 :         // only source of WAL; with peer2peer recovery it would be more
     762            1745 :         // important.
     763            1745 :         self.wal_store.flush_wal().await?;
     764                 :         // initialize with refusal
     765            1745 :         let mut resp = VoteResponse {
     766            1745 :             term: self.state.acceptor_state.term,
     767            1745 :             vote_given: false as u64,
     768            1745 :             flush_lsn: self.flush_lsn(),
     769            1745 :             truncate_lsn: self.inmem.peer_horizon_lsn,
     770            1745 :             term_history: self.get_term_history(),
     771            1745 :             timeline_start_lsn: self.state.timeline_start_lsn,
     772            1745 :         };
     773            1745 :         if self.state.acceptor_state.term < msg.term {
     774            1691 :             let mut state = self.state.clone();
     775            1691 :             state.acceptor_state.term = msg.term;
     776            1691 :             // persist vote before sending it out
     777            5065 :             self.state.persist(&state).await?;
     778                 : 
     779            1691 :             resp.term = self.state.acceptor_state.term;
     780            1691 :             resp.vote_given = true as u64;
     781              54 :         }
     782            1743 :         info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
     783            1745 :         Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
     784            1745 :     }
     785                 : 
     786                 :     /// Form AppendResponse from current state.
     787         1294233 :     fn append_response(&self) -> AppendResponse {
     788         1294233 :         let ar = AppendResponse {
     789         1294233 :             term: self.state.acceptor_state.term,
     790         1294233 :             flush_lsn: self.flush_lsn(),
     791         1294233 :             commit_lsn: self.state.commit_lsn,
     792         1294233 :             // will be filled by the upper code to avoid bothering safekeeper
     793         1294233 :             hs_feedback: HotStandbyFeedback::empty(),
     794         1294233 :             pageserver_feedback: PageserverFeedback::empty(),
     795         1294233 :         };
     796         1294233 :         trace!("formed AppendResponse {:?}", ar);
     797         1294233 :         ar
     798         1294233 :     }
     799                 : 
     800             835 :     async fn handle_elected(
     801             835 :         &mut self,
     802             835 :         msg: &ProposerElected,
     803             835 :     ) -> Result<Option<AcceptorProposerMessage>> {
     804             834 :         info!("received ProposerElected {:?}", msg);
     805             835 :         if self.state.acceptor_state.term < msg.term {
     806               4 :             let mut state = self.state.clone();
     807               4 :             state.acceptor_state.term = msg.term;
     808               9 :             self.state.persist(&state).await?;
     809             831 :         }
     810                 : 
     811                 :         // If our term is higher, ignore the message (next feedback will inform the compute)
     812             835 :         if self.state.acceptor_state.term > msg.term {
     813 UBC           0 :             return Ok(None);
     814 CBC         835 :         }
     815             835 : 
     816             835 :         // This might happen in a rare race when another (old) connection from
     817             835 :         // the same walproposer writes + flushes WAL after this connection
     818             835 :         // already sent flush_lsn in VoteRequest. It is generally safe to
     819             835 :         // proceed, but to prevent commit_lsn surprisingly going down we should
     820             835 :         // either refuse the session (simpler) or skip the part we already have
     821             835 :         // from the stream (can be implemented).
     822             835 :         if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
     823 UBC           0 :             bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
     824               0 :                    msg.term, self.flush_lsn(), msg.start_streaming_at)
     825 CBC         835 :         }
     826             835 :         // Otherwise we must never attempt to truncate committed data.
     827             835 :         assert!(
     828             835 :             msg.start_streaming_at >= self.inmem.commit_lsn,
     829 UBC           0 :             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
     830                 :             msg.start_streaming_at,
     831                 :             self.inmem.commit_lsn
     832                 :         );
     833                 : 
     834                 :         // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
     835                 :         // intersection of our history and history from msg
     836                 : 
     837                 :         // truncate wal, update the LSNs
     838 CBC     1488771 :         self.wal_store.truncate_wal(msg.start_streaming_at).await?;
     839                 : 
     840                 :         // and now adopt term history from proposer
     841                 :         {
     842             835 :             let mut state = self.state.clone();
     843             835 : 
     844             835 :             // Here we learn initial LSN for the first time, set fields
     845             835 :             // interested in that.
     846             835 : 
     847             835 :             if state.timeline_start_lsn == Lsn(0) {
     848                 :                 // Remember point where WAL begins globally.
     849             448 :                 state.timeline_start_lsn = msg.timeline_start_lsn;
     850             447 :                 info!(
     851             447 :                     "setting timeline_start_lsn to {:?}",
     852             447 :                     state.timeline_start_lsn
     853             447 :                 );
     854             387 :             }
     855             835 :             if state.local_start_lsn == Lsn(0) {
     856             445 :                 state.local_start_lsn = msg.start_streaming_at;
     857             444 :                 info!("setting local_start_lsn to {:?}", state.local_start_lsn);
     858             390 :             }
     859                 :             // Initializing commit_lsn before acking first flushed record is
     860                 :             // important to let find_end_of_wal skip the hole in the beginning
     861                 :             // of the first segment.
     862                 :             //
     863                 :             // NB: on new clusters, this happens at the same time as
     864                 :             // timeline_start_lsn initialization, it is taken outside to provide
     865                 :             // upgrade.
     866             835 :             self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
     867             835 : 
     868             835 :             // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
     869             835 :             self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
     870             835 : 
     871             835 :             state.acceptor_state.term_history = msg.term_history.clone();
     872            2447 :             self.persist_control_file(state).await?;
     873                 :         }
     874                 : 
     875             834 :         info!("start receiving WAL since {:?}", msg.start_streaming_at);
     876                 : 
     877                 :         // Cache LSN where term starts to immediately fsync control file with
     878                 :         // commit_lsn once we reach it -- sync-safekeepers finishes when
     879                 :         // persisted commit_lsn on majority of safekeepers aligns.
     880             835 :         self.epoch_start_lsn = match msg.term_history.0.last() {
     881 UBC           0 :             None => bail!("proposer elected with empty term history"),
     882 CBC         835 :             Some(term_lsn_start) => term_lsn_start.lsn,
     883             835 :         };
     884             835 : 
     885             835 :         Ok(None)
     886             835 :     }
     887                 : 
     888                 :     /// Advance commit_lsn taking into account what we have locally.
     889                 :     ///
     890                 :     /// Note: it is assumed that 'WAL we have is from the right term' check has
     891                 :     /// already been done outside.
     892         1985059 :     async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
     893         1985059 :         // Both peers and walproposer communicate this value, we might already
     894         1985059 :         // have a fresher (higher) version.
     895         1985059 :         candidate = max(candidate, self.inmem.commit_lsn);
     896         1985059 :         let commit_lsn = min(candidate, self.flush_lsn());
     897         1985059 :         assert!(
     898         1985059 :             commit_lsn >= self.inmem.commit_lsn,
     899 UBC           0 :             "commit_lsn monotonicity violated: old={} new={}",
     900                 :             self.inmem.commit_lsn,
     901                 :             commit_lsn
     902                 :         );
     903                 : 
     904 CBC     1985059 :         self.inmem.commit_lsn = commit_lsn;
     905         1985059 : 
     906         1985059 :         // If new commit_lsn reached epoch switch, force sync of control
     907         1985059 :         // file: walproposer in sync mode is very interested when this
     908         1985059 :         // happens. Note: this is for sync-safekeepers mode only, as
     909         1985059 :         // otherwise commit_lsn might jump over epoch_start_lsn.
     910         1985059 :         if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
     911              88 :             self.persist_control_file(self.state.clone()).await?;
     912         1985029 :         }
     913                 : 
     914         1985059 :         Ok(())
     915         1985059 :     }
     916                 : 
     917                 :     /// Persist in-memory state of control file to disk.
     918                 :     //
     919                 :     // TODO: passing inmem_remote_consistent_lsn everywhere is ugly, better
     920                 :     // separate state completely and give Arc to all those who need it.
     921             920 :     pub async fn persist_inmem(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
     922             920 :         let mut state = self.state.clone();
     923             920 :         state.remote_consistent_lsn = inmem_remote_consistent_lsn;
     924            2851 :         self.persist_control_file(state).await
     925             920 :     }
     926                 : 
     927                 :     /// Persist in-memory state to the disk, taking other data from state.
     928            2964 :     async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
     929            2964 :         state.commit_lsn = self.inmem.commit_lsn;
     930            2964 :         state.backup_lsn = self.inmem.backup_lsn;
     931            2964 :         state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
     932            2964 :         state.proposer_uuid = self.inmem.proposer_uuid;
     933            8840 :         self.state.persist(&state).await
     934            2964 :     }
     935                 : 
     936                 :     /// Persist control file if there is something to save and enough time
     937                 :     /// passed after the last save.
     938            1846 :     pub async fn maybe_persist_inmem_control_file(
     939            1846 :         &mut self,
     940            1846 :         inmem_remote_consistent_lsn: Lsn,
     941            1846 :     ) -> Result<()> {
     942            1846 :         const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
     943            1846 :         if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
     944            1842 :             return Ok(());
     945               4 :         }
     946               4 :         let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
     947               2 :             || self.inmem.backup_lsn > self.state.backup_lsn
     948               2 :             || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
     949               2 :             || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
     950               4 :         if need_persist {
     951               6 :             self.persist_inmem(inmem_remote_consistent_lsn).await?;
     952 UBC           0 :             trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
     953 CBC           2 :         }
     954               4 :         Ok(())
     955            1846 :     }
     956                 : 
     957                 :     /// Handle request to append WAL.
     958                 :     #[allow(clippy::comparison_chain)]
     959         1975846 :     async fn handle_append_request(
     960         1975846 :         &mut self,
     961         1975846 :         msg: &AppendRequest,
     962         1975846 :         require_flush: bool,
     963         1975846 :     ) -> Result<Option<AcceptorProposerMessage>> {
     964         1975846 :         if self.state.acceptor_state.term < msg.h.term {
     965 UBC           0 :             bail!("got AppendRequest before ProposerElected");
     966 CBC     1975846 :         }
     967         1975846 : 
     968         1975846 :         // If our term is higher, immediately refuse the message.
     969         1975846 :         if self.state.acceptor_state.term > msg.h.term {
     970               1 :             let resp = AppendResponse::term_only(self.state.acceptor_state.term);
     971               1 :             return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
     972         1975845 :         }
     973         1975845 : 
     974         1975845 :         // Now we know that we are in the same term as the proposer,
     975         1975845 :         // processing the message.
     976         1975845 : 
     977         1975845 :         self.inmem.proposer_uuid = msg.h.proposer_uuid;
     978         1975845 : 
     979         1975845 :         // do the job
     980         1975845 :         if !msg.wal_data.is_empty() {
     981         1216621 :             self.wal_store
     982         1216621 :                 .write_wal(msg.h.begin_lsn, &msg.wal_data)
     983         2485345 :                 .await?;
     984          759224 :         }
     985                 : 
     986                 :         // flush wal to the disk, if required
     987         1975844 :         if require_flush {
     988               5 :             self.wal_store.flush_wal().await?;
     989         1975839 :         }
     990                 : 
     991                 :         // Update commit_lsn.
     992         1975844 :         if msg.h.commit_lsn != Lsn(0) {
     993         1973273 :             self.update_commit_lsn(msg.h.commit_lsn).await?;
     994            2571 :         }
     995                 :         // Value calculated by walproposer can always lag:
     996                 :         // - safekeepers can forget inmem value and send to proposer lower
     997                 :         //   persisted one on restart;
     998                 :         // - if we make safekeepers always send persistent value,
     999                 :         //   any compute restart would pull it down.
    1000                 :         // Thus, take max before adopting.
    1001         1975844 :         self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
    1002         1975844 : 
    1003         1975844 :         // Update truncate and commit LSN in control file.
    1004         1975844 :         // To avoid negative impact on performance of extra fsync, do it only
    1005         1975844 :         // when truncate_lsn delta exceeds WAL segment size.
    1006         1975844 :         if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
    1007         1975844 :             < self.inmem.peer_horizon_lsn
    1008                 :         {
    1009            2387 :             self.persist_control_file(self.state.clone()).await?;
    1010         1975021 :         }
    1011                 : 
    1012 UBC           0 :         trace!(
    1013               0 :             "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
    1014               0 :             msg.wal_data.len(),
    1015               0 :             msg.h.end_lsn,
    1016               0 :             msg.h.commit_lsn,
    1017               0 :             msg.h.truncate_lsn,
    1018               0 :             require_flush,
    1019               0 :         );
    1020                 : 
    1021                 :         // If flush_lsn hasn't updated, AppendResponse is not very useful.
    1022 CBC     1975844 :         if !require_flush {
    1023         1975839 :             return Ok(None);
    1024               5 :         }
    1025               5 : 
    1026               5 :         let resp = self.append_response();
    1027               5 :         Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
    1028         1975845 :     }
    1029                 : 
    1030                 :     /// Flush WAL to disk. Return AppendResponse with latest LSNs.
    1031         1294228 :     async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
    1032         1294228 :         self.wal_store.flush_wal().await?;
    1033         1294228 :         Ok(Some(AcceptorProposerMessage::AppendResponse(
    1034         1294228 :             self.append_response(),
    1035         1294228 :         )))
    1036         1294228 :     }
    1037                 : 
    1038                 :     /// Update timeline state with peer safekeeper data.
    1039           11960 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
    1040           11960 :         let mut sync_control_file = false;
    1041           11960 : 
    1042           11960 :         if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
    1043                 :             // Note: the check is too restrictive, generally we can update local
    1044                 :             // commit_lsn if our history matches (is part of) history of advanced
    1045                 :             // commit_lsn provider.
    1046           11841 :             if sk_info.last_log_term == self.get_epoch() {
    1047           11786 :                 self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
    1048              55 :             }
    1049             119 :         }
    1050                 : 
    1051           11960 :         let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
    1052           11960 :         sync_control_file |=
    1053           11960 :             self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
    1054           11960 :         self.inmem.backup_lsn = new_backup_lsn;
    1055           11960 : 
    1056           11960 :         // value in sk_info should be maximized over our local in memory value.
    1057           11960 :         let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
    1058           11960 :         assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
    1059           11960 :         sync_control_file |= self.state.remote_consistent_lsn
    1060           11960 :             + (self.state.server.wal_seg_size as u64)
    1061           11960 :             < new_remote_consistent_lsn;
    1062           11960 : 
    1063           11960 :         let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
    1064           11960 :         sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
    1065           11960 :             < new_peer_horizon_lsn;
    1066           11960 :         self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
    1067           11960 : 
    1068           11960 :         if sync_control_file {
    1069             356 :             let mut state = self.state.clone();
    1070             356 :             state.remote_consistent_lsn = new_remote_consistent_lsn;
    1071            1067 :             self.persist_control_file(state).await?;
    1072           11604 :         }
    1073           11960 :         Ok(())
    1074           11960 :     }
    1075                 : 
    1076                 :     /// Get oldest segno we still need to keep. We hold WAL till it is consumed
    1077                 :     /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
    1078                 :     /// offloading.
    1079                 :     /// While it is safe to use inmem values for determining horizon,
    1080                 :     /// we use persistent to make possible normal states less surprising.
    1081            1846 :     pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
    1082            1846 :         let mut horizon_lsn = min(
    1083            1846 :             self.state.remote_consistent_lsn,
    1084            1846 :             self.state.peer_horizon_lsn,
    1085            1846 :         );
    1086            1846 :         if wal_backup_enabled {
    1087            1846 :             horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
    1088            1846 :         }
    1089            1846 :         horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
    1090            1846 :     }
    1091                 : }
    1092                 : 
    1093                 : #[cfg(test)]
    1094                 : mod tests {
    1095                 :     use futures::future::BoxFuture;
    1096                 :     use postgres_ffi::WAL_SEGMENT_SIZE;
    1097                 : 
    1098                 :     use super::*;
    1099                 :     use crate::wal_storage::Storage;
    1100                 :     use std::{ops::Deref, str::FromStr, time::Instant};
    1101                 : 
    1102                 :     // fake storage for tests
    1103                 :     struct InMemoryState {
    1104                 :         persisted_state: SafeKeeperState,
    1105                 :     }
    1106                 : 
    1107                 :     #[async_trait::async_trait]
    1108                 :     impl control_file::Storage for InMemoryState {
    1109               3 :         async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
    1110               3 :             self.persisted_state = s.clone();
    1111               3 :             Ok(())
    1112               3 :         }
    1113                 : 
    1114 UBC           0 :         fn last_persist_at(&self) -> Instant {
    1115               0 :             Instant::now()
    1116               0 :         }
    1117                 :     }
    1118                 : 
    1119                 :     impl Deref for InMemoryState {
    1120                 :         type Target = SafeKeeperState;
    1121                 : 
    1122 CBC          56 :         fn deref(&self) -> &Self::Target {
    1123              56 :             &self.persisted_state
    1124              56 :         }
    1125                 :     }
    1126                 : 
    1127               2 :     fn test_sk_state() -> SafeKeeperState {
    1128               2 :         let mut state = SafeKeeperState::empty();
    1129               2 :         state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
    1130               2 :         state.tenant_id = TenantId::from([1u8; 16]);
    1131               2 :         state.timeline_id = TimelineId::from([1u8; 16]);
    1132               2 :         state
    1133               2 :     }
    1134                 : 
    1135                 :     struct DummyWalStore {
    1136                 :         lsn: Lsn,
    1137                 :     }
    1138                 : 
    1139                 :     #[async_trait::async_trait]
    1140                 :     impl wal_storage::Storage for DummyWalStore {
    1141               9 :         fn flush_lsn(&self) -> Lsn {
    1142               9 :             self.lsn
    1143               9 :         }
    1144                 : 
    1145               2 :         async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
    1146               2 :             self.lsn = startpos + buf.len() as u64;
    1147               2 :             Ok(())
    1148               2 :         }
    1149                 : 
    1150               2 :         async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
    1151               2 :             self.lsn = end_pos;
    1152               2 :             Ok(())
    1153               2 :         }
    1154                 : 
    1155               4 :         async fn flush_wal(&mut self) -> Result<()> {
    1156               4 :             Ok(())
    1157               4 :         }
    1158                 : 
    1159 UBC           0 :         fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
    1160               0 :             Box::pin(async { Ok(()) })
    1161               0 :         }
    1162                 : 
    1163               0 :         fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
    1164               0 :             crate::metrics::WalStorageMetrics::default()
    1165               0 :         }
    1166                 :     }
    1167                 : 
    1168 CBC           1 :     #[tokio::test]
    1169               1 :     async fn test_voting() {
    1170               1 :         let storage = InMemoryState {
    1171               1 :             persisted_state: test_sk_state(),
    1172               1 :         };
    1173               1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1174               1 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1175               1 : 
    1176               1 :         // check voting for 1 is ok
    1177               1 :         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
    1178               1 :         let mut vote_resp = sk.process_msg(&vote_request).await;
    1179               1 :         match vote_resp.unwrap() {
    1180               1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
    1181 UBC           0 :             r => panic!("unexpected response: {:?}", r),
    1182                 :         }
    1183                 : 
    1184                 :         // reboot...
    1185 CBC           1 :         let state = sk.state.persisted_state.clone();
    1186               1 :         let storage = InMemoryState {
    1187               1 :             persisted_state: state,
    1188               1 :         };
    1189               1 : 
    1190               1 :         sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
    1191                 : 
    1192                 :         // and ensure voting second time for 1 is not ok
    1193               1 :         vote_resp = sk.process_msg(&vote_request).await;
    1194               1 :         match vote_resp.unwrap() {
    1195               1 :             Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
    1196 UBC           0 :             r => panic!("unexpected response: {:?}", r),
    1197                 :         }
    1198                 :     }
    1199                 : 
    1200 CBC           1 :     #[tokio::test]
    1201               1 :     async fn test_epoch_switch() {
    1202               1 :         let storage = InMemoryState {
    1203               1 :             persisted_state: test_sk_state(),
    1204               1 :         };
    1205               1 :         let wal_store = DummyWalStore { lsn: Lsn(0) };
    1206               1 : 
    1207               1 :         let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
    1208               1 : 
    1209               1 :         let mut ar_hdr = AppendRequestHeader {
    1210               1 :             term: 1,
    1211               1 :             epoch_start_lsn: Lsn(3),
    1212               1 :             begin_lsn: Lsn(1),
    1213               1 :             end_lsn: Lsn(2),
    1214               1 :             commit_lsn: Lsn(0),
    1215               1 :             truncate_lsn: Lsn(0),
    1216               1 :             proposer_uuid: [0; 16],
    1217               1 :         };
    1218               1 :         let mut append_request = AppendRequest {
    1219               1 :             h: ar_hdr.clone(),
    1220               1 :             wal_data: Bytes::from_static(b"b"),
    1221               1 :         };
    1222               1 : 
    1223               1 :         let pem = ProposerElected {
    1224               1 :             term: 1,
    1225               1 :             start_streaming_at: Lsn(1),
    1226               1 :             term_history: TermHistory(vec![TermLsn {
    1227               1 :                 term: 1,
    1228               1 :                 lsn: Lsn(3),
    1229               1 :             }]),
    1230               1 :             timeline_start_lsn: Lsn(0),
    1231               1 :         };
    1232               1 :         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
    1233 UBC           0 :             .await
    1234 CBC           1 :             .unwrap();
    1235                 : 
    1236                 :         // check that AppendRequest before epochStartLsn doesn't switch epoch
    1237               1 :         let resp = sk
    1238               1 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1239 UBC           0 :             .await;
    1240 CBC           1 :         assert!(resp.is_ok());
    1241               1 :         assert_eq!(sk.get_epoch(), 0);
    1242                 : 
    1243                 :         // but record at epochStartLsn does the switch
    1244               1 :         ar_hdr.begin_lsn = Lsn(2);
    1245               1 :         ar_hdr.end_lsn = Lsn(3);
    1246               1 :         append_request = AppendRequest {
    1247               1 :             h: ar_hdr,
    1248               1 :             wal_data: Bytes::from_static(b"b"),
    1249               1 :         };
    1250               1 :         let resp = sk
    1251               1 :             .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
    1252 UBC           0 :             .await;
    1253 CBC           1 :         assert!(resp.is_ok());
    1254               1 :         sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
    1255               1 :         assert_eq!(sk.get_epoch(), 1);
    1256                 :     }
    1257                 : 
    1258               1 :     #[test]
    1259               1 :     fn test_find_highest_common_point_none() {
    1260               1 :         let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
    1261               1 :         let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
    1262               1 :         assert_eq!(
    1263               1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
    1264               1 :             None
    1265               1 :         );
    1266               1 :     }
    1267                 : 
    1268               1 :     #[test]
    1269               1 :     fn test_find_highest_common_point_middle() {
    1270               1 :         let prop_th = TermHistory(vec![
    1271               1 :             (1, Lsn(10)).into(),
    1272               1 :             (2, Lsn(20)).into(),
    1273               1 :             (4, Lsn(40)).into(),
    1274               1 :         ]);
    1275               1 :         let sk_th = TermHistory(vec![
    1276               1 :             (1, Lsn(10)).into(),
    1277               1 :             (2, Lsn(20)).into(),
    1278               1 :             (3, Lsn(30)).into(), // sk ends last common term 2 at 30
    1279               1 :         ]);
    1280               1 :         assert_eq!(
    1281               1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
    1282               1 :             Some(TermLsn {
    1283               1 :                 term: 2,
    1284               1 :                 lsn: Lsn(30),
    1285               1 :             })
    1286               1 :         );
    1287               1 :     }
    1288                 : 
    1289               1 :     #[test]
    1290               1 :     fn test_find_highest_common_point_sk_end() {
    1291               1 :         let prop_th = TermHistory(vec![
    1292               1 :             (1, Lsn(10)).into(),
    1293               1 :             (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
    1294               1 :             (4, Lsn(40)).into(),
    1295               1 :         ]);
    1296               1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1297               1 :         assert_eq!(
    1298               1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1299               1 :             Some(TermLsn {
    1300               1 :                 term: 2,
    1301               1 :                 lsn: Lsn(32),
    1302               1 :             })
    1303               1 :         );
    1304               1 :     }
    1305                 : 
    1306               1 :     #[test]
    1307               1 :     fn test_find_highest_common_point_walprop() {
    1308               1 :         let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1309               1 :         let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
    1310               1 :         assert_eq!(
    1311               1 :             TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
    1312               1 :             Some(TermLsn {
    1313               1 :                 term: 2,
    1314               1 :                 lsn: Lsn(32),
    1315               1 :             })
    1316               1 :         );
    1317               1 :     }
    1318                 : 
    1319               1 :     #[test]
    1320               1 :     fn test_sk_state_bincode_serde_roundtrip() {
    1321               1 :         use utils::Hex;
    1322               1 :         let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    1323               1 :         let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
    1324               1 :         let state = SafeKeeperState {
    1325               1 :             tenant_id,
    1326               1 :             timeline_id,
    1327               1 :             acceptor_state: AcceptorState {
    1328               1 :                 term: 42,
    1329               1 :                 term_history: TermHistory(vec![TermLsn {
    1330               1 :                     lsn: Lsn(0x1),
    1331               1 :                     term: 41,
    1332               1 :                 }]),
    1333               1 :             },
    1334               1 :             server: ServerInfo {
    1335               1 :                 pg_version: 14,
    1336               1 :                 system_id: 0x1234567887654321,
    1337               1 :                 wal_seg_size: 0x12345678,
    1338               1 :             },
    1339               1 :             proposer_uuid: {
    1340               1 :                 let mut arr = timeline_id.as_arr();
    1341               1 :                 arr.reverse();
    1342               1 :                 arr
    1343               1 :             },
    1344               1 :             timeline_start_lsn: Lsn(0x12345600),
    1345               1 :             local_start_lsn: Lsn(0x12),
    1346               1 :             commit_lsn: Lsn(1234567800),
    1347               1 :             backup_lsn: Lsn(1234567300),
    1348               1 :             peer_horizon_lsn: Lsn(9999999),
    1349               1 :             remote_consistent_lsn: Lsn(1234560000),
    1350               1 :             peers: PersistedPeers(vec![(
    1351               1 :                 NodeId(1),
    1352               1 :                 PersistedPeerInfo {
    1353               1 :                     backup_lsn: Lsn(1234567000),
    1354               1 :                     term: 42,
    1355               1 :                     flush_lsn: Lsn(1234567800 - 8),
    1356               1 :                     commit_lsn: Lsn(1234567600),
    1357               1 :                 },
    1358               1 :             )]),
    1359               1 :         };
    1360               1 : 
    1361               1 :         let ser = state.ser().unwrap();
    1362               1 : 
    1363               1 :         #[rustfmt::skip]
    1364               1 :         let expected = [
    1365               1 :             // tenant_id as length prefixed hex
    1366               1 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1367               1 :             0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
    1368               1 :             // timeline_id as length prefixed hex
    1369               1 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1370               1 :             0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
    1371               1 :             // term
    1372               1 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1373               1 :             // length prefix
    1374               1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1375               1 :             // unsure why this order is swapped
    1376               1 :             0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1377               1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1378               1 :             // pg_version
    1379               1 :             0x0e, 0x00, 0x00, 0x00,
    1380               1 :             // systemid
    1381               1 :             0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
    1382               1 :             // wal_seg_size
    1383               1 :             0x78, 0x56, 0x34, 0x12,
    1384               1 :             // pguuid as length prefixed hex
    1385               1 :             0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1386               1 :             0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
    1387               1 : 
    1388               1 :             // timeline_start_lsn
    1389               1 :             0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
    1390               1 :             0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1391               1 :             0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1392               1 :             0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1393               1 :             0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
    1394               1 :             0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1395               1 :             // length prefix for persistentpeers
    1396               1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1397               1 :             // nodeid
    1398               1 :             0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1399               1 :             // backuplsn
    1400               1 :             0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
    1401               1 :             0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    1402               1 :             0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1403               1 :             0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    1404               1 :         ];
    1405               1 : 
    1406               1 :         assert_eq!(Hex(&ser), Hex(&expected));
    1407                 : 
    1408               1 :         let deser = SafeKeeperState::des(&ser).unwrap();
    1409               1 : 
    1410               1 :         assert_eq!(deser, state);
    1411               1 :     }
    1412                 : }
        

Generated by: LCOV version 2.1-beta