LCOV - code coverage report
Current view: top level - safekeeper/src - (source / functions) Coverage Total Hit
Test: Lines: 93.7 % 459 430
Test Date: 2024-02-12 20:26:03 Functions: 45.9 % 159 73

            Line data    Source code
       1              : //! This module implements the streaming side of replication protocol, starting
       2              : //! with the "START_REPLICATION" message, and registry of walsenders.
       3              : 
       4              : use crate::handler::SafekeeperPostgresHandler;
       5              : use crate::safekeeper::{Term, TermLsn};
       6              : use crate::timeline::Timeline;
       7              : use crate::wal_service::ConnectionId;
       8              : use crate::wal_storage::WalReader;
       9              : use crate::GlobalTimelines;
      10              : use anyhow::{bail, Context as AnyhowContext};
      11              : use bytes::Bytes;
      12              : use parking_lot::Mutex;
      13              : use postgres_backend::PostgresBackend;
      14              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
      15              : use postgres_ffi::get_current_timestamp;
      16              : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
      17              : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
      18              : use serde::{Deserialize, Serialize};
      19              : use tokio::io::{AsyncRead, AsyncWrite};
      20              : use utils::failpoint_support;
      21              : use utils::id::TenantTimelineId;
      22              : use utils::pageserver_feedback::PageserverFeedback;
      23              : 
      24              : use std::cmp::{max, min};
      25              : use std::net::SocketAddr;
      26              : use std::str;
      27              : use std::sync::Arc;
      28              : use std::time::Duration;
      29              : use tokio::sync::watch::Receiver;
      30              : use tokio::time::timeout;
      31              : use tracing::*;
      32              : use utils::{bin_ser::BeSer, lsn::Lsn};
      33              : 
      34              : // See: https://www.postgresql.org/docs/13/protocol-replication.html
      35              : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
      36              : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
      37              : // neon extension of replication protocol
      38              : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
      39              : 
      40              : type FullTransactionId = u64;
      41              : 
      42              : /// Hot standby feedback received from replica
      43            6 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      44              : pub struct HotStandbyFeedback {
      45              :     pub ts: TimestampTz,
      46              :     pub xmin: FullTransactionId,
      47              :     pub catalog_xmin: FullTransactionId,
      48              : }
      49              : 
      50              : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
      51              : 
      52              : impl HotStandbyFeedback {
      53      1661312 :     pub fn empty() -> HotStandbyFeedback {
      54      1661312 :         HotStandbyFeedback {
      55      1661312 :             ts: 0,
      56      1661312 :             xmin: 0,
      57      1661312 :             catalog_xmin: 0,
      58      1661312 :         }
      59      1661312 :     }
      60              : }
      61              : 
      62              : /// Standby status update
      63         1225 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      64              : pub struct StandbyReply {
      65              :     pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
      66              :     pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
      67              :     pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
      68              :     pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
      69              :     pub reply_requested: bool,
      70              : }
      71              : 
      72              : impl StandbyReply {
      73            8 :     fn empty() -> Self {
      74            8 :         StandbyReply {
      75            8 :             write_lsn: Lsn::INVALID,
      76            8 :             flush_lsn: Lsn::INVALID,
      77            8 :             apply_lsn: Lsn::INVALID,
      78            8 :             reply_ts: 0,
      79            8 :             reply_requested: false,
      80            8 :         }
      81            8 :     }
      82              : }
      83              : 
      84            3 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      85              : pub struct StandbyFeedback {
      86              :     reply: StandbyReply,
      87              :     hs_feedback: HotStandbyFeedback,
      88              : }
      89              : 
      90              : /// WalSenders registry. Timeline holds it (wrapped in Arc).
      91              : pub struct WalSenders {
      92              :     mutex: Mutex<WalSendersShared>,
      93              : }
      94              : 
      95              : impl WalSenders {
      96          612 :     pub fn new() -> Arc<WalSenders> {
      97          612 :         Arc::new(WalSenders {
      98          612 :             mutex: Mutex::new(WalSendersShared::new()),
      99          612 :         })
     100          612 :     }
     101              : 
     102              :     /// Register new walsender. Returned guard provides access to the slot and
     103              :     /// automatically deregisters in Drop.
     104          756 :     fn register(
     105          756 :         self: &Arc<WalSenders>,
     106          756 :         ttid: TenantTimelineId,
     107          756 :         addr: SocketAddr,
     108          756 :         conn_id: ConnectionId,
     109          756 :         appname: Option<String>,
     110          756 :     ) -> WalSenderGuard {
     111          756 :         let slots = &mut self.mutex.lock().slots;
     112          756 :         let walsender_state = WalSenderState {
     113          756 :             ttid,
     114          756 :             addr,
     115          756 :             conn_id,
     116          756 :             appname,
     117          756 :             feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     118          756 :         };
     119              :         // find empty slot or create new one
     120          756 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
     121          231 :             slots[pos] = Some(walsender_state);
     122          231 :             pos
     123              :         } else {
     124          525 :             let pos = slots.len();
     125          525 :             slots.push(Some(walsender_state));
     126          525 :             pos
     127              :         };
     128          756 :         WalSenderGuard {
     129          756 :             id: pos,
     130          756 :             walsenders: self.clone(),
     131          756 :         }
     132          756 :     }
     133              : 
     134              :     /// Get state of all walsenders.
     135          260 :     pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
     136          260 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     137          260 :     }
     138              : 
     139              :     /// Get aggregated pageserver feedback.
     140           51 :     pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
     141           51 :         self.mutex.lock().agg_ps_feedback
     142           51 :     }
     143              : 
     144              :     /// Get aggregated pageserver and hot standby feedback (we send them to compute).
     145      1660289 :     pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
     146      1660289 :         let shared = self.mutex.lock();
     147      1660289 :         (shared.agg_ps_feedback, shared.agg_hs_feedback)
     148      1660289 :     }
     149              : 
     150              :     /// Record new pageserver feedback, update aggregated values.
     151       797257 :     fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
     152       797257 :         let mut shared = self.mutex.lock();
     153       797257 :         shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
     154       797257 :         shared.update_ps_feedback();
     155       797257 :     }
     156              : 
     157              :     /// Record standby reply.
     158         1225 :     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
     159         1225 :         let mut shared = self.mutex.lock();
     160         1225 :         let slot = shared.get_slot_mut(id);
     161         1225 :         match &mut slot.feedback {
     162         1222 :             ReplicationFeedback::Standby(sf) => sf.reply = *reply,
     163              :             ReplicationFeedback::Pageserver(_) => {
     164            3 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     165            3 :                     reply: *reply,
     166            3 :                     hs_feedback: HotStandbyFeedback::empty(),
     167            3 :                 })
     168              :             }
     169              :         }
     170         1225 :     }
     171              : 
     172              :     /// Record hot standby feedback, update aggregated value.
     173            2 :     fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
     174            2 :         let mut shared = self.mutex.lock();
     175            2 :         let slot = shared.get_slot_mut(id);
     176            2 :         match &mut slot.feedback {
     177            2 :             ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
     178              :             ReplicationFeedback::Pageserver(_) => {
     179            0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     180            0 :                     reply: StandbyReply::empty(),
     181            0 :                     hs_feedback: *feedback,
     182            0 :                 })
     183              :             }
     184              :         }
     185            2 :         shared.update_hs_feedback();
     186            2 :     }
     187              : 
     188              :     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
     189              :     /// client is not pageserver.
     190         3332 :     fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
     191         3332 :         let shared = self.mutex.lock();
     192         3332 :         let slot = shared.get_slot(id);
     193         3332 :         match slot.feedback {
     194         3332 :             ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
     195            0 :             _ => None,
     196              :         }
     197         3332 :     }
     198              : 
     199              :     /// Unregister walsender.
     200          392 :     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
     201          392 :         let mut shared = self.mutex.lock();
     202          392 :         shared.slots[id] = None;
     203          392 :         shared.update_hs_feedback();
     204          392 :     }
     205              : }
     206              : 
     207              : struct WalSendersShared {
     208              :     // aggregated over all walsenders value
     209              :     agg_hs_feedback: HotStandbyFeedback,
     210              :     // aggregated over all walsenders value
     211              :     agg_ps_feedback: PageserverFeedback,
     212              :     slots: Vec<Option<WalSenderState>>,
     213              : }
     214              : 
     215              : impl WalSendersShared {
     216          618 :     fn new() -> Self {
     217          618 :         WalSendersShared {
     218          618 :             agg_hs_feedback: HotStandbyFeedback::empty(),
     219          618 :             agg_ps_feedback: PageserverFeedback::empty(),
     220          618 :             slots: Vec::new(),
     221          618 :         }
     222          618 :     }
     223              : 
     224              :     /// Get content of provided id slot, it must exist.
     225         3332 :     fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
     226         3332 :         self.slots[id].as_ref().expect("walsender doesn't exist")
     227         3332 :     }
     228              : 
     229              :     /// Get mut content of provided id slot, it must exist.
     230       798484 :     fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
     231       798484 :         self.slots[id].as_mut().expect("walsender doesn't exist")
     232       798484 :     }
     233              : 
     234              :     /// Update aggregated hot standby feedback. We just take min of valid xmins
     235              :     /// and ts.
     236          398 :     fn update_hs_feedback(&mut self) {
     237          398 :         let mut agg = HotStandbyFeedback::empty();
     238          398 :         for ws_state in self.slots.iter().flatten() {
     239          211 :             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
     240           10 :                 let hs_feedback = standby_feedback.hs_feedback;
     241           10 :                 // doing Option math like op1.iter().chain(op2.iter()).min()
     242           10 :                 // would be nicer, but we serialize/deserialize this struct
     243           10 :                 // directly, so leave as is for now
     244           10 :                 if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
     245            4 :                     if agg.xmin != INVALID_FULL_TRANSACTION_ID {
     246            2 :                         agg.xmin = min(agg.xmin, hs_feedback.xmin);
     247            2 :                     } else {
     248            2 :                         agg.xmin = hs_feedback.xmin;
     249            2 :                     }
     250            4 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     251            6 :                 }
     252           10 :                 if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     253            0 :                     if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     254            0 :                         agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
     255            0 :                     } else {
     256            0 :                         agg.catalog_xmin = hs_feedback.catalog_xmin;
     257            0 :                     }
     258            0 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     259           10 :                 }
     260          201 :             }
     261              :         }
     262          398 :         self.agg_hs_feedback = agg;
     263          398 :     }
     264              : 
     265              :     /// Update aggregated pageserver feedback. LSNs (last_received,
     266              :     /// disk_consistent, remote_consistent) and reply timestamp are just
     267              :     /// maximized; timeline_size is taken from feedback with highest
     268              :     /// last_received lsn. This is generally reasonable, but we might want to
     269              :     /// implement other policies once multiple pageservers start to be actively
     270              :     /// used.
     271       797259 :     fn update_ps_feedback(&mut self) {
     272       797259 :         let init = PageserverFeedback::empty();
     273       797259 :         let acc =
     274       797259 :             self.slots
     275       797259 :                 .iter()
     276       797259 :                 .flatten()
     277      1167694 :                 .fold(init, |mut acc, ws_state| match ws_state.feedback {
     278      1167248 :                     ReplicationFeedback::Pageserver(feedback) => {
     279      1167248 :                         if feedback.last_received_lsn > acc.last_received_lsn {
     280       861240 :                             acc.current_timeline_size = feedback.current_timeline_size;
     281       861240 :                         }
     282      1167248 :                         acc.last_received_lsn =
     283      1167248 :                             max(feedback.last_received_lsn, acc.last_received_lsn);
     284      1167248 :                         acc.disk_consistent_lsn =
     285      1167248 :                             max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
     286      1167248 :                         acc.remote_consistent_lsn =
     287      1167248 :                             max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
     288      1167248 :                         acc.replytime = max(feedback.replytime, acc.replytime);
     289      1167248 :                         acc
     290              :                     }
     291          446 :                     ReplicationFeedback::Standby(_) => acc,
     292      1167694 :                 });
     293       797259 :         self.agg_ps_feedback = acc;
     294       797259 :     }
     295              : }
     296              : 
     297              : // Serialized is used only for pretty printing in json.
     298          114 : #[derive(Debug, Clone, Serialize, Deserialize)]
     299              : pub struct WalSenderState {
     300              :     ttid: TenantTimelineId,
     301              :     addr: SocketAddr,
     302              :     conn_id: ConnectionId,
     303              :     // postgres application_name
     304              :     appname: Option<String>,
     305              :     feedback: ReplicationFeedback,
     306              : }
     307              : 
     308              : // Receiver is either pageserver or regular standby, which have different
     309              : // feedbacks.
     310          114 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     311              : enum ReplicationFeedback {
     312              :     Pageserver(PageserverFeedback),
     313              :     Standby(StandbyFeedback),
     314              : }
     315              : 
     316              : // id of the occupied slot in WalSenders to access it (and save in the
     317              : // WalSenderGuard). We could give Arc directly to the slot, but there is not
     318              : // much sense in that as values aggregation which is performed on each feedback
     319              : // receival iterates over all walsenders.
     320              : pub type WalSenderId = usize;
     321              : 
     322              : /// Scope guard to access slot in WalSenders registry and unregister from it in
     323              : /// Drop.
     324              : pub struct WalSenderGuard {
     325              :     id: WalSenderId,
     326              :     walsenders: Arc<WalSenders>,
     327              : }
     328              : 
     329              : impl Drop for WalSenderGuard {
     330          392 :     fn drop(&mut self) {
     331          392 :         self.walsenders.unregister(self.id);
     332          392 :     }
     333              : }
     334              : 
     335              : impl SafekeeperPostgresHandler {
     336              :     /// Wrapper around handle_start_replication_guts handling result. Error is
     337              :     /// handled here while we're still in walsender ttid span; with API
     338              :     /// extension, this can probably be moved into postgres_backend.
     339          756 :     pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
     340          756 :         &mut self,
     341          756 :         pgb: &mut PostgresBackend<IO>,
     342          756 :         start_pos: Lsn,
     343          756 :         term: Option<Term>,
     344          756 :     ) -> Result<(), QueryError> {
     345          756 :         if let Err(end) = self
     346          756 :             .handle_start_replication_guts(pgb, start_pos, term)
     347      3151075 :             .await
     348              :         {
     349              :             // Log the result and probably send it to the client, closing the stream.
     350          392 :             pgb.handle_copy_stream_end(end).await;
     351            0 :         }
     352          392 :         Ok(())
     353          392 :     }
     354              : 
     355          756 :     pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     356          756 :         &mut self,
     357          756 :         pgb: &mut PostgresBackend<IO>,
     358          756 :         start_pos: Lsn,
     359          756 :         term: Option<Term>,
     360          756 :     ) -> Result<(), CopyStreamHandlerEnd> {
     361          756 :         let appname = self.appname.clone();
     362          756 :         let tli =
     363          756 :             GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
     364              : 
     365              :         // Use a guard object to remove our entry from the timeline when we are done.
     366          756 :         let ws_guard = Arc::new(tli.get_walsenders().register(
     367          756 :             self.ttid,
     368          756 :             *pgb.get_peer_addr(),
     369          756 :             self.conn_id,
     370          756 :             self.appname.clone(),
     371          756 :         ));
     372              : 
     373              :         // Walsender can operate in one of two modes which we select by
     374              :         // application_name: give only committed WAL (used by pageserver) or all
     375              :         // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
     376              :         // The second case is always driven by a consensus leader which term
     377              :         // must be supplied.
     378          756 :         let end_watch = if term.is_some() {
     379           19 :             EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
     380              :         } else {
     381          737 :             EndWatch::Commit(tli.get_commit_lsn_watch_rx())
     382              :         };
     383              :         // we don't check term here; it will be checked on first waiting/WAL reading anyway.
     384          756 :         let end_pos = end_watch.get();
     385          756 : 
     386          756 :         if end_pos < start_pos {
     387            9 :             warn!(
     388            9 :                 "requested start_pos {} is ahead of available WAL end_pos {}",
     389            9 :                 start_pos, end_pos
     390            9 :             );
     391          747 :         }
     392              : 
     393          756 :         info!(
     394          756 :             "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
     395          756 :             start_pos,
     396          756 :             end_pos,
     397          756 :             matches!(end_watch, EndWatch::Flush(_)),
     398          756 :             appname
     399          756 :         );
     400              : 
     401              :         // switch to copy
     402          756 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     403              : 
     404          756 :         let (_, persisted_state) = tli.get_state().await;
     405          756 :         let wal_reader = WalReader::new(
     406          756 :             self.conf.workdir.clone(),
     407          756 :             self.conf.timeline_dir(&tli.ttid),
     408          756 :             &persisted_state,
     409          756 :             start_pos,
     410          756 :             self.conf.is_wal_backup_enabled(),
     411          756 :         )?;
     412              : 
     413              :         // Split to concurrently receive and send data; replies are generally
     414              :         // not synchronized with sends, so this avoids deadlocks.
     415          756 :         let reader = pgb.split().context("START_REPLICATION split")?;
     416              : 
     417          756 :         let mut sender = WalSender {
     418          756 :             pgb,
     419          756 :             tli: tli.clone(),
     420          756 :             appname,
     421          756 :             start_pos,
     422          756 :             end_pos,
     423          756 :             term,
     424          756 :             end_watch,
     425          756 :             ws_guard: ws_guard.clone(),
     426          756 :             wal_reader,
     427          756 :             send_buf: [0; MAX_SEND_SIZE],
     428          756 :         };
     429          756 :         let mut reply_reader = ReplyReader {
     430          756 :             reader,
     431          756 :             ws_guard,
     432          756 :             tli,
     433          756 :         };
     434              : 
     435          756 :         let res = tokio::select! {
     436              :             // todo: add read|write .context to these errors
     437           34 :             r = sender.run() => r,
     438          358 :             r = reply_reader.run() => r,
     439              :         };
     440              :         // Join pg backend back.
     441          392 :         pgb.unsplit(reply_reader.reader)?;
     442              : 
     443          392 :         res
     444          392 :     }
     445              : }
     446              : 
     447              : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
     448              : /// given term (recovery by walproposer or peer safekeeper).
     449              : enum EndWatch {
     450              :     Commit(Receiver<Lsn>),
     451              :     Flush(Receiver<TermLsn>),
     452              : }
     453              : 
     454              : impl EndWatch {
     455              :     /// Get current end of WAL.
     456      3340749 :     fn get(&self) -> Lsn {
     457      3340749 :         match self {
     458      3338459 :             EndWatch::Commit(r) => *r.borrow(),
     459         2290 :             EndWatch::Flush(r) => r.borrow().lsn,
     460              :         }
     461      3340749 :     }
     462              : 
     463              :     /// Wait for the update.
     464      1836858 :     async fn changed(&mut self) -> anyhow::Result<()> {
     465      1836858 :         match self {
     466      2337970 :             EndWatch::Commit(r) => r.changed().await?,
     467           39 :             EndWatch::Flush(r) => r.changed().await?,
     468              :         }
     469      1832822 :         Ok(())
     470      1832822 :     }
     471              : }
     472              : 
     473              : /// A half driving sending WAL.
     474              : struct WalSender<'a, IO> {
     475              :     pgb: &'a mut PostgresBackend<IO>,
     476              :     tli: Arc<Timeline>,
     477              :     appname: Option<String>,
     478              :     // Position since which we are sending next chunk.
     479              :     start_pos: Lsn,
     480              :     // WAL up to this position is known to be locally available.
     481              :     // Usually this is the same as the latest commit_lsn, but in case of
     482              :     // walproposer recovery, this is flush_lsn.
     483              :     //
     484              :     // We send this LSN to the receiver as wal_end, so that it knows how much
     485              :     // WAL this safekeeper has. This LSN should be as fresh as possible.
     486              :     end_pos: Lsn,
     487              :     /// When streaming uncommitted part, the term the client acts as the leader
     488              :     /// in. Streaming is stopped if local term changes to a different (higher)
     489              :     /// value.
     490              :     term: Option<Term>,
     491              :     /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
     492              :     end_watch: EndWatch,
     493              :     ws_guard: Arc<WalSenderGuard>,
     494              :     wal_reader: WalReader,
     495              :     // buffer for reading WAL into to send it
     496              :     send_buf: [u8; MAX_SEND_SIZE],
     497              : }
     498              : 
     499              : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     500              :     /// Send WAL until
     501              :     /// - an error occurs
     502              :     /// - receiver is caught up and there are no computes (if streaming up to commit_lsn)
     503              :     ///
     504              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     505              :     /// convenience.
     506          756 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
                                // Streaming loop: each iteration waits until there is WAL past
                                // `start_pos`, chooses where the next chunk ends, writes one
                                // XLogData message and advances `start_pos`. The loop has no
                                // `break`, so it is left only through an error propagated by `?`.
     507              :         loop {
     508              :             // Wait for the next portion if it is not there yet, or just
     509              :             // update our "end of WAL available for sending" value; we
     510              :             // communicate it to the receiver.
     511      2338200 :             self.wait_wal().await?;
     512       799477 :             assert!(
     513       799477 :                 self.end_pos > self.start_pos,
     514            0 :                 "nothing to send after waiting for WAL"
     515              :             );
     516              : 
     517              :             // try to send as much as available, capped by MAX_SEND_SIZE
     518       799477 :             let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
     519       799477 :             // if we went beyond available WAL, back off
     520       799477 :             if chunk_end_pos >= self.end_pos {
     521       718583 :                 chunk_end_pos = self.end_pos;
     522       718583 :             } else {
     523        80894 :                 // If sending not up to end pos, round down to page boundary to
     524        80894 :                 // avoid breaking WAL record not at page boundary, as protocol
     525        80894 :                 // demands. See walsender.c (XLogSendPhysical).
     526        80894 :                 chunk_end_pos = chunk_end_pos
     527        80894 :                     .checked_sub(chunk_end_pos.block_offset())
     528        80894 :                     .unwrap();
     529        80894 :             }
                                    // Upper bound on the bytes sent in this iteration; it also selects
                                    // the part of `self.send_buf` that is handed out for filling.
     530       799477 :             let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
     531       799477 :             let send_buf = &mut self.send_buf[..send_size];
                                    // Shadows the bound above; the final size is assigned exactly once
                                    // inside the block below.
     532              :             let send_size: usize;
     533              :             {
     534              :                 // If uncommitted part is being pulled, check that the term is
     535              :                 // still the expected one.
     536       799477 :                 let _term_guard = if let Some(t) = self.term {
     537         2224 :                     Some(self.tli.acquire_term(t).await?)
     538              :                 } else {
     539       797253 :                     None
     540              :                 };
     541              :                 // Read WAL into buffer. send_size can be additionally capped to
     542              :                 // segment boundary here.
                                        // NOTE(review): the right-hand side of this assignment is missing
                                        // in this listing; per the comment above it is presumably the WAL
                                        // read into `send_buf` -- confirm against the original source.
     543       799476 :                 send_size =
     544              :             };
                                    // Narrow the buffer to the final `send_size`.
     545       799473 :             let send_buf = &send_buf[..send_size];
     546       799473 : 
     547       799473 :             // and send it
     548       799473 :             self.pgb
     549       799473 :                 .write_message(&BeMessage::XLogData(XLogDataBody {
     550       799473 :                     wal_start: self.start_pos.0,
     551       799473 :                     wal_end: self.end_pos.0,
     552       799473 :                     timestamp: get_current_timestamp(),
     553       799473 :                     data: send_buf,
     554       799473 :                 }))
     555        22821 :                 .await?;
     556              : 
                                    // The failpoint below is reached only when the application name is
                                    // exactly "replica".
     557       799451 :             if let Some(appname) = &self.appname {
     558       797370 :                 if appname == "replica" {
     559            0 :                     failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
     560       796816 :                 }
     561         2081 :             }
     562            0 :             trace!(
     563            0 :                 "sent {} bytes of WAL {}-{}",
     564            0 :                 send_size,
     565            0 :                 self.start_pos,
     566            0 :                 self.start_pos + send_size as u64
     567            0 :             );
                                    // Advance past what was just sent.
     568       799451 :             self.start_pos += send_size as u64;
     569              :         }
     570           34 :     }
     571              : 
     572              :     /// Wait until we have WAL to stream, sending keepalives and checking for
     573              :     /// exit in the meantime.
     574       800207 :     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
                                // Returns Ok(()) only after `self.end_pos` has been set to a value
                                // greater than `self.start_pos`. Every other path either returns an
                                // error or sends a keepalive and goes around the loop again.
     575              :         loop {
                                    // Fast path: refresh `end_pos` from the watch without waiting.
     576       803513 :             self.end_pos = self.end_watch.get();
     577       803513 :             if self.end_pos > self.start_pos {
     578              :                 // We have something to send.
     579            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     580        99855 :                 return Ok(());
     581       703658 :             }
     582              : 
     583              :             // Wait for WAL to appear, now self.end_pos == self.start_pos.
                                    // `wait_for_lsn` yields Some(lsn) when it observed lsn > start_pos
                                    // and None when its timeout expired.
     584      2338009 :             if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
     585       699622 :                 self.end_pos = lsn;
     586            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     587       699622 :                 return Ok(());
     588         3333 :             }
     589         3333 : 
     590         3333 :             // Timed out waiting for WAL, check for termination and send KA.
     591         3333 :             // Check for termination only if we are streaming up to commit_lsn
     592         3333 :             // (to pageserver).
     593         3333 :             if let EndWatch::Commit(_) = self.end_watch {
                                        // NOTE(review): the argument list and closing parenthesis of
                                        // `get_ws_remote_consistent_lsn(` are missing in this listing --
                                        // confirm against the original source.
     594         3332 :                 if let Some(remote_consistent_lsn) = self
     595         3332 :                     .ws_guard
     596         3332 :                     .walsenders
     597         3332 :                     .get_ws_remote_consistent_lsn(
     598              :                 {
     599         3332 :                     if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     600              :                         // Terminate if there is nothing more to send.
     601              :                         // Note that "ending streaming" part of the string is used by
     602              :                         // pageserver to identify WalReceiverError::SuccessfulCompletion,
     603              :                         // do not change this string without updating pageserver.
     604           27 :                         return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     605           27 :                         "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     606           27 :                         self.appname, self.start_pos,
     607           27 :                     )));
     608         3305 :                     }
     609            0 :                 }
     610            1 :             }
     611              : 
                                    // Nothing new and no reason to stop: send a keepalive that asks the
                                    // peer for a reply (`request_reply: true`), then poll again.
     612         3306 :             self.pgb
     613         3306 :                 .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     614         3306 :                     wal_end: self.end_pos.0,
     615         3306 :                     timestamp: get_current_timestamp(),
     616         3306 :                     request_reply: true,
     617         3306 :                 }))
     618            0 :                 .await?;
     619              :         }
     620       799504 :     }
     621              : }
     622              : 
     623              : /// The half that receives replies: it reads copy messages from `reader` and
                        /// records the feedback they carry.
     624              : struct ReplyReader<IO> {
                            // Source of incoming copy messages.
     625              :     reader: PostgresBackendReader<IO>,
                            // Gives access to the `walsenders` registry where feedback is recorded.
     626              :     ws_guard: Arc<WalSenderGuard>,
                            // Timeline whose remote_consistent_lsn is updated from pageserver feedback.
     627              :     tli: Arc<Timeline>,
     628              : }
     629              : 
     630              : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
                            /// Reads copy messages one by one and passes each to `handle_feedback`.
                            /// The loop has no `break`; it ends only when reading or handling fails
                            /// and the error is propagated by `?`.
     631          756 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     632              :         loop {
     633      3015466 :             let msg = self.reader.read_copy_message().await?;
     634       798484 :             self.handle_feedback(&msg).await?
     635              :         }
     636          358 :     }
     637              : 
                            /// Dispatches on the first byte of `msg` (the tag) and records the
                            /// decoded feedback in the walsenders registry. A message with an
                            /// unrecognized tag, or an empty one, is only logged with `warn!` and
                            /// still yields `Ok(())`. Returns `Err` when deserializing a hot standby
                            /// feedback or a standby reply fails.
                            ///
                            /// NOTE(review): in this listing the first argument of each `record_*`
                            /// call is missing (`(, &x)`) -- confirm against the original source.
     638       798484 :     async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
     639       798484 :         match msg.first().cloned() {
     640            2 :             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
     641            2 :                 // Note: deserializing is on msg[1..] because we skip the tag byte.
     642            2 :                 let hs_feedback = HotStandbyFeedback::des(&msg[1..])
     643            2 :                     .context("failed to deserialize HotStandbyFeedback")?;
     644            2 :                 self.ws_guard
     645            2 :                     .walsenders
     646            2 :                     .record_hs_feedback(, &hs_feedback);
     647              :             }
     648         1225 :             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
     649         1225 :                 let reply =
     650         1225 :                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
     651         1225 :                 self.ws_guard
     652         1225 :                     .walsenders
     653         1225 :                     .record_standby_reply(, &reply);
     654              :             }
     655              :             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
     656              :                 // pageserver sends this.
     657              :                 // Note: deserializing is on msg[9..] because we skip the tag byte and len bytes.
                                        // NOTE(review): slicing from index 9 panics if the message is
                                        // shorter than 9 bytes; nothing visible here checks the length.
     658       797257 :                 let buf = Bytes::copy_from_slice(&msg[9..]);
     659       797257 :                 let ps_feedback = PageserverFeedback::parse(buf);
     660              : 
     661            0 :                 trace!("PageserverFeedback is {:?}", ps_feedback);
     662       797257 :                 self.ws_guard
     663       797257 :                     .walsenders
     664       797257 :                     .record_ps_feedback(, &ps_feedback);
     665       797257 :                 self.tli
     666       797257 :                     .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
     667       135526 :                     .await;
     668              :                 // In principle the new remote_consistent_lsn could allow us to
     669              :                 // deactivate the timeline, but we check that regularly through
     670              :                 // broker updates, so there is no need to do it here.
     671              :             }
     672            0 :             _ => warn!("unexpected message {:?}", msg),
     673              :         }
     674       798483 :         Ok(())
     675       798483 :     }
     676              : }
     677              : 
                        /// Longest time a single `wait_for_lsn` call waits for WAL past the start
                        /// position before it gives up and returns `Ok(None)`.
     678              : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
     679              : 
     680              : /// Wait until we have available WAL > start_pos or timeout expires. Returns
     681              : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
     682              : /// - Ok(None) if timeout expired;
     683              : /// - Err in case of error -- only if 1) term changed while fetching in recovery
     684              : ///   mode 2) watch channel closed, which must never happen.
     685       703658 : async fn wait_for_lsn(
     686       703658 :     rx: &mut EndWatch,
     687       703658 :     client_term: Option<Term>,
     688       703658 :     start_pos: Lsn,
     689       703658 : ) -> anyhow::Result<Option<Lsn>> {
     690       703658 :     let res = timeout(POLL_STATE_TIMEOUT, async move {
     691              :         loop {
     692      2536480 :             let end_pos = rx.get();
     693      2536480 :             if end_pos > start_pos {
     694       699622 :                 return Ok(end_pos);
     695      1836858 :             }
     696      1836858 :             if let EndWatch::Flush(rx) = rx {
     697           31 :                 let curr_term = rx.borrow().term;
     698           31 :                 if let Some(client_term) = client_term {
     699           31 :                     if curr_term != client_term {
     700            0 :                         bail!("term changed: requested {}, now {}", client_term, curr_term);
     701           31 :                     }
     702            0 :                 }
     703      1836827 :             }
     704      2338009 :             rx.changed().await?;
     705              :         }
     706       703658 :     })
     707      2338009 :     .await;
     708              : 
     709       699622 :     match res {
     710              :         // success
     711       699622 :         Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
     712              :         // error inside closure
     713            0 :         Ok(Err(err)) => Err(err),
     714              :         // timeout
     715         3333 :         Err(_) => Ok(None),
     716              :     }
     717       702955 : }
     718              : 
     719              : #[cfg(test)]
     720              : mod tests {
     721              :     use postgres_protocol::PG_EPOCH;
     722              :     use utils::id::{TenantId, TimelineId};
     723              : 
     724              :     use super::*;
     725              : 
     726           12 :     fn mock_ttid() -> TenantTimelineId {
     727           12 :         TenantTimelineId {
     728           12 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     729           12 :             timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
     730           12 :         }
     731           12 :     }
     732              : 
     733           12 :     fn mock_addr() -> SocketAddr {
     734           12 :         "".parse().unwrap()
     735           12 :     }
     736              : 
     737              :     // add to wss specified feedback setting other fields to dummy values
     738           12 :     fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
     739           12 :         let walsender_state = WalSenderState {
     740           12 :             ttid: mock_ttid(),
     741           12 :             addr: mock_addr(),
     742           12 :             conn_id: 1,
     743           12 :             appname: None,
     744           12 :             feedback,
     745           12 :         };
     746           12 :         wss.slots.push(Some(walsender_state))
     747           12 :     }
     748              : 
     749              :     // form standby feedback with given hot standby feedback ts/xmin and the
     750              :     // rest set to dummy values.
     751            8 :     fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
     752            8 :         ReplicationFeedback::Standby(StandbyFeedback {
     753            8 :             reply: StandbyReply::empty(),
     754            8 :             hs_feedback: HotStandbyFeedback {
     755            8 :                 ts,
     756            8 :                 xmin,
     757            8 :                 catalog_xmin: 0,
     758            8 :             },
     759            8 :         })
     760            8 :     }
     761              : 
     762              :     // test that hs aggregation works as expected
     763            2 :     #[test]
     764            2 :     fn test_hs_feedback_no_valid() {
     765            2 :         let mut wss = WalSendersShared::new();
     766            2 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     767            2 :         wss.update_hs_feedback();
     768            2 :         assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
     769            2 :     }
     770              : 
     771            2 :     #[test]
     772            2 :     fn test_hs_feedback() {
     773            2 :         let mut wss = WalSendersShared::new();
     774            2 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     775            2 :         push_feedback(&mut wss, hs_feedback(1, 42));
     776            2 :         push_feedback(&mut wss, hs_feedback(1, 64));
     777            2 :         wss.update_hs_feedback();
     778            2 :         assert_eq!(wss.agg_hs_feedback.xmin, 42);
     779            2 :     }
     780              : 
     781              :     // form pageserver feedback with given last_record_lsn / tli size and the
     782              :     // rest set to dummy values.
     783            4 :     fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
     784            4 :         ReplicationFeedback::Pageserver(PageserverFeedback {
     785            4 :             current_timeline_size,
     786            4 :             last_received_lsn,
     787            4 :             disk_consistent_lsn: Lsn::INVALID,
     788            4 :             remote_consistent_lsn: Lsn::INVALID,
     789            4 :             replytime: *PG_EPOCH,
     790            4 :         })
     791            4 :     }
     792              : 
     793              :     // test that ps aggregation works as expected
     794            2 :     #[test]
     795            2 :     fn test_ps_feedback() {
     796            2 :         let mut wss = WalSendersShared::new();
     797            2 :         push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
     798            2 :         push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
     799            2 :         wss.update_ps_feedback();
     800            2 :         assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
     801            2 :         assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
     802            2 :     }
     803              : }

Generated by: LCOV version 2.1-beta