LCOV - differential code coverage report
Current view: top level - safekeeper/src - send_wal.rs (source / functions)

              Coverage    Total    Hit    LBC    UBC    CBC
Lines:        93.7 %        462    433      1     28    433
Functions:    45.3 %        159     72      2     85     72

Current:  cd44433dd675caa99df17a61b18949c8387e2242.info (2024-01-09 02:06:09)
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info (2024-01-08 15:34:46)
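Per genhtml's differential-coverage categories, the TLA codes appearing in this report are: CBC = Covered Baseline Code, UBC = Uncovered Baseline Code, LBC = Lost Baseline Coverage.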

           TLA  Line data    Source code
       1                 : //! This module implements the streaming side of replication protocol, starting
       2                 : //! with the "START_REPLICATION" message, and registry of walsenders.
       3                 : 
       4                 : use crate::handler::SafekeeperPostgresHandler;
       5                 : use crate::safekeeper::{Term, TermLsn};
       6                 : use crate::timeline::Timeline;
       7                 : use crate::wal_service::ConnectionId;
       8                 : use crate::wal_storage::WalReader;
       9                 : use crate::GlobalTimelines;
      10                 : use anyhow::{bail, Context as AnyhowContext};
      11                 : use bytes::Bytes;
      12                 : use parking_lot::Mutex;
      13                 : use postgres_backend::PostgresBackend;
      14                 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
      15                 : use postgres_ffi::get_current_timestamp;
      16                 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
      17                 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
      18                 : use serde::{Deserialize, Serialize};
      19                 : use tokio::io::{AsyncRead, AsyncWrite};
      20                 : use utils::failpoint_support;
      21                 : use utils::id::TenantTimelineId;
      22                 : use utils::lsn::AtomicLsn;
      23                 : use utils::pageserver_feedback::PageserverFeedback;
      24                 : 
      25                 : use std::cmp::{max, min};
      26                 : use std::net::SocketAddr;
      27                 : use std::str;
      28                 : use std::sync::Arc;
      29                 : use std::time::Duration;
      30                 : use tokio::sync::watch::Receiver;
      31                 : use tokio::time::timeout;
      32                 : use tracing::*;
      33                 : use utils::{bin_ser::BeSer, lsn::Lsn};
      34                 : 
      35                 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
      36                 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
      37                 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
       38                 : // Neon extension of the replication protocol
      39                 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
      40                 : 
      41                 : type FullTransactionId = u64;
      42                 : 
      43                 : /// Hot standby feedback received from replica
      44 CBC           3 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      45                 : pub struct HotStandbyFeedback {
      46                 :     pub ts: TimestampTz,
      47                 :     pub xmin: FullTransactionId,
      48                 :     pub catalog_xmin: FullTransactionId,
      49                 : }
      50                 : 
      51                 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
      52                 : 
      53                 : impl HotStandbyFeedback {
      54         1295244 :     pub fn empty() -> HotStandbyFeedback {
      55         1295244 :         HotStandbyFeedback {
      56         1295244 :             ts: 0,
      57         1295244 :             xmin: 0,
      58         1295244 :             catalog_xmin: 0,
      59         1295244 :         }
      60         1295244 :     }
      61                 : }
      62                 : 
      63                 : /// Standby status update
      64            1295 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      65                 : pub struct StandbyReply {
      66                 :     pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
      67                 :     pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
      68                 :     pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
      69                 :     pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
      70                 :     pub reply_requested: bool,
      71                 : }
      72                 : 
      73                 : impl StandbyReply {
      74               4 :     fn empty() -> Self {
      75               4 :         StandbyReply {
      76               4 :             write_lsn: Lsn::INVALID,
      77               4 :             flush_lsn: Lsn::INVALID,
      78               4 :             apply_lsn: Lsn::INVALID,
      79               4 :             reply_ts: 0,
      80               4 :             reply_requested: false,
      81               4 :         }
      82               4 :     }
      83                 : }
      84                 : 
      85 LBC         (2) : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      86                 : pub struct StandbyFeedback {
      87                 :     reply: StandbyReply,
      88                 :     hs_feedback: HotStandbyFeedback,
      89                 : }
      90                 : 
      91                 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
      92                 : pub struct WalSenders {
      93                 :     /// Lsn maximized over all walsenders *and* peer data, so might be higher
      94                 :     /// than what we receive from replicas.
      95                 :     remote_consistent_lsn: AtomicLsn,
      96                 :     mutex: Mutex<WalSendersShared>,
      97                 : }
      98                 : 
      99                 : impl WalSenders {
     100 CBC         582 :     pub fn new(remote_consistent_lsn: Lsn) -> Arc<WalSenders> {
     101             582 :         Arc::new(WalSenders {
     102             582 :             remote_consistent_lsn: AtomicLsn::from(remote_consistent_lsn),
     103             582 :             mutex: Mutex::new(WalSendersShared::new()),
     104             582 :         })
     105             582 :     }
     106                 : 
     107                 :     /// Register new walsender. Returned guard provides access to the slot and
     108                 :     /// automatically deregisters in Drop.
     109             734 :     fn register(
     110             734 :         self: &Arc<WalSenders>,
     111             734 :         ttid: TenantTimelineId,
     112             734 :         addr: SocketAddr,
     113             734 :         conn_id: ConnectionId,
     114             734 :         appname: Option<String>,
     115             734 :     ) -> WalSenderGuard {
     116             734 :         let slots = &mut self.mutex.lock().slots;
     117             734 :         let walsender_state = WalSenderState {
     118             734 :             ttid,
     119             734 :             addr,
     120             734 :             conn_id,
     121             734 :             appname,
     122             734 :             feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     123             734 :         };
     124                 :         // find empty slot or create new one
     125             734 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
     126             256 :             slots[pos] = Some(walsender_state);
     127             256 :             pos
     128                 :         } else {
     129             478 :             let pos = slots.len();
     130             478 :             slots.push(Some(walsender_state));
     131             478 :             pos
     132                 :         };
     133             734 :         WalSenderGuard {
     134             734 :             id: pos,
     135             734 :             walsenders: self.clone(),
     136             734 :         }
     137             734 :     }
     138                 : 
     139                 :     /// Get state of all walsenders.
     140             252 :     pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
     141             252 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     142             252 :     }
     143                 : 
     144                 :     /// Get aggregated pageserver feedback.
     145              51 :     pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
     146              51 :         self.mutex.lock().agg_ps_feedback
     147              51 :     }
     148                 : 
     149                 :     /// Get aggregated pageserver and hot standby feedback (we send them to compute).
     150         1294232 :     pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
     151         1294232 :         let shared = self.mutex.lock();
     152         1294232 :         (shared.agg_ps_feedback, shared.agg_hs_feedback)
     153         1294232 :     }
     154                 : 
     155                 :     /// Record new pageserver feedback, update aggregated values.
     156          565811 :     fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
     157          565811 :         let mut shared = self.mutex.lock();
     158          565811 :         shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
     159          565811 :         shared.update_ps_feedback();
     160          565811 :         self.update_remote_consistent_lsn(shared.agg_ps_feedback.remote_consistent_lsn);
     161          565811 :     }
     162                 : 
     163                 :     /// Record standby reply.
     164            1295 :     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
     165            1295 :         let mut shared = self.mutex.lock();
     166            1295 :         let slot = shared.get_slot_mut(id);
     167            1295 :         match &mut slot.feedback {
     168            1292 :             ReplicationFeedback::Standby(sf) => sf.reply = *reply,
     169                 :             ReplicationFeedback::Pageserver(_) => {
     170               3 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     171               3 :                     reply: *reply,
     172               3 :                     hs_feedback: HotStandbyFeedback::empty(),
     173               3 :                 })
     174                 :             }
     175                 :         }
     176            1295 :     }
     177                 : 
     178                 :     /// Record hot standby feedback, update aggregated value.
     179               2 :     fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
     180               2 :         let mut shared = self.mutex.lock();
     181               2 :         let slot = shared.get_slot_mut(id);
     182               2 :         match &mut slot.feedback {
     183               2 :             ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
     184                 :             ReplicationFeedback::Pageserver(_) => {
     185 UBC           0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     186               0 :                     reply: StandbyReply::empty(),
     187               0 :                     hs_feedback: *feedback,
     188               0 :                 })
     189                 :             }
     190                 :         }
     191 CBC           2 :         shared.update_hs_feedback();
     192               2 :     }
     193                 : 
     194                 :     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
     195                 :     /// client is not pageserver.
     196            3461 :     fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
     197            3461 :         let shared = self.mutex.lock();
     198            3461 :         let slot = shared.get_slot(id);
     199            3461 :         match slot.feedback {
     200            3461 :             ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
     201 UBC           0 :             _ => None,
     202                 :         }
     203 CBC        3461 :     }
     204                 : 
     205                 :     /// Get remote_consistent_lsn maximized across all walsenders and peers.
     206           26716 :     pub fn get_remote_consistent_lsn(self: &Arc<WalSenders>) -> Lsn {
     207           26716 :         self.remote_consistent_lsn.load()
     208           26716 :     }
     209                 : 
      210                 :     /// Update maximized remote_consistent_lsn; return the new (possibly unchanged) value.
     211          577771 :     pub fn update_remote_consistent_lsn(self: &Arc<WalSenders>, candidate: Lsn) -> Lsn {
     212          577771 :         self.remote_consistent_lsn
     213          577771 :             .fetch_max(candidate)
     214          577771 :             .max(candidate)
     215          577771 :     }
     216                 : 
     217                 :     /// Unregister walsender.
     218             418 :     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
     219             418 :         let mut shared = self.mutex.lock();
     220             418 :         shared.slots[id] = None;
     221             418 :         shared.update_hs_feedback();
     222             418 :     }
     223                 : }
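
The update_remote_consistent_lsn method above leans on fetch_max returning the *previous* value: chaining .max(candidate) onto the result recovers the new maximum without a second load. A standalone sketch of the same pattern, with std's AtomicU64 standing in for utils::lsn::AtomicLsn (Watermark and advance are illustrative names, not from the crate):

    use std::sync::atomic::{AtomicU64, Ordering};

    struct Watermark(AtomicU64);

    impl Watermark {
        /// Atomically raise the watermark and return the resulting maximum.
        /// fetch_max returns the value *before* the update, so .max(candidate)
        /// is needed to recover the value after it.
        fn advance(&self, candidate: u64) -> u64 {
            self.0.fetch_max(candidate, Ordering::SeqCst).max(candidate)
        }
    }

    fn main() {
        let w = Watermark(AtomicU64::new(10));
        assert_eq!(w.advance(42), 42); // raised
        assert_eq!(w.advance(7), 42);  // stale candidate changes nothing
    }
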
     224                 : 
     225                 : struct WalSendersShared {
     226                 :     // aggregated over all walsenders value
     227                 :     agg_hs_feedback: HotStandbyFeedback,
     228                 :     // aggregated over all walsenders value
     229                 :     agg_ps_feedback: PageserverFeedback,
     230                 :     slots: Vec<Option<WalSenderState>>,
     231                 : }
     232                 : 
     233                 : impl WalSendersShared {
     234             585 :     fn new() -> Self {
     235             585 :         WalSendersShared {
     236             585 :             agg_hs_feedback: HotStandbyFeedback::empty(),
     237             585 :             agg_ps_feedback: PageserverFeedback::empty(),
     238             585 :             slots: Vec::new(),
     239             585 :         }
     240             585 :     }
     241                 : 
      242                 :     /// Get content of the provided id slot; it must exist.
     243            3461 :     fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
     244            3461 :         self.slots[id].as_ref().expect("walsender doesn't exist")
     245            3461 :     }
     246                 : 
      247                 :     /// Get mut content of the provided id slot; it must exist.
     248          567108 :     fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
     249          567108 :         self.slots[id].as_mut().expect("walsender doesn't exist")
     250          567108 :     }
     251                 : 
      252                 :     /// Update aggregated hot standby feedback. We just take min of valid xmins
     253                 :     /// and ts.
     254             422 :     fn update_hs_feedback(&mut self) {
     255             422 :         let mut agg = HotStandbyFeedback::empty();
     256             422 :         for ws_state in self.slots.iter().flatten() {
     257             123 :             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
     258               6 :                 let hs_feedback = standby_feedback.hs_feedback;
     259               6 :                 // doing Option math like op1.iter().chain(op2.iter()).min()
     260               6 :                 // would be nicer, but we serialize/deserialize this struct
     261               6 :                 // directly, so leave as is for now
     262               6 :                 if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
     263               2 :                     if agg.xmin != INVALID_FULL_TRANSACTION_ID {
     264               1 :                         agg.xmin = min(agg.xmin, hs_feedback.xmin);
     265               1 :                     } else {
     266               1 :                         agg.xmin = hs_feedback.xmin;
     267               1 :                     }
     268               2 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     269               4 :                 }
     270               6 :                 if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     271 UBC           0 :                     if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     272               0 :                         agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
     273               0 :                     } else {
     274               0 :                         agg.catalog_xmin = hs_feedback.catalog_xmin;
     275               0 :                     }
     276               0 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     277 CBC           6 :                 }
     278             117 :             }
     279                 :         }
     280             422 :         self.agg_hs_feedback = agg;
     281             422 :     }
     282                 : 
     283                 :     /// Update aggregated pageserver feedback. LSNs (last_received,
     284                 :     /// disk_consistent, remote_consistent) and reply timestamp are just
      285                 :     /// maximized; timeline_size is taken from the feedback with the highest
     286                 :     /// last_received lsn. This is generally reasonable, but we might want to
     287                 :     /// implement other policies once multiple pageservers start to be actively
     288                 :     /// used.
     289          565812 :     fn update_ps_feedback(&mut self) {
     290          565812 :         let init = PageserverFeedback::empty();
     291          565812 :         let acc =
     292          565812 :             self.slots
     293          565812 :                 .iter()
     294          565812 :                 .flatten()
     295          570374 :                 .fold(init, |mut acc, ws_state| match ws_state.feedback {
     296          569918 :                     ReplicationFeedback::Pageserver(feedback) => {
     297          569918 :                         if feedback.last_received_lsn > acc.last_received_lsn {
     298          566715 :                             acc.current_timeline_size = feedback.current_timeline_size;
     299          566715 :                         }
     300          569918 :                         acc.last_received_lsn =
     301          569918 :                             max(feedback.last_received_lsn, acc.last_received_lsn);
     302          569918 :                         acc.disk_consistent_lsn =
     303          569918 :                             max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
     304          569918 :                         acc.remote_consistent_lsn =
     305          569918 :                             max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
     306          569918 :                         acc.replytime = max(feedback.replytime, acc.replytime);
     307          569918 :                         acc
     308                 :                     }
     309             456 :                     ReplicationFeedback::Standby(_) => acc,
     310          570374 :                 });
     311          565812 :         self.agg_ps_feedback = acc;
     312          565812 :     }
     313                 : }
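
update_hs_feedback above treats xmin == 0 as invalid and takes the minimum over the rest. A standalone sketch of that aggregation rule, assuming plain u64 xids (aggregate_xmin and INVALID_XID are illustrative names; the real code keeps the explicit if/else because the struct is serialized/deserialized directly):

    const INVALID_XID: u64 = 0;

    /// Minimum over valid (non-zero) xmins; stays invalid if none are valid.
    fn aggregate_xmin(xmins: &[u64]) -> u64 {
        xmins
            .iter()
            .copied()
            .filter(|&x| x != INVALID_XID)
            .min()
            .unwrap_or(INVALID_XID)
    }

    fn main() {
        // mirrors the expectations in the tests at the end of this file
        assert_eq!(aggregate_xmin(&[0, 42, 64]), 42);
        assert_eq!(aggregate_xmin(&[0, 0]), INVALID_XID);
    }
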
     314                 : 
      315                 : // Serialize is used only for pretty-printing in JSON.
     316              92 : #[derive(Debug, Clone, Serialize, Deserialize)]
     317                 : pub struct WalSenderState {
     318                 :     ttid: TenantTimelineId,
     319                 :     addr: SocketAddr,
     320                 :     conn_id: ConnectionId,
     321                 :     // postgres application_name
     322                 :     appname: Option<String>,
     323                 :     feedback: ReplicationFeedback,
     324                 : }
     325                 : 
      326                 : // Receiver is either a pageserver or a regular standby; the two send
      327                 : // different kinds of feedback.
     328              92 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     329                 : enum ReplicationFeedback {
     330                 :     Pageserver(PageserverFeedback),
     331                 :     Standby(StandbyFeedback),
     332                 : }
     333                 : 
      334                 : // Id of the occupied slot in WalSenders, used to access it (and saved in the
      335                 : // WalSenderGuard). We could hand out an Arc pointing directly to the slot,
      336                 : // but there is little point: the aggregation performed on each feedback
      337                 : // receipt iterates over all walsenders anyway.
     338                 : pub type WalSenderId = usize;
     339                 : 
     340                 : /// Scope guard to access slot in WalSenders registry and unregister from it in
     341                 : /// Drop.
     342                 : pub struct WalSenderGuard {
     343                 :     id: WalSenderId,
     344                 :     walsenders: Arc<WalSenders>,
     345                 : }
     346                 : 
     347                 : impl Drop for WalSenderGuard {
     348             418 :     fn drop(&mut self) {
     349             418 :         self.walsenders.unregister(self.id);
     350             418 :     }
     351                 : }
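
WalSenders::register together with the guard's Drop forms a slot-reuse scheme: registration fills the first free Option slot, and dropping the guard clears it for the next walsender. A minimal self-contained sketch of the same scheme (Registry, Guard and the String payload are illustrative, not the crate's types):

    use std::sync::{Arc, Mutex};

    struct Registry {
        slots: Mutex<Vec<Option<String>>>,
    }

    struct Guard {
        id: usize,
        registry: Arc<Registry>,
    }

    impl Registry {
        fn register(self: &Arc<Self>, state: String) -> Guard {
            let mut slots = self.slots.lock().unwrap();
            // find empty slot or create new one, as in WalSenders::register
            let id = match slots.iter().position(|s| s.is_none()) {
                Some(pos) => {
                    slots[pos] = Some(state);
                    pos
                }
                None => {
                    slots.push(Some(state));
                    slots.len() - 1
                }
            };
            Guard { id, registry: Arc::clone(self) }
        }
    }

    impl Drop for Guard {
        fn drop(&mut self) {
            // unregister: free the slot for reuse
            self.registry.slots.lock().unwrap()[self.id] = None;
        }
    }

    fn main() {
        let reg = Arc::new(Registry { slots: Mutex::new(Vec::new()) });
        let g1 = reg.register("ws-1".into());
        assert_eq!(g1.id, 0);
        drop(g1); // Drop frees slot 0...
        let g2 = reg.register("ws-2".into());
        assert_eq!(g2.id, 0); // ...and the next registration reuses it
    }
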
     352                 : 
     353                 : impl SafekeeperPostgresHandler {
      354                 :     /// Wrapper around handle_start_replication_guts that handles its result.
      355                 :     /// The error is handled here while we're still in the walsender ttid span;
      356                 :     /// with an API extension, this could probably move into postgres_backend.
     357             734 :     pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
     358             734 :         &mut self,
     359             734 :         pgb: &mut PostgresBackend<IO>,
     360             734 :         start_pos: Lsn,
     361             734 :         term: Option<Term>,
     362             734 :     ) -> Result<(), QueryError> {
     363             734 :         if let Err(end) = self
     364             734 :             .handle_start_replication_guts(pgb, start_pos, term)
     365         2111087 :             .await
     366                 :         {
     367                 :             // Log the result and probably send it to the client, closing the stream.
     368             418 :             pgb.handle_copy_stream_end(end).await;
     369 UBC           0 :         }
     370 CBC         418 :         Ok(())
     371             418 :     }
     372                 : 
     373             734 :     pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     374             734 :         &mut self,
     375             734 :         pgb: &mut PostgresBackend<IO>,
     376             734 :         start_pos: Lsn,
     377             734 :         term: Option<Term>,
     378             734 :     ) -> Result<(), CopyStreamHandlerEnd> {
     379             734 :         let appname = self.appname.clone();
     380             734 :         let tli =
     381             734 :             GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
     382                 : 
     383                 :         // Use a guard object to remove our entry from the timeline when we are done.
     384             734 :         let ws_guard = Arc::new(tli.get_walsenders().register(
     385             734 :             self.ttid,
     386             734 :             *pgb.get_peer_addr(),
     387             734 :             self.conn_id,
     388             734 :             self.appname.clone(),
     389             734 :         ));
     390                 : 
      391                 :         // Walsender can operate in one of two modes, selected by
      392                 :         // application_name: send only committed WAL (used by pageserver) or
      393                 :         // all existing WAL (up to flush_lsn, used by walproposer or peer
      394                 :         // recovery). The second case is always driven by a consensus leader,
      395                 :         // whose term must be supplied.
     396             734 :         let end_watch = if term.is_some() {
     397              16 :             EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
     398                 :         } else {
     399             718 :             EndWatch::Commit(tli.get_commit_lsn_watch_rx())
     400                 :         };
     401                 :         // we don't check term here; it will be checked on first waiting/WAL reading anyway.
     402             734 :         let end_pos = end_watch.get();
     403             734 : 
     404             734 :         if end_pos < start_pos {
     405              18 :             warn!(
     406              18 :                 "requested start_pos {} is ahead of available WAL end_pos {}",
     407              18 :                 start_pos, end_pos
     408              18 :             );
     409             716 :         }
     410                 : 
     411             734 :         info!(
     412             734 :             "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
     413             734 :             start_pos,
     414             734 :             end_pos,
     415             734 :             matches!(end_watch, EndWatch::Flush(_)),
     416             734 :             appname
     417             734 :         );
     418                 : 
     419                 :         // switch to copy
     420             734 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     421                 : 
     422             734 :         let (_, persisted_state) = tli.get_state().await;
     423             734 :         let wal_reader = WalReader::new(
     424             734 :             self.conf.workdir.clone(),
     425             734 :             self.conf.timeline_dir(&tli.ttid),
     426             734 :             &persisted_state,
     427             734 :             start_pos,
     428             734 :             self.conf.wal_backup_enabled,
     429             734 :         )?;
     430                 : 
     431                 :         // Split to concurrently receive and send data; replies are generally
     432                 :         // not synchronized with sends, so this avoids deadlocks.
     433             734 :         let reader = pgb.split().context("START_REPLICATION split")?;
     434                 : 
     435             734 :         let mut sender = WalSender {
     436             734 :             pgb,
     437             734 :             tli: tli.clone(),
     438             734 :             appname,
     439             734 :             start_pos,
     440             734 :             end_pos,
     441             734 :             term,
     442             734 :             end_watch,
     443             734 :             ws_guard: ws_guard.clone(),
     444             734 :             wal_reader,
     445             734 :             send_buf: [0; MAX_SEND_SIZE],
     446             734 :         };
     447             734 :         let mut reply_reader = ReplyReader { reader, ws_guard };
     448                 : 
     449             734 :         let res = tokio::select! {
     450                 :             // todo: add read|write .context to these errors
     451              68 :             r = sender.run() => r,
     452             350 :             r = reply_reader.run() => r,
     453                 :         };
     454                 :         // Join pg backend back.
     455             418 :         pgb.unsplit(reply_reader.reader)?;
     456                 : 
     457             418 :         res
     458             418 :     }
     459                 : }
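
handle_start_replication_guts splits the connection so replies can be consumed while WAL is pushed, then races the two halves with tokio::select!: whichever half finishes first ends the session. A standalone sketch of that split-and-select shape, using tokio::io::duplex as a stand-in for the Postgres connection (assumes tokio with the "io-util", "rt" and "macros" features):

    use tokio::io::{duplex, split, AsyncReadExt, AsyncWriteExt};

    #[tokio::main(flavor = "current_thread")]
    async fn main() -> std::io::Result<()> {
        let (ours, theirs) = duplex(64);

        // peer: echo one byte back, standing in for a receiver sending replies
        tokio::spawn(async move {
            let (mut r, mut w) = split(theirs);
            let mut b = [0u8; 1];
            let _ = r.read_exact(&mut b).await;
            let _ = w.write_all(&b).await;
        });

        let (mut reader, mut writer) = split(ours);
        let send = async move { writer.write_all(b"x").await }; // "WAL sender" half
        let recv = async move {
            let mut b = [0u8; 1];
            reader.read_exact(&mut b).await.map(|_| ()) // "reply reader" half
        };

        // as with sender.run() / reply_reader.run(): first half to finish wins
        let res: std::io::Result<()> = tokio::select! {
            r = send => r,
            r = recv => r,
        };
        res
    }
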
     460                 : 
     461                 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
     462                 : /// given term (recovery by walproposer or peer safekeeper).
     463                 : enum EndWatch {
     464                 :     Commit(Receiver<Lsn>),
     465                 :     Flush(Receiver<TermLsn>),
     466                 : }
     467                 : 
     468                 : impl EndWatch {
     469                 :     /// Get current end of WAL.
     470         2261841 :     fn get(&self) -> Lsn {
     471         2261841 :         match self {
     472         2259871 :             EndWatch::Commit(r) => *r.borrow(),
     473            1970 :             EndWatch::Flush(r) => r.borrow().lsn,
     474                 :         }
     475         2261841 :     }
     476                 : 
     477                 :     /// Wait for the update.
     478         1202617 :     async fn changed(&mut self) -> anyhow::Result<()> {
     479         1202617 :         match self {
     480         1531042 :             EndWatch::Commit(r) => r.changed().await?,
     481              43 :             EndWatch::Flush(r) => r.changed().await?,
     482                 :         }
     483         1198512 :         Ok(())
     484         1198512 :     }
     485                 : }
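
EndWatch is a thin wrapper over a tokio watch receiver, and every consumer follows the same discipline: read the current value, and if it is not far enough yet, await changed() and re-check. A standalone sketch of that wait loop over a plain u64 watermark (assumes tokio with the "sync", "rt" and "macros" features):

    use tokio::sync::watch;

    #[tokio::main(flavor = "current_thread")]
    async fn main() -> Result<(), watch::error::RecvError> {
        let (tx, mut rx) = watch::channel(0u64);
        let start_pos = 5u64;

        // producer advancing the watermark, like commit_lsn moving forward
        tokio::spawn(async move {
            for lsn in [3u64, 5, 8] {
                let _ = tx.send(lsn);
            }
        });

        // re-check the current value after every change notification;
        // watch coalesces updates, so only the latest value is observed
        loop {
            let end_pos = *rx.borrow();
            if end_pos > start_pos {
                println!("got end_pos {end_pos}, streaming");
                return Ok(());
            }
            rx.changed().await?; // Err only when the sender side is gone
        }
    }
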
     486                 : 
      487                 : /// The half of the split connection that drives sending WAL.
     488                 : struct WalSender<'a, IO> {
     489                 :     pgb: &'a mut PostgresBackend<IO>,
     490                 :     tli: Arc<Timeline>,
     491                 :     appname: Option<String>,
     492                 :     // Position since which we are sending next chunk.
     493                 :     start_pos: Lsn,
     494                 :     // WAL up to this position is known to be locally available.
     495                 :     // Usually this is the same as the latest commit_lsn, but in case of
     496                 :     // walproposer recovery, this is flush_lsn.
     497                 :     //
     498                 :     // We send this LSN to the receiver as wal_end, so that it knows how much
     499                 :     // WAL this safekeeper has. This LSN should be as fresh as possible.
     500                 :     end_pos: Lsn,
     501                 :     /// When streaming uncommitted part, the term the client acts as the leader
     502                 :     /// in. Streaming is stopped if local term changes to a different (higher)
     503                 :     /// value.
     504                 :     term: Option<Term>,
     505                 :     /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
     506                 :     end_watch: EndWatch,
     507                 :     ws_guard: Arc<WalSenderGuard>,
     508                 :     wal_reader: WalReader,
      509                 :     // buffer WAL is read into before sending
     510                 :     send_buf: [u8; MAX_SEND_SIZE],
     511                 : }
     512                 : 
     513                 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     514                 :     /// Send WAL until
     515                 :     /// - an error occurs
      516                 :     /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
     517                 :     ///
     518                 :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     519                 :     /// convenience.
     520             734 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     521                 :         loop {
      522                 :             // Wait for the next portion if it is not there yet, or just
      523                 :             // update our end-of-available-WAL value that we communicate
      524                 :             // to the receiver.
     525         1531100 :             self.wait_wal().await?;
     526                 :             assert!(
     527          568379 :                 self.end_pos > self.start_pos,
     528 UBC           0 :                 "nothing to send after waiting for WAL"
     529                 :             );
     530                 : 
     531                 :             // try to send as much as available, capped by MAX_SEND_SIZE
     532 CBC      568379 :             let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
      533          568379 :             // if we went past the available WAL, clamp to it
     534          568379 :             if chunk_end_pos >= self.end_pos {
     535          502419 :                 chunk_end_pos = self.end_pos;
     536          502419 :             } else {
     537           65960 :                 // If sending not up to end pos, round down to page boundary to
     538           65960 :                 // avoid breaking WAL record not at page boundary, as protocol
     539           65960 :                 // demands. See walsender.c (XLogSendPhysical).
     540           65960 :                 chunk_end_pos = chunk_end_pos
     541           65960 :                     .checked_sub(chunk_end_pos.block_offset())
     542           65960 :                     .unwrap();
     543           65960 :             }
     544          568379 :             let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
     545          568379 :             let send_buf = &mut self.send_buf[..send_size];
     546                 :             let send_size: usize;
     547                 :             {
     548                 :                 // If uncommitted part is being pulled, check that the term is
     549                 :                 // still the expected one.
     550          568379 :                 let _term_guard = if let Some(t) = self.term {
     551            1905 :                     Some(self.tli.acquire_term(t).await?)
     552                 :                 } else {
     553          566474 :                     None
     554                 :                 };
     555                 :                 // Read WAL into buffer. send_size can be additionally capped to
     556                 :                 // segment boundary here.
     557          568378 :                 send_size = self.wal_reader.read(send_buf).await?
     558                 :             };
     559          568377 :             let send_buf = &send_buf[..send_size];
     560          568377 : 
     561          568377 :             // and send it
     562          568377 :             self.pgb
     563          568377 :                 .write_message(&BeMessage::XLogData(XLogDataBody {
     564          568377 :                     wal_start: self.start_pos.0,
     565          568377 :                     wal_end: self.end_pos.0,
     566          568377 :                     timestamp: get_current_timestamp(),
     567          568377 :                     data: send_buf,
     568          568377 :                 }))
     569           17151 :                 .await?;
     570                 : 
     571          568336 :             if let Some(appname) = &self.appname {
     572          566572 :                 if appname == "replica" {
     573 UBC           0 :                     failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
     574 CBC      566008 :                 }
     575            1764 :             }
     576 UBC           0 :             trace!(
     577               0 :                 "sent {} bytes of WAL {}-{}",
     578               0 :                 send_size,
     579               0 :                 self.start_pos,
     580               0 :                 self.start_pos + send_size as u64
     581               0 :             );
     582 CBC      568336 :             self.start_pos += send_size as u64;
     583                 :         }
     584              68 :     }
     585                 : 
      586                 :     /// Wait until we have WAL to stream, sending keepalives and checking for
      587                 :     /// exit in the meantime.
     588          569070 :     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     589                 :         loop {
     590          572484 :             self.end_pos = self.end_watch.get();
     591          572484 :             if self.end_pos > self.start_pos {
     592                 :                 // We have something to send.
     593 UBC           0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     594 CBC       82373 :                 return Ok(());
     595          490111 :             }
     596                 : 
     597                 :             // Wait for WAL to appear, now self.end_pos == self.start_pos.
     598         1531085 :             if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
     599          486006 :                 self.end_pos = lsn;
     600 UBC           0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     601 CBC      486006 :                 return Ok(());
     602            3462 :             }
     603            3462 : 
     604            3462 :             // Timed out waiting for WAL, check for termination and send KA.
     605            3462 :             // Check for termination only if we are streaming up to commit_lsn
     606            3462 :             // (to pageserver).
     607            3462 :             if let EndWatch::Commit(_) = self.end_watch {
     608            3461 :                 if let Some(remote_consistent_lsn) = self
     609            3461 :                     .ws_guard
     610            3461 :                     .walsenders
     611            3461 :                     .get_ws_remote_consistent_lsn(self.ws_guard.id)
     612                 :                 {
     613            3461 :                     if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     614                 :                         // Terminate if there is nothing more to send.
     615                 :                         // Note that "ending streaming" part of the string is used by
     616                 :                         // pageserver to identify WalReceiverError::SuccessfulCompletion,
     617                 :                         // do not change this string without updating pageserver.
     618              48 :                         return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     619              48 :                         "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     620              48 :                         self.appname, self.start_pos,
     621              48 :                     )));
     622            3413 :                     }
     623 UBC           0 :                 }
     624 CBC           1 :             }
     625                 : 
     626            3414 :             self.pgb
     627            3414 :                 .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     628            3414 :                     wal_end: self.end_pos.0,
     629            3414 :                     timestamp: get_current_timestamp(),
     630            3414 :                     request_reply: true,
     631            3414 :                 }))
     632 UBC           0 :                 .await?;
     633                 :         }
     634 CBC      568427 :     }
     635                 : }
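
The chunk computation in run() caps each send at MAX_SEND_SIZE and, when the cap stops short of end_pos, rounds the cut down to a WAL block boundary so a record is never split at a non-page boundary. A standalone sketch with plain u64 LSNs and an assumed 8 KiB block size (BLOCK_SIZE and MAX_SEND are illustrative constants, not the crate's):

    const BLOCK_SIZE: u64 = 8192;
    const MAX_SEND: u64 = 128 * 1024;

    fn chunk_end(start_pos: u64, end_pos: u64) -> u64 {
        let mut chunk_end = start_pos + MAX_SEND;
        if chunk_end >= end_pos {
            // went past the available WAL: clamp to it
            chunk_end = end_pos;
        } else {
            // stopping short of end_pos: round down to a block boundary,
            // as walsender.c (XLogSendPhysical) does
            chunk_end -= chunk_end % BLOCK_SIZE;
        }
        chunk_end
    }

    fn main() {
        // everything available fits under the cap: send up to end_pos
        assert_eq!(chunk_end(0, 1000), 1000);
        // more WAL than the cap: stop at a block boundary below the cap
        let end = chunk_end(100, u64::MAX);
        assert!(end <= 100 + MAX_SEND && end % BLOCK_SIZE == 0);
    }
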
     636                 : 
      637                 : /// The half of the split connection that drives receiving replies.
     638                 : struct ReplyReader<IO> {
     639                 :     reader: PostgresBackendReader<IO>,
     640                 :     ws_guard: Arc<WalSenderGuard>,
     641                 : }
     642                 : 
     643                 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
     644             734 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     645                 :         loop {
     646         2110936 :             let msg = self.reader.read_copy_message().await?;
     647          567108 :             self.handle_feedback(&msg)?
     648                 :         }
     649             350 :     }
     650                 : 
     651          567108 :     fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
     652          567108 :         match msg.first().cloned() {
     653               2 :             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
      654               2 :                 // Note: deserializing is on msg[1..] because we skip the tag byte.
     655               2 :                 let hs_feedback = HotStandbyFeedback::des(&msg[1..])
     656               2 :                     .context("failed to deserialize HotStandbyFeedback")?;
     657               2 :                 self.ws_guard
     658               2 :                     .walsenders
     659               2 :                     .record_hs_feedback(self.ws_guard.id, &hs_feedback);
     660                 :             }
     661            1295 :             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
     662            1295 :                 let reply =
     663            1295 :                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
     664            1295 :                 self.ws_guard
     665            1295 :                     .walsenders
     666            1295 :                     .record_standby_reply(self.ws_guard.id, &reply);
     667                 :             }
     668                 :             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
     669                 :                 // pageserver sends this.
      670                 :                 // Note: deserializing is on msg[9..] because we skip the tag byte and length bytes.
     671          565811 :                 let buf = Bytes::copy_from_slice(&msg[9..]);
     672          565811 :                 let ps_feedback = PageserverFeedback::parse(buf);
     673          565811 : 
     674          565811 :                 trace!("PageserverFeedback is {:?}", ps_feedback);
     675          565811 :                 self.ws_guard
     676          565811 :                     .walsenders
     677          565811 :                     .record_ps_feedback(self.ws_guard.id, &ps_feedback);
      678                 :                 // In principle a new remote_consistent_lsn could allow us to
      679                 :                 // deactivate the timeline, but we check that regularly through
      680                 :                 // broker updates, so there is no need to do it here.
     681                 :             }
     682 UBC           0 :             _ => warn!("unexpected message {:?}", msg),
     683                 :         }
     684 CBC      567108 :         Ok(())
     685          567108 :     }
     686                 : }
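
handle_feedback dispatches purely on the first byte of the copy-data message: 'h' and 'r' payloads start right after the tag, while the Neon 'z' message also carries length bytes before its payload. A standalone sketch of the dispatch with the actual decoding stubbed out (the Feedback enum and classify are illustrative; the real code decodes via BeSer::des and PageserverFeedback::parse):

    #[derive(Debug, PartialEq)]
    enum Feedback<'a> {
        HotStandby(&'a [u8]),    // 'h': payload follows the tag byte
        StandbyStatus(&'a [u8]), // 'r': payload follows the tag byte
        Neon(&'a [u8]),          // 'z': payload follows the tag and length bytes
    }

    fn classify(msg: &[u8]) -> Option<Feedback<'_>> {
        match msg.first().copied()? {
            b'h' => Some(Feedback::HotStandby(&msg[1..])),
            b'r' => Some(Feedback::StandbyStatus(&msg[1..])),
            // skip the tag byte plus 8 length bytes, as the code above does
            b'z' if msg.len() >= 9 => Some(Feedback::Neon(&msg[9..])),
            _ => None, // unexpected message; the real code logs a warning
        }
    }

    fn main() {
        assert_eq!(classify(b"r....."), Some(Feedback::StandbyStatus(b".....")));
        assert!(classify(b"?").is_none());
    }
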
     687                 : 
     688                 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
     689                 : 
     690                 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
     691                 : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
     692                 : /// - Ok(None) if timeout expired;
     693                 : /// - Err in case of error -- only if 1) term changed while fetching in recovery
     694                 : ///   mode 2) watch channel closed, which must never happen.
     695          490111 : async fn wait_for_lsn(
     696          490111 :     rx: &mut EndWatch,
     697          490111 :     client_term: Option<Term>,
     698          490111 :     start_pos: Lsn,
     699          490111 : ) -> anyhow::Result<Option<Lsn>> {
     700          490111 :     let res = timeout(POLL_STATE_TIMEOUT, async move {
     701                 :         loop {
     702         1688623 :             let end_pos = rx.get();
     703         1688623 :             if end_pos > start_pos {
     704          486006 :                 return Ok(end_pos);
     705         1202617 :             }
     706         1202617 :             if let EndWatch::Flush(rx) = rx {
     707              33 :                 let curr_term = rx.borrow().term;
     708              33 :                 if let Some(client_term) = client_term {
     709              33 :                     if curr_term != client_term {
     710 UBC           0 :                         bail!("term changed: requested {}, now {}", client_term, curr_term);
     711 CBC          33 :                     }
     712 UBC           0 :                 }
     713 CBC     1202584 :             }
     714         1531085 :             rx.changed().await?;
     715                 :         }
     716          490111 :     })
     717         1531085 :     .await;
     718                 : 
     719          486006 :     match res {
     720                 :         // success
     721          486006 :         Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
     722                 :         // error inside closure
     723 UBC           0 :         Ok(Err(err)) => Err(err),
     724                 :         // timeout
     725 CBC        3462 :         Err(_) => Ok(None),
     726                 :     }
     727          489468 : }
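
wait_for_lsn wraps the whole wait loop in tokio::time::timeout and maps expiry to Ok(None), which is what lets wait_wal interleave keepalives with waiting. A standalone sketch of the same shape over a plain u64 watch channel (wait_past is an illustrative name; assumes tokio with the "sync", "time", "rt" and "macros" features):

    use std::time::Duration;
    use tokio::sync::watch;
    use tokio::time::timeout;

    const POLL_TIMEOUT: Duration = Duration::from_secs(1);

    /// Ok(Some(end)) if the watermark passed start_pos in time,
    /// Ok(None) on timeout, Err if the channel closed.
    async fn wait_past(
        rx: &mut watch::Receiver<u64>,
        start_pos: u64,
    ) -> Result<Option<u64>, watch::error::RecvError> {
        let res = timeout(POLL_TIMEOUT, async {
            loop {
                let end = *rx.borrow();
                if end > start_pos {
                    return Ok(end);
                }
                rx.changed().await?;
            }
        })
        .await;
        match res {
            Ok(Ok(end)) => Ok(Some(end)), // success
            Ok(Err(e)) => Err(e),         // channel closed inside the loop
            Err(_) => Ok(None),           // timeout elapsed
        }
    }

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let (_tx, mut rx) = watch::channel(0u64);
        // nothing advances the watermark: returns None after one second
        assert_eq!(wait_past(&mut rx, 5).await.unwrap(), None);
    }
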
     728                 : 
     729                 : #[cfg(test)]
     730                 : mod tests {
     731                 :     use postgres_protocol::PG_EPOCH;
     732                 :     use utils::id::{TenantId, TimelineId};
     733                 : 
     734                 :     use super::*;
     735                 : 
     736               6 :     fn mock_ttid() -> TenantTimelineId {
     737               6 :         TenantTimelineId {
     738               6 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     739               6 :             timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
     740               6 :         }
     741               6 :     }
     742                 : 
     743               6 :     fn mock_addr() -> SocketAddr {
     744               6 :         "127.0.0.1:8080".parse().unwrap()
     745               6 :     }
     746                 : 
      747                 :     // add the given feedback to wss, setting the other fields to dummy values
     748               6 :     fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
     749               6 :         let walsender_state = WalSenderState {
     750               6 :             ttid: mock_ttid(),
     751               6 :             addr: mock_addr(),
     752               6 :             conn_id: 1,
     753               6 :             appname: None,
     754               6 :             feedback,
     755               6 :         };
     756               6 :         wss.slots.push(Some(walsender_state))
     757               6 :     }
     758                 : 
     759                 :     // form standby feedback with given hot standby feedback ts/xmin and the
     760                 :     // rest set to dummy values.
     761               4 :     fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
     762               4 :         ReplicationFeedback::Standby(StandbyFeedback {
     763               4 :             reply: StandbyReply::empty(),
     764               4 :             hs_feedback: HotStandbyFeedback {
     765               4 :                 ts,
     766               4 :                 xmin,
     767               4 :                 catalog_xmin: 0,
     768               4 :             },
     769               4 :         })
     770               4 :     }
     771                 : 
     772                 :     // test that hs aggregation works as expected
     773               1 :     #[test]
     774               1 :     fn test_hs_feedback_no_valid() {
     775               1 :         let mut wss = WalSendersShared::new();
     776               1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     777               1 :         wss.update_hs_feedback();
     778               1 :         assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
     779               1 :     }
     780                 : 
     781               1 :     #[test]
     782               1 :     fn test_hs_feedback() {
     783               1 :         let mut wss = WalSendersShared::new();
     784               1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     785               1 :         push_feedback(&mut wss, hs_feedback(1, 42));
     786               1 :         push_feedback(&mut wss, hs_feedback(1, 64));
     787               1 :         wss.update_hs_feedback();
     788               1 :         assert_eq!(wss.agg_hs_feedback.xmin, 42);
     789               1 :     }
     790                 : 
      791                 :     // form pageserver feedback with given last_received_lsn / timeline size and the
     792                 :     // rest set to dummy values.
     793               2 :     fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
     794               2 :         ReplicationFeedback::Pageserver(PageserverFeedback {
     795               2 :             current_timeline_size,
     796               2 :             last_received_lsn,
     797               2 :             disk_consistent_lsn: Lsn::INVALID,
     798               2 :             remote_consistent_lsn: Lsn::INVALID,
     799               2 :             replytime: *PG_EPOCH,
     800               2 :         })
     801               2 :     }
     802                 : 
     803                 :     // test that ps aggregation works as expected
     804               1 :     #[test]
     805               1 :     fn test_ps_feedback() {
     806               1 :         let mut wss = WalSendersShared::new();
     807               1 :         push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
     808               1 :         push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
     809               1 :         wss.update_ps_feedback();
     810               1 :         assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
     811               1 :         assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
     812               1 :     }
     813                 : }
        

Generated by: LCOV version 2.1-beta